Support common-dataservice 2.2.x
[federation.git] / gateway / src / main / java / org / acumos / federation / gateway / adapter / PeerGateway.java
1 /*-
2  * ===============LICENSE_START=======================================================
3  * Acumos
4  * ===================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6  * ===================================================================================
7  * This Acumos software file is distributed by AT&T and Tech Mahindra
8  * under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * This file is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ===============LICENSE_END=========================================================
19  */
20
21 package org.acumos.federation.gateway.adapter;
22
23 import java.io.IOException;
24 import java.lang.invoke.MethodHandles;
25 import java.time.Instant;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.LinkedHashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37
38 import org.acumos.cds.client.ICommonDataServiceRestClient;
39 import org.acumos.cds.domain.MLPCatalog;
40 import org.acumos.cds.domain.MLPPeer;
41 import org.acumos.cds.domain.MLPPeerSubscription;
42 import org.acumos.cds.domain.MLPRevCatDescription;
43 import org.acumos.cds.domain.MLPSolution;
44 import org.acumos.cds.domain.MLPSolutionRevision;
45 import org.acumos.cds.domain.MLPTag;
46 import org.acumos.federation.gateway.cds.Artifact;
47 import org.acumos.federation.gateway.cds.Document;
48 import org.acumos.federation.gateway.cds.PeerSubscription;
49 import org.acumos.federation.gateway.cds.Solution;
50 import org.acumos.federation.gateway.cds.SolutionRevision;
51 import org.acumos.federation.gateway.cds.TimestampedEntity;
52 import org.acumos.federation.gateway.common.API;
53 import org.acumos.federation.gateway.common.Clients;
54 import org.acumos.federation.gateway.common.FederationClient;
55 import org.acumos.federation.gateway.common.FederationClient.StreamingResource;
56 import org.acumos.federation.gateway.common.FederationException;
57 import org.acumos.federation.gateway.common.JsonResponse;
58 import org.acumos.federation.gateway.config.GatewayCondition;
59 import org.acumos.federation.gateway.event.PeerSubscriptionEvent;
60 import org.acumos.federation.gateway.service.CatalogService;
61 import org.acumos.federation.gateway.service.CatalogServiceConfiguration;
62 import org.acumos.federation.gateway.service.ContentService;
63 import org.acumos.federation.gateway.service.PeerSubscriptionService;
64 import org.acumos.federation.gateway.service.ServiceContext;
65 import org.acumos.federation.gateway.service.ServiceException;
66 import org.acumos.federation.gateway.util.Utils;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import org.springframework.beans.factory.annotation.Autowired;
70 import org.springframework.beans.factory.annotation.Qualifier;
71 import org.springframework.beans.factory.annotation.Value;
72 import org.springframework.context.annotation.Conditional;
73 import org.springframework.context.annotation.Scope;
74 import org.springframework.context.event.EventListener;
75 import org.springframework.core.task.TaskExecutor;
76 import org.springframework.stereotype.Component;
77 import org.springframework.web.client.HttpStatusCodeException;
78
79 import org.omg.CORBA.BooleanHolder;
80 import java.util.function.BiConsumer;
81 import java.util.function.BiPredicate;
82 import java.util.function.BiFunction;
83 import java.util.function.Function;
84 import org.acumos.cds.domain.MLPArtifact;
85 import org.acumos.cds.domain.MLPDocument;
86
87
88 @Component("peergateway")
89 @Scope("singleton")
90 @Conditional({GatewayCondition.class})
91 public class PeerGateway {
92
93         private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
94         @Value("${federation.operator}")
95         private String defaultUser;
96         @Autowired
97         @Qualifier("acumos")
98         private TaskExecutor taskExecutor;
99         @Autowired
100         private Clients clients;
101         @Autowired
102         private ContentService content;
103         @Autowired
104         private CatalogService catalog;
105         @Autowired
106         private CatalogServiceConfiguration catalogConfig;
107         @Autowired
108         private PeerSubscriptionService peerSubscriptionService;
109
110
111         public PeerGateway() {
112                 log.trace("PeerGateway::new");
113         }
114
115         @PostConstruct
116         public void initGateway() {
117                 log.trace("initPeerGateway");
118                 /* make sure an operator was specified and that it is a declared user */
119                 try {
120                         if (this.clients.getCDSClient().getUser(defaultUser) == null) {
121                                 log.warn("The default federation operator {} is not a known user ID", defaultUser);
122                         }
123                 } catch (Exception dx) {
124                         log.warn("failed to verify default federation operator", dx);
125                 }
126                 log.debug("PeerGateway available");
127         }
128
129         @PreDestroy
130         public void cleanupGateway() {
131                 log.debug("PeerGateway destroyed");
132         }
133
134         protected String getUserId(MLPPeerSubscription theSubscription) {
135                 String userId = theSubscription.getUserId();
136                 return userId != null ? userId : defaultUser;
137         }
138
139         @EventListener
140         public void handlePeerSubscriptionUpdate(PeerSubscriptionEvent theEvent) {
141                 log.info("received peer subscription update event {}", theEvent);
142                 taskExecutor.execute(
143                                 new PeerGatewayUpdateTask(theEvent.getPeer(), theEvent.getSubscription()));
144         }
145
146         /**
147          * The list of solutions processed here represents the solutions (with respect
148          * to the subscription filter definition) that were reported by the peer as
149          * being updated since the last check.
150          */
151         public class PeerGatewayUpdateTask implements Runnable {
152
153                 private MLPPeer peer;
154                 private PeerSubscription sub;
155                 private FederationClient peerClient;
156
157                 public PeerGatewayUpdateTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
158                         this.peer = thePeer;
159                         this.sub = new PeerSubscription(theSub);
160                 }
161
162                 public void run() {
163                         peerClient = clients.getFederationClient(this.peer.getApiUrl());
164                         if (peerClient == null) {
165                                 log.error("Failed to get client for peer {}", this.peer);
166                                 return;
167                         }
168                         Map selector = null;
169                         try {
170                                 selector = Utils.jsonStringToMap(this.sub.getSelector());
171                         }
172                         catch(Exception x) {
173                                 log.error("Failed to parse selector for subscription {}", this.sub);
174                                 return;
175                         }
176                         Instant lastProcessed = Instant.now();
177                         boolean isComplete = true;
178                         Object catids = selector.get(API.QueryParameters.CATALOG_ID);
179                         if (catids instanceof String) {
180                                 isComplete &= scanCatalog((String)catids);
181                         } else if (catids instanceof String[]) {
182                                 for (String catid: (String[]) catids) {
183                                         isComplete &= scanCatalog(catid);
184                                 }
185                         } else {
186                                 log.error("Selector for subscription {} needs catalog ID(s)", this.sub);
187                                 return;
188                         }
189                         log.info("Processing of subscription {} completed succesfully: {}", this.sub, isComplete);
190                         //only commit the last processed date if we completed succesfully
191                         if (isComplete) {
192                                 try {
193                                         this.sub.setProcessed(lastProcessed);
194                                         peerSubscriptionService.updatePeerSubscription(this.sub);
195                                 }
196                                 catch (ServiceException sx) {
197                                         log.error("Failed to update subscription information", sx);
198                                 }
199                         }
200                 }
201                 private boolean scanCatalog(String theCatalogId) {
202
203                         JsonResponse<List<MLPSolution>> peerSolutionsResponse = null;
204                         try {
205                                 peerSolutionsResponse = peerClient.getSolutions(theCatalogId);
206                         }
207                         catch (FederationException fx) {
208                                 log.error("Processing peer " + this.peer + " subscription " + this.sub.getSubId() + " error.", fx);
209                                 return false;
210                         }
211                         List<MLPSolution> peerSolutions = peerSolutionsResponse.getContent();
212                         Set<String> localSolutions = new HashSet<>();
213                         try {
214                                 MLPCatalog localCatalog = catalog.getCatalogs().stream().filter(x -> x.getCatalogId().equals(theCatalogId)).findAny().orElse(null);
215                                 if (localCatalog == null) {
216                                         log.info("Subscription local catalog id {} missing: trying to create it", theCatalogId);
217                                         MLPCatalog peerCatalog = peerClient.getCatalogs().getContent().stream().filter(x -> x.getCatalogId().equals(theCatalogId)).findAny().orElse(null);
218                                         localCatalog = clients.getCDSClient().createCatalog(peerCatalog);
219                                 }
220                                 for (MLPSolution solution: catalog.getSolutions(theCatalogId)) {
221                                         localSolutions.add(solution.getSolutionId());
222                                 }
223                         } catch (FederationException fe) {
224                                 log.error("Failed to retrieve peer catalog " + theCatalogId, fe);
225                                 return false;
226                         } catch (ServiceException se) {
227                                 log.error("Failed to list solutions in local catalog " + theCatalogId, se);
228                                 return false;
229                         }
230                         log.info("Processing peer {} subscription {}, {} yielded solutions {}", this.peer, this.sub.getSubId(), theCatalogId, peerSolutions);
231                         if (peerSolutions == null) {
232                                 log.warn("No solutions available for peer {} subscription {} in {}", this.peer, this.sub.getSubId(), peerSolutionsResponse);
233                                 peerSolutions = Collections.emptyList();
234                                 //and let it proceed so we end up marking it as processed
235                         }
236
237                         ServiceContext ctx = catalog.selfService();
238                         boolean isComplete = true;
239
240                         for (MLPSolution peerSolution : peerSolutions) {
241                                 log.info("Processing peer solution {}", peerSolution);
242
243                                 try {
244                                         isComplete &= mapSolution(theCatalogId, peerSolution, !localSolutions.contains(peerSolution.getSolutionId()), ctx);
245                                 }
246                                 catch (Throwable t) {
247                                         log.error("Mapping of acumos solution failed for " + peerSolution, t);
248                                 }
249                         }
250                         return isComplete;
251                 }
252
253                 //this should go away once the move to service interface based operations is complete
254                 //as ugly as they come
255                 private ICommonDataServiceRestClient getCDSClient() {
256                         return PeerGateway.this.clients.getCDSClient();
257                 }
258
259                 public Artifact copyArtifact(Artifact peerArtifact) {
260                         return Artifact.buildFrom(peerArtifact)
261                                                                 .withUser(getUserId(this.sub))
262                                                                 .withCreated(TimestampedEntity.ORIGIN)
263                                                                 .withModified(TimestampedEntity.ORIGIN)
264                                                                 .build();
265                 }
266
267                 /* we create a new one as nothing is preserved. assumes matching ids. */
268                 public Artifact copyArtifact(Artifact peerArtifact, Artifact localArtifact) {
269                         return Artifact.buildFrom(peerArtifact)
270                                                                 .withId(localArtifact.getArtifactId())
271                                                                 .withUser(getUserId(this.sub))
272                                                                 .build();
273                 }
274
275                 public Document copyDocument(Document peerDocument) {
276                         return Document.buildFrom(peerDocument)
277                                                                 .withUser(getUserId(this.sub))
278                                                                 .withCreated(TimestampedEntity.ORIGIN)
279                                                                 .withModified(TimestampedEntity.ORIGIN)
280                                                                 .build();
281                 }
282
283                 public Document copyDocument(Document peerDocument, Document localDocument) {
284                         return Document.buildFrom(peerDocument)
285                                                                 .withId(localDocument.getDocumentId())
286                                                                 .withUser(getUserId(this.sub))
287                                                                 .build();
288                 }
289
290                 private MLPRevCatDescription copyRevCatDescription(MLPRevCatDescription peerDescription) {
291                         MLPRevCatDescription localDescription = new MLPRevCatDescription(peerDescription);
292                         localDescription.setCreated(TimestampedEntity.ORIGIN);
293                         localDescription.setModified(TimestampedEntity.ORIGIN);
294                         return localDescription;
295                 }
296
297                 private MLPRevCatDescription copyRevCatDescription(MLPRevCatDescription peerDescription, MLPRevCatDescription localDescription) {
298                         localDescription.setDescription(peerDescription.getDescription());
299                         return localDescription;
300                 }
301
302                 private void putRevCatDescription(MLPRevCatDescription theDescription) throws ServiceException {
303
304                         try {
305                                 if (theDescription.getCreated() == Instant.MIN) {
306                                         getCDSClient().createRevCatDescription(theDescription);
307                                         log.info("Local description created: {}", theDescription);
308                                 }
309                                 else {
310                                         getCDSClient().updateRevCatDescription(theDescription);
311                                 }
312                         }
313                         catch (HttpStatusCodeException restx) {
314                                 log.error("Revision description CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
315                                 throw new ServiceException("Revision description CDS call failed.", restx);
316                         }
317                         catch (Exception x) {
318                                 log.error("Revision description handling unexpected failure", x);
319                                 throw new ServiceException("Revision description handling unexpected failure", x);
320                         }
321                 }
322
323                 private Solution copySolution(Solution thePeerSolution) {
324                         return Solution.buildFrom(thePeerSolution)
325                                                                 .withCreated(TimestampedEntity.ORIGIN)
326                                                                 .withModified(TimestampedEntity.ORIGIN)
327                                                                 .withUser(getUserId(this.sub))
328                                                                 .withSource(this.peer.getPeerId())
329                                                                 .withPicture(thePeerSolution.getPicture())
330                                                                 .resetStats()
331                                                                 .build();
332                 }
333
334                 private Solution copySolution(Solution thePeerSolution, Solution theLocalSolution) {
335                         String newUserId = getUserId(this.sub);
336                         String newSourceId = this.peer.getPeerId();
337
338                         //some basic warnings
339                         if (!theLocalSolution.getUserId().equals(newUserId)) {
340                                 // is this solution being updated as part of different/new subscription?
341                                 log.warn("Updating solution {} triggers a user change", theLocalSolution.getSolutionId());
342                                 //but make the change anyway
343                                 theLocalSolution.setUserId(newUserId);
344                         }
345
346                         if (theLocalSolution.getSourceId() == null) {
347                                 //this is a local solution that made its way back
348                                 log.info("Solution {} was originally provisioned locally, avoiding user update", theLocalSolution.getSolutionId());
349                         }
350                         else {
351                                 if (!theLocalSolution.getSourceId().equals(newSourceId)) {
352                                         // we will see this if a solution is available in more than one peer
353                                         log.warn("Solution {} triggers a source change", theLocalSolution.getSolutionId());
354                                         //but make the change anyway
355                                         theLocalSolution.setSourceId(newSourceId);
356                                 }
357                         }
358
359                         theLocalSolution.setPicture(thePeerSolution.getPicture());
360                         //tags, keep only the delta
361                         Set<MLPTag> tags = thePeerSolution.getTags();
362                         tags.removeAll(theLocalSolution.getTags());
363                         theLocalSolution.setTags(tags);
364
365                         return theLocalSolution;
366                 }
367
368                 private boolean hasChanged(Solution thePeerSolution, Solution theLocalSolution) {
369                         if (!Arrays.equals(theLocalSolution.getPicture(), thePeerSolution.getPicture())) {
370                                 return true;
371                         }
372                         if (!theLocalSolution.getTags().containsAll(thePeerSolution.getTags()))
373                                 return false;
374
375                         return true;
376                 }
377
378                 /**
379                  * Here comes the core process of updating a local solution's related
380                  * information with what is available from a peer.
381                  *
382                  * @param theCatalogId
383                  *            the catalog containing the solution for fetching
384                  *            revision descriptions and documents
385                  * @param theSolution
386                  *            the local solution who's related information (revisions and
387                  *            artifacts) we are trying to sync
388                  * @param addToCatalog
389                  *            true if solution is not yet in local catalog
390                  * @param theContext
391                  *            the context in which we perform the catalog operations
392                  * @return true if mapping was succesful, false otherwise
393                  * @throws Exception
394                  *             any error related to CDS and peer interaction
395                  */
396                 protected boolean mapSolution(String theCatalogId, MLPSolution theSolution, boolean addToCatalog, ServiceContext theContext) throws Exception {
397
398                         boolean isComplete = true;
399                         boolean isSolutionNew = false;
400                         boolean hasSolutionChanged = false;
401
402                         Solution localSolution = null;
403                         Solution peerSolution = null;
404
405                         //retrieve the full representation from the peer
406                         JsonResponse<MLPSolution> peerSolutionResponse = null;
407                         try {
408                                 peerSolutionResponse = peerClient.getSolution(theSolution.getSolutionId());
409                         }
410                         catch (FederationException fx) {
411                                 log.warn("Failed to retrieve peer solution details for " + theSolution, fx);
412                                 return false;
413                         }
414
415                         peerSolution = (Solution)peerSolutionResponse.getContent();
416                         if (peerSolution == null) {
417                                 log.warn("No solution details available for {} in {}", theSolution, peerSolutionResponse);
418                                 return false;
419                         }
420
421                         localSolution = catalog.getSolution(peerSolution.getSolutionId());
422                         if (localSolution == null) {
423                                 localSolution = catalog.putSolution(copySolution(peerSolution), theContext);
424                                 isSolutionNew = true;
425                         }
426                         else {
427                                 hasSolutionChanged = hasChanged(peerSolution, localSolution);
428                         }
429
430                         if (addToCatalog) {
431                                 clients.getCDSClient().addSolutionToCatalog(localSolution.getSolutionId(), theCatalogId);
432                         }
433                         List<MLPSolutionRevision> peerRevisions = peerSolution.getRevisions();
434                         Collections.sort(peerRevisions, (arev, brev) -> arev.getModified().compareTo(brev.getModified()));
435
436                         // this should not happen as any solution should have at least one
437                         // revision (but that's an assumption on how on-boarding works)
438                         if (peerRevisions == null || peerRevisions.isEmpty()) {
439                                 log.warn("No solution revisions were retrieved from the peer");
440                                 return true;
441                         }
442
443                         // check if we have locally the latest revision available on the peer
444                         List<MLPSolutionRevision> catalogRevisions = localSolution.getRevisions();
445                         final List<MLPSolutionRevision> localRevisions = catalogRevisions == null ? Collections.emptyList() : catalogRevisions;
446
447                         // map peer revisions to local ones; new peer revisions have a null mapping
448                         Map<MLPSolutionRevision, MLPSolutionRevision> peerToLocalRevisions =
449                                         new LinkedHashMap<>();
450                         peerRevisions.forEach(peerRevision -> peerToLocalRevisions.put(peerRevision,
451                                         localRevisions.stream()
452                                                         .filter(localRevision -> localRevision.getRevisionId().equals(peerRevision.getRevisionId()))
453                                                         .findFirst().orElse(null)));
454
455                         for (Map.Entry<MLPSolutionRevision, MLPSolutionRevision> revisionEntry : peerToLocalRevisions.entrySet()) {
456                                 MLPSolutionRevision peerRevision = revisionEntry.getKey();
457                                 MLPSolutionRevision localRevision = revisionEntry.getValue();
458                                 boolean isRevisionNew = false;
459
460                                 //revision related information (artifacts/documents/description/..) is now embedded in the revision details
461                                 //federation api call so one call is all is needed
462                                 JsonResponse<MLPSolutionRevision> peerRevisionResponse = null;
463                                 try {
464                                         peerRevisionResponse = peerClient.getSolutionRevision(peerSolution.getSolutionId(), peerRevision.getRevisionId(), theCatalogId);
465                                 }
466                                 catch (FederationException fx) {
467                                         isComplete = false; //try the next revision but mark the overall processing as incomplete
468                                         continue;
469                                 }
470
471                                 peerRevision = peerRevisionResponse.getContent();
472                                 if (peerRevision == null) {
473                                         isComplete = false; //try the next revision but mark the overall processing as incomplete
474                                         continue;
475                                 }
476
477                                 if (localRevision == null) {
478                                         try {
479                                                 localRevision = catalog.putRevision(
480                                                     SolutionRevision.buildFrom(peerRevision)
481                                                         .withCreated(TimestampedEntity.ORIGIN)
482                                                         .withModified(TimestampedEntity.ORIGIN)
483                                                         .withUser(getUserId(this.sub))
484                                                         .withSource(this.peer.getPeerId())
485                                                                                                                                         .build(), theContext);
486                                         }
487                                         catch (ServiceException sx) {
488                                                 log.error("Failed to put revision " + theSolution.getSolutionId() + "/" + peerRevision.getRevisionId() + " into catalog", sx);
489                                                 isComplete = false; //try procecessing the next revision but mark the processing as incomplete
490                                                 continue;
491                                         }
492                                         isRevisionNew = true;
493                                 }
494
495                                 BooleanHolder hasRevChanged = new BooleanHolder(false);
496                                 isComplete &= artifactsHandler.handle(theCatalogId, peerRevision, localRevision, this, hasRevChanged);
497                                 isComplete &= documentsHandler.handle(theCatalogId, peerRevision, localRevision, this, hasRevChanged);
498                                 boolean hasRevisionChanged = hasRevChanged.value;
499
500                                 MLPRevCatDescription localDescription = ((SolutionRevision)localRevision).getRevCatDescription();
501                                 MLPRevCatDescription peerDescription = ((SolutionRevision)peerRevision).getRevCatDescription();
502
503                                 if (peerDescription != null) {
504                                         boolean doCatalog = false;
505
506                                         if (localDescription == null) {
507                                                 localDescription = copyRevCatDescription(peerDescription);
508                                                 doCatalog = true;
509                                         }
510                                         else {
511                                                 //is this a good enough test ?? it implies time sync ..
512                                                 if (peerDescription.getModified().isAfter(localDescription.getModified())) {
513                                                         localDescription = copyRevCatDescription(peerDescription, localDescription);
514                                                         doCatalog = true;
515                                                 }
516                                         }
517
518                                         if (doCatalog) {
519                                                 try {
520                                                         putRevCatDescription(localDescription);
521                                                 }
522                                                 catch (ServiceException sx) {
523                                                         log.error("Description processing failed",      sx);
524                                                         isComplete = false;
525                                                 }
526                                                 hasRevisionChanged = true;
527                                         }
528                                 } //end revision processing
529
530                                 if (!isRevisionNew && hasRevisionChanged) {
531                                         try {
532                                                 //we do not actually update any properties, just give CDS a chance to update the timestamps as to mark it as updated.
533                                                 catalog.putRevision(SolutionRevision.buildFrom(localRevision).build(),
534                                                                                                                                                                 theContext);
535                                         }
536                                         catch (ServiceException sx) {
537                                                 log.error("Failed to update local revision",    sx);
538                                                 isComplete = false;
539                                         }
540                                 }
541
542                                 hasSolutionChanged |= (isRevisionNew || hasRevisionChanged);
543                         } //end revisions processing
544
545                         if (!isSolutionNew && hasSolutionChanged) {
546                                 try {
547                                         catalog.putSolution(copySolution(peerSolution, localSolution), theContext);
548                                 }
549                                 catch (ServiceException sx) {
550                                         log.error("Failed to update local solution",    sx);
551                                         isComplete = false;
552                                 }
553                         }
554
555                         return isComplete;
556                 } // mapSolution
557
558                 public boolean alwaysUpdateCatalog() {
559                         return sub.getSubscriptionOptions().alwaysUpdateCatalog();
560                 }
561
562                 public boolean copyDocumentContent(String solutionId, Document document) {
563                         try (StreamingResource data = peerClient.getDocumentContent(document.getDocumentId())) {
564                                 content.putDocumentContent(solutionId, document, data);
565                                 log.info("Received {} bytes of document content", data.contentLength());
566                                 return true;
567                         } catch (FederationException fedex) {
568                                 log.error("Failed to retrieve acumos document content", fedex);
569                         } catch (ServiceException srvex) {
570                                 log.error("Failed to store document content to local repo", srvex);
571                         } catch (IOException ioex) {
572                                 log.error("Exception receiving document content ", ioex);
573                         }
574                         return false;
575                 }
576
577                 public boolean copyArtifactContent(String solutionId, Artifact artifact) {
578                         try (StreamingResource data = peerClient.getArtifactContent(artifact.getArtifactId())) {
579                                 content.putArtifactContent(solutionId, artifact, data);
580                                 log.info("Received {} bytes of artifact content", data.contentLength());
581                                 return true;
582                         } catch (FederationException fedex) {
583                                 log.error("Failed to retrieve acumos artifact content", fedex);
584                         } catch (ServiceException srvex) {
585                                 log.error("Failed to store artifact content to local repo", srvex);
586                         } catch (IOException ioex) {
587                                 log.error("Exception receiving artifact content ", ioex);
588                         }
589                         return false;
590                 }
591         }
592
        /*
         * The process for federating Artifacts and Documents is the same,
         * with just different method names and, in a few cases, method
         * arguments.  These interfaces and the ItemsHandler class are about
         * factoring out that duplicated code.
         */
599
600         @FunctionalInterface
601         private interface TriFunction<T, U, V, R> {
602                 R apply(T t, U u, V v);
603         }
604
605         @FunctionalInterface
606         private interface TriPredicate<T, U, V> {
607                 boolean test(T t, U u, V v);
608         }
609
610         @FunctionalInterface
611         private interface QuadConsumer<T, U, V, W> {
612                 void accept(T t, U u, V v, W w);
613         }
614
615         private static class ItemsHandler<T>    {
616                 // get list of items to process
617                 private Function<SolutionRevision, List<T>> getList;
618                 // get ID of an item
619                 private Function<T, String> getId;
620                 // get item from local data store
621                 private BiFunction<ICommonDataServiceRestClient, String, T> localGet;
622                 // link item to revision/catalog
623                 private QuadConsumer<ICommonDataServiceRestClient, MLPSolutionRevision, String, String> link;
624                 // copy item from peer
625                 private BiFunction<PeerGatewayUpdateTask, T, T> copy;
626                 // check if peer item updates local item
627                 private BiPredicate<T, T> changed;
628                 // merge peer and local items
629                 private TriFunction<PeerGatewayUpdateTask, T, T, T> merge;
630                 // transfer content
631                 private TriPredicate<PeerGatewayUpdateTask, String, T> copybody;
632                 // create item
633                 private BiConsumer<ICommonDataServiceRestClient, T> create;
634                 // update item
635                 private BiConsumer<ICommonDataServiceRestClient, T> update;
636
637                 public ItemsHandler(
638                     Function<SolutionRevision, List<T>> getList,
639                     Function<T, String> getId,
640                     BiFunction<ICommonDataServiceRestClient, String, T> localGet,
641                     QuadConsumer<ICommonDataServiceRestClient, MLPSolutionRevision, String, String> link,
642                     BiFunction<PeerGatewayUpdateTask, T, T> copy,
643                     BiPredicate<T, T> changed,
644                     TriFunction<PeerGatewayUpdateTask, T, T, T> merge,
645                     TriPredicate<PeerGatewayUpdateTask, String, T> copybody,
646                     BiConsumer<ICommonDataServiceRestClient, T> create,
647                     BiConsumer<ICommonDataServiceRestClient, T> update) {
648                         this.getList = getList;
649                         this.getId = getId;
650                         this.localGet = localGet;
651                         this.link = link;
652                         this.copy = copy;
653                         this.changed = changed;
654                         this.merge = merge;
655                         this.copybody = copybody;
656                         this.create = create;
657                         this.update = update;
658                 }
659
660                 public boolean handle(String catalogId, MLPSolutionRevision peer, MLPSolutionRevision local, PeerGatewayUpdateTask task, BooleanHolder updated) {
661                         ICommonDataServiceRestClient cdsClient = task.getCDSClient();
662                         HashMap<String, T> localItems = new HashMap();
663                         for (T item: getList.apply((SolutionRevision)local)) {
664                                 localItems.put(getId.apply(item), item);
665                         }
666                         boolean success = true;
667                         for (T peerItem: getList.apply((SolutionRevision)peer)) {
668                                 String itemId = getId.apply(peerItem);
669                                 T localItem = localItems.get(itemId);
670                                 log.info("Processing peer item {} against local item {}", peerItem, localItem);
671                                 if (localItem == null) {
672                                         localItem = localGet.apply(cdsClient, itemId);
673                                         if (localItem != null) {
674                                                 link.accept(cdsClient, local, catalogId, itemId);
675                                         }
676                                 }
677                                 boolean isNew = (localItem == null);
678                                 if (isNew) {
679                                         localItem = copy.apply(task, peerItem);
680                                 } else if (changed.test(peerItem, localItem)) {
681                                         localItem = merge.apply(task, peerItem, localItem);
682                                 } else {
683                                         continue;
684                                 }
685                                 if (!copybody.test(task, local.getSolutionId(), localItem)) {
686                                         success = false;
687                                         if (!task.alwaysUpdateCatalog()) {
688                                                 continue;
689                                         }
690                                 }
691                                 if (isNew) {
692                                         create.accept(cdsClient, localItem);
693                                         link.accept(cdsClient, local, catalogId, itemId);
694                                 } else {
695                                         update.accept(cdsClient, localItem);
696                                 }
697                                 updated.value = true;
698                         }
699                         return(success);
700                 }
701         }
702
703         private static ItemsHandler<MLPDocument> documentsHandler = new ItemsHandler<MLPDocument>(
704             rev -> rev.getDocuments(),
705             doc -> doc.getDocumentId(),
706             (client, id) -> client.getDocument(id),
707             (client, rev, catid, itemid) -> client.addRevisionCatalogDocument(rev.getRevisionId(), catid, itemid),
708             (task, doc) -> task.copyDocument((Document)doc),
709             (peerdoc, localdoc) -> PeerGateway.hasChanged((Document)peerdoc, (Document)localdoc),
710             (task, peerdoc, localdoc) -> task.copyDocument((Document)peerdoc, (Document)localdoc),
711             (task, solid, doc) -> task.copyDocumentContent(solid, (Document)doc),
712             (client, doc) -> client.createDocument(doc),
713             (client, doc) -> client.updateDocument(doc));
714         private static ItemsHandler<MLPArtifact> artifactsHandler = new ItemsHandler<MLPArtifact>(
715             rev -> rev.getArtifacts(),
716             art -> art.getArtifactId(),
717             (client, id) -> client.getArtifact(id),
718             (client, rev, catid, itemid) -> client.addSolutionRevisionArtifact(rev.getSolutionId(), rev.getRevisionId(), itemid),
719             (task, art) -> task.copyArtifact((Artifact)art),
720             (peerart, localart) -> PeerGateway.hasChanged((Artifact)peerart, (Artifact)localart),
721             (task, peerart, localart) -> task.copyArtifact((Artifact)peerart, (Artifact)localart),
722             (task, solid, art) -> task.copyArtifactContent(solid, (Artifact)art),
723             (client, art) -> client.createArtifact(art),
724             (client, art) -> client.updateArtifact(art));
725
726         public static boolean hasChanged(Document thePeerDoc, Document theLocalDoc) {
727                 if (thePeerDoc.getVersion() != null && theLocalDoc.getVersion() != null) {
728                         return !thePeerDoc.getVersion().equals(theLocalDoc.getVersion());
729                 }
730
731                 if (thePeerDoc.getSize() != null && theLocalDoc.getSize() != null) {
732                         return !thePeerDoc.getSize().equals(theLocalDoc.getSize());
733                 }
734
735                 return true;
736         }
737
738         public static boolean hasChanged(Artifact thePeerArtifact, Artifact theLocalArtifact) {
739                 if (thePeerArtifact.getVersion() != null && theLocalArtifact.getVersion() != null) {
740                         return !thePeerArtifact.getVersion().equals(theLocalArtifact.getVersion());
741                 }
742
743                 if (thePeerArtifact.getSize() != null && theLocalArtifact.getSize() != null) {
744                         return !thePeerArtifact.getSize().equals(theLocalArtifact.getSize());
745                 }
746
747                 return true;
748         }
749 }