4bb64448ef036c1212ff684dc0c4a0ee1278f59c
[federation.git] / gateway / src / main / java / org / acumos / federation / gateway / adapter / PeerGateway.java
1 /*-
2  * ===============LICENSE_START=======================================================
3  * Acumos
4  * ===================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6  * ===================================================================================
7  * This Acumos software file is distributed by AT&T and Tech Mahindra
8  * under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *  
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  * This file is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ===============LICENSE_END=========================================================
19  */
20
21 package org.acumos.federation.gateway.adapter;
22
23 import java.io.Closeable;
24 import java.lang.invoke.MethodHandles;
25 import java.time.Instant;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33
34 import javax.annotation.PostConstruct;
35 import javax.annotation.PreDestroy;
36
37 import org.acumos.cds.AccessTypeCode;
38 import org.acumos.cds.client.ICommonDataServiceRestClient;
39 import org.acumos.cds.domain.MLPPeer;
40 import org.acumos.cds.domain.MLPPeerSubscription;
41 import org.acumos.cds.domain.MLPRevisionDescription;
42 import org.acumos.cds.domain.MLPSolution;
43 import org.acumos.cds.domain.MLPSolutionRevision;
44 import org.acumos.cds.domain.MLPTag;
45 import org.acumos.federation.gateway.cds.Artifact;
46 import org.acumos.federation.gateway.cds.Document;
47 import org.acumos.federation.gateway.cds.PeerSubscription;
48 import org.acumos.federation.gateway.cds.Solution;
49 import org.acumos.federation.gateway.cds.SolutionRevision;
50 import org.acumos.federation.gateway.cds.SubscriptionScope;
51 import org.acumos.federation.gateway.cds.TimestampedEntity;
52 import org.acumos.federation.gateway.common.Clients;
53 import org.acumos.federation.gateway.common.FederationClient;
54 import org.acumos.federation.gateway.common.FederationException;
55 import org.acumos.federation.gateway.common.JsonResponse;
56 import org.acumos.federation.gateway.config.GatewayCondition;
57 import org.acumos.federation.gateway.event.PeerSubscriptionEvent;
58 import org.acumos.federation.gateway.service.CatalogService;
59 import org.acumos.federation.gateway.service.ContentService;
60 import org.acumos.federation.gateway.service.PeerSubscriptionService;
61 import org.acumos.federation.gateway.service.ServiceContext;
62 import org.acumos.federation.gateway.service.ServiceException;
63 import org.acumos.federation.gateway.util.Utils;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66 import org.springframework.beans.factory.BeanInitializationException;
67 import org.springframework.beans.factory.annotation.Autowired;
68 import org.springframework.beans.factory.annotation.Qualifier;
69 import org.springframework.context.annotation.Conditional;
70 import org.springframework.context.annotation.Scope;
71 import org.springframework.context.event.EventListener;
72 import org.springframework.core.env.Environment;
73 import org.springframework.core.io.Resource;
74 import org.springframework.core.task.TaskExecutor;
75 import org.springframework.stereotype.Component;
76 import org.springframework.web.client.HttpStatusCodeException;
77
78
79 @Component("peergateway")
80 @Scope("singleton")
81 @Conditional({GatewayCondition.class})
82 public class PeerGateway {
83
84         private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
85         @Autowired
86         @Qualifier("acumos")
87         private TaskExecutor taskExecutor;
88         @Autowired
89         private Environment env;
90         @Autowired
91         private Clients clients;
92         @Autowired
93         private ContentService content;
94         @Autowired
95         private CatalogService catalog;
96         @Autowired
97         private PeerSubscriptionService peerSubscriptionService;
98
99         private static final String federationDotOperator = "federation.operator";
100
101         public PeerGateway() {
102                 log.trace("PeerGateway::new");
103         }
104
105         @PostConstruct
106         public void initGateway() {
107                 log.trace("initPeerGateway");
108
109                 /* make sure an operator was specified and that it is a declared user */
110                 if (null == this.env.getProperty(federationDotOperator)) {
111                         throw new BeanInitializationException("Missing configuration key " + federationDotOperator);
112                 } 
113                 else {
114                         try {
115                                 if (null == this.clients.getCDSClient().getUser(this.env.getProperty(federationDotOperator))) {
116                                         log.warn(federationDotOperator + 
117                                                         " does not point to an existing user");
118                                 }
119                         }
120                         catch (/* HttpStatusCode */Exception dx) {
121                                 log.warn("failed to verify value " + federationDotOperator, dx);
122                         }
123                 }
124
125                 // Done
126                 log.debug("PeerGateway available");
127         }
128
129         @PreDestroy
130         public void cleanupGateway() {
131                 log.debug("PeerGateway destroyed");
132         }
133
134         protected String getUserId(MLPPeerSubscription theSubscription/*
135                                                                                                                                          * , MLPSolution theSolution
136                                                                                                                                          */) {
137                 String userId = theSubscription.getUserId();
138                 return userId != null ? userId : this.env.getProperty(federationDotOperator);
139         }
140
141         @EventListener
142         public void handlePeerSubscriptionUpdate(PeerSubscriptionEvent theEvent) {
143                 log.info("received peer subscription update event {}", theEvent);
144                 taskExecutor.execute(
145                                 new PeerGatewayUpdateTask(theEvent.getPeer(), theEvent.getSubscription()));
146         }
147
148         /**
149          * The list of solutions processed here represents the solutions (with respect
150          * to the subscription filter definition) that were reported by the peer as
151          * being updated since the last check.
152          */
153         public class PeerGatewayUpdateTask implements Runnable {
154
155                 private MLPPeer peer;
156                 private PeerSubscription sub;
157
158                 public PeerGatewayUpdateTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
159                         this.peer = thePeer;
160                         this.sub = new PeerSubscription(theSub);
161                 }
162
163                 public void run() {
164
165                         Map selector = null;
166                         try {
167                                 selector = Utils.jsonStringToMap(this.sub.getSelector());
168                         }
169                         catch(Exception x) {
170                                 log.error("Failed to parse selector for subscription {}", this.sub);
171                                 return;
172                         }
173                         Instant lastProcessed = this.sub.getProcessed();
174                         if (lastProcessed != null) {
175                                 selector.put("modified", lastProcessed.getEpochSecond());
176                         }
177                         lastProcessed = Instant.now();
178                         
179                         FederationClient peerClient = clients.getFederationClient(this.peer.getApiUrl());
180                         if (peerClient == null) {
181                                 log.error("Failed to get client for peer {}", this.peer);
182                                 return;
183                         }
184
185                         JsonResponse<List<MLPSolution>> peerSolutionsResponse = null;
186                         try {
187                                 peerSolutionsResponse = peerClient.getSolutions(selector);
188                         }
189                         catch (FederationException fx) {
190                                 log.info("Processing peer " + this.peer + " subscription " + this.sub.getSubId() + " error.", fx);
191                                 return;
192                         }
193
194                         List<MLPSolution> peerSolutions = peerSolutionsResponse.getContent();
195                         log.info("Processing peer {} subscription {}, {} yielded solutions {}", this.peer, this.sub.getSubId(), selector, peerSolutions);
196                         if (peerSolutions == null) {
197                                 log.warn("No solutions available for peer {} subscription {} in {}", this.peer, this.sub.getSubId(), peerSolutionsResponse);
198                                 peerSolutions = Collections.EMPTY_LIST;
199                                 //and let it proceed so we end up marking it as processed
200                         }
201
202                         ServiceContext ctx = catalog.selfService();
203                         boolean isComplete = true;
204
205                         for (MLPSolution peerSolution : peerSolutions) {
206                                 log.info("Processing peer solution {}", peerSolution);
207
208                                 try {
209                                         isComplete &= mapSolution(peerSolution, peerClient, ctx);
210                                 }
211                                 catch (Throwable t) {
212                                         log.error("Mapping of acumos solution failed for " + peerSolution, t);
213                                 }
214                         }
215                                         
216                         log.info("Processing of subscription {} completed succesfully: {}", this.sub, isComplete);
217                         //only commit the last processed date if we completed succesfully
218                         if (isComplete) {
219                                 try {
220                                         this.sub.setProcessed(lastProcessed);
221                                         peerSubscriptionService.updatePeerSubscription(this.sub);
222                                 }
223                                 catch (ServiceException sx) {
224                                         log.error("Failed to update subscription information", sx);
225                                 }
226                         }
227                 }
228
229                 //this should go away once the move to service interface based operations is complete
230                 //as ugly as they come
231                 private ICommonDataServiceRestClient getCDSClient(ServiceContext theContext) {
232                         return PeerGateway.this.clients.getCDSClient();
233                 }
234
235                 private Artifact copyArtifact(Artifact peerArtifact) {
236                         return Artifact.buildFrom(peerArtifact)
237                                                                 .withUser(getUserId(this.sub))
238                                                                 .withCreated(TimestampedEntity.ORIGIN)
239                                                                 .withModified(TimestampedEntity.ORIGIN)
240                                                                 .build();
241                 }
242
243                 /* we create a new one as nothing is preserved. assumes matching ids. */
244                 private Artifact copyArtifact(Artifact peerArtifact, Artifact localArtifact) {
245                         return Artifact.buildFrom(peerArtifact)
246                                                                 .withId(localArtifact.getArtifactId())
247                                                                 .withUser(getUserId(this.sub))
248                                                                 .build();
249                 }
250
251                 private void putArtifact(String theSolutionId, String theRevisionId, Artifact theArtifact,
252                                 ServiceContext theContext) throws ServiceException {
253
254                         assert(getCDSClient(theContext) != null);
255
256                         try {
257                                 if (theArtifact.getCreated() == Instant.MIN) {
258                                         getCDSClient(theContext).createArtifact(theArtifact);
259                                         getCDSClient(theContext).addSolutionRevisionArtifact(theSolutionId, theRevisionId, theArtifact.getArtifactId());
260                                         log.info("Local artifact created: {}", theArtifact);
261                                 }
262                                 else {
263                                         getCDSClient(theContext).updateArtifact(theArtifact);
264                                         log.info("Local artifact updated: {}", theArtifact);
265                                 }
266         
267                         }
268                         catch (HttpStatusCodeException restx) {
269                                 log.error("Artifact CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
270                                 throw new ServiceException("Artifact CDS call failed.", restx);
271                         }
272                         catch (Exception x) {
273                                 log.error("Artifact unexpected failure", x);
274                                 throw new ServiceException("Artifact CDS call failed.", x);
275                         }
276                 }
277
278                 private Document copyDocument(Document peerDocument) {
279                         return Document.buildFrom(peerDocument)
280                                                                 .withUser(getUserId(this.sub))
281                                                                 .withCreated(TimestampedEntity.ORIGIN)
282                                                                 .withModified(TimestampedEntity.ORIGIN)
283                                                                 .build();
284                 }
285
286                 private Document copyDocument(Document peerDocument, Document localDocument) {
287                         return Document.buildFrom(peerDocument)
288                                                                 .withId(localDocument.getDocumentId())
289                                                                 .withUser(getUserId(this.sub))
290                                                                 .build();
291                 }
292
293                 private void putDocument(String theSolutionId, String theRevisionId, Document theDocument,
294                                 ServiceContext theContext) throws ServiceException {
295
296                         try {
297                                 if (theDocument.getCreated() == Instant.MIN) {
298                                         getCDSClient(theContext).createDocument(theDocument);
299                                         getCDSClient(theContext).addSolutionRevisionDocument(theRevisionId, AccessTypeCode.PB.name(), theDocument.getDocumentId());
300                                         log.info("Local document created: {}", theDocument);
301                                 }
302                                 else {
303                                         getCDSClient(theContext).updateDocument(theDocument);
304                                         log.info("Local document updated: {}", theDocument);
305                                 }
306                         }
307                         catch (HttpStatusCodeException restx) {
308                                 log.error("Document CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
309                                 throw new ServiceException("Document CDS call failed.", restx);
310                         }
311                         catch (Exception x) {
312                                 log.error("Document handling unexpected failure", x);
313                                 throw new ServiceException("Document handling unexpected failure", x);
314                         }
315                 }
316         
317                 private MLPRevisionDescription copyRevisionDescription(MLPRevisionDescription peerDescription) {
318                         MLPRevisionDescription localDescription = new MLPRevisionDescription(peerDescription);
319                         localDescription.setCreated(TimestampedEntity.ORIGIN);
320                         localDescription.setModified(TimestampedEntity.ORIGIN);
321                         return localDescription;
322                 }
323
324                 private MLPRevisionDescription copyRevisionDescription(MLPRevisionDescription peerDescription, MLPRevisionDescription localDescription) {
325                         localDescription.setDescription(peerDescription.getDescription());
326                         return localDescription;
327                 }
328
329                 private void putRevisionDescription(MLPRevisionDescription theDescription,ServiceContext theContext) throws ServiceException {
330                         
331                         try {
332                                 if (theDescription.getCreated() == Instant.MIN) {
333                                         getCDSClient(theContext).createRevisionDescription(theDescription);
334                                         log.info("Local description created: {}", theDescription);
335                                 }
336                                 else {
337                                         getCDSClient(theContext).updateRevisionDescription(theDescription);
338                                 }
339                         }
340                         catch (HttpStatusCodeException restx) {
341                                 log.error("Revision description CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
342                                 throw new ServiceException("Revision description CDS call failed.", restx);
343                         }
344                         catch (Exception x) {
345                                 log.error("Revision description handling unexpected failure", x);
346                                 throw new ServiceException("Revision description handling unexpected failure", x);
347                         }
348                 }
349
350                 private boolean hasChanged(Artifact thePeerArtifact, Artifact theLocalArtifact) {
351                         if (thePeerArtifact.getVersion() != null && theLocalArtifact.getVersion() != null) { 
352                                 return !thePeerArtifact.getVersion().equals(theLocalArtifact.getVersion());
353                         }
354
355                         if (thePeerArtifact.getSize() != null && theLocalArtifact.getSize() != null) { 
356                                 return !thePeerArtifact.getSize().equals(theLocalArtifact.getSize());
357                         }
358
359                         return true;
360                 }       
361
362                 private boolean hasChanged(Document thePeerDoc, Document theLocalDoc) {
363                         if (thePeerDoc.getVersion() != null && theLocalDoc.getVersion() != null) { 
364                                 return !thePeerDoc.getVersion().equals(theLocalDoc.getVersion());
365                         }
366
367                         if (thePeerDoc.getSize() != null && theLocalDoc.getSize() != null) { 
368                                 return !thePeerDoc.getSize().equals(theLocalDoc.getSize());
369                         }
370
371                         return true;
372                 }
373
374                 private Solution copySolution(Solution thePeerSolution) {
375                         return Solution.buildFrom(thePeerSolution)
376                                                                 .withCreated(TimestampedEntity.ORIGIN)
377                                                                 .withModified(TimestampedEntity.ORIGIN)
378                                                                 .withUser(getUserId(this.sub))
379                                                                 .withSource(this.peer.getPeerId())
380                                                                 .withPicture(thePeerSolution.getPicture())
381                                                                 .resetStats()
382                                                                 .build();
383                 }
384         
385                 private Solution copySolution(Solution thePeerSolution, Solution theLocalSolution) {    
386                         String newUserId = getUserId(this.sub),
387                                                  newSourceId = this.peer.getPeerId();
388
389                         //some basic warnings
390                         if (!theLocalSolution.getUserId().equals(newUserId)) {
391                                 // is this solution being updated as part of different/new subscription?
392                                 log.warn("Updating solution {} triggers a user change", theLocalSolution.getSolutionId());
393                                 //but make the change anyway
394                                 theLocalSolution.setUserId(newUserId);
395                         }
396
397                         if (theLocalSolution.getSourceId() == null) {
398                                 //this is a local solution that made its way back
399                                 log.info("Solution {} was originally provisioned locally, avoiding user update", theLocalSolution.getSolutionId());
400                         }
401                         else {
402                                 if (!theLocalSolution.getSourceId().equals(newSourceId)) {
403                                         // we will see this if a solution is available in more than one peer
404                                         log.warn("Solution {} triggers a source change", theLocalSolution.getSolutionId());
405                                         //but make the change anyway
406                                         theLocalSolution.setSourceId(newSourceId);
407                                 }
408                         }
409
410                         theLocalSolution.setPicture(thePeerSolution.getPicture());
411                         //tags, keep only the delta
412                         Set<MLPTag> tags = thePeerSolution.getTags();
413                         tags.removeAll(theLocalSolution.getTags());
414                         theLocalSolution.setTags(tags);                 
415
416                         return theLocalSolution;
417                 }
418
419                 private boolean hasChanged(Solution thePeerSolution, Solution theLocalSolution) {
420                         if (!Arrays.equals(theLocalSolution.getPicture(), thePeerSolution.getPicture())) {
421                                 return true;
422                         }
423                         if (!theLocalSolution.getTags().containsAll(thePeerSolution.getTags()))
424                                 return false;
425
426                         return true;
427                 }
428
429                 /**
430                  * Here comes the core process of updating a local solution's related
431                  * information with what is available from a peer.
432                  * 
433                  * @param theSolution
434                  *            the local solution who's related information (revisions and
435                  *            artifacts) we are trying to sync
436                  * @param thePeerClient
437                  *            client
438                  * @param theContext
439                  *            the context in which we perform the catalog operations
440                  * @return true if mapping was succesful, false otherwise
441                  * @throws Exception
442                  *             any error related to CDS and peer interaction
443                  */
444                 protected boolean mapSolution(MLPSolution theSolution, FederationClient thePeerClient, ServiceContext theContext) throws Exception {
445
446                         boolean isComplete = true,
447                                                         isSolutionNew = false,
448                                                         hasSolutionChanged = false;
449
450                         Solution localSolution = null,
451                                                          peerSolution = null;
452
453                         //retrieve the full representation from the peer
454                         JsonResponse<MLPSolution> peerSolutionResponse = null;
455                         try {
456                                 peerSolutionResponse = thePeerClient.getSolution(theSolution.getSolutionId());
457                         }
458                         catch (FederationException fx) {
459                                 log.warn("Failed to retrieve peer solution details for " + theSolution, fx);
460                                 return false;
461                         }
462
463                         peerSolution = (Solution)peerSolutionResponse.getContent();
464                         if (peerSolution == null) {
465                                 log.warn("No solution details available for {} in {}", theSolution, peerSolutionResponse);
466                                 return false;
467                         }
468
469                         localSolution = catalog.getSolution(peerSolution.getSolutionId());
470                         if (localSolution == null) {
471                                 localSolution = catalog.putSolution(copySolution(peerSolution), theContext);
472                         isSolutionNew = true;
473                         }
474                         else {
475                                 hasSolutionChanged = hasChanged(peerSolution, localSolution);
476                         }
477                         
478                         List<MLPSolutionRevision> peerRevisions = (List)peerSolution.getRevisions();
479                         Collections.sort(peerRevisions, (arev, brev) -> arev.getModified().compareTo(brev.getModified()));
480
481                         // this should not happen as any solution should have at least one
482                         // revision (but that's an assumption on how on-boarding works)
483                         if (peerRevisions == null || peerRevisions.size() == 0) {
484                                 log.warn("No peer revisions were retrieved");
485                                 return true;
486                         }
487
488                         // check if we have locally the latest revision available on the peer
489                         List<MLPSolutionRevision> catalogRevisions = (List)localSolution.getRevisions();
490                         final List<MLPSolutionRevision> localRevisions = catalogRevisions == null ? Collections.EMPTY_LIST : catalogRevisions;
491
492                         // map peer revisions to local ones; new peer revisions have a null mapping
493                         Map<MLPSolutionRevision, MLPSolutionRevision> peerToLocalRevisions =
494                                         new LinkedHashMap<MLPSolutionRevision, MLPSolutionRevision>();
495                         peerRevisions.forEach(peerRevision -> peerToLocalRevisions.put(peerRevision,
496                                         localRevisions.stream()
497                                                         .filter(localRevision -> localRevision.getRevisionId().equals(peerRevision.getRevisionId()))
498                                                         .findFirst().orElse(null)));
499
500                         for (Map.Entry<MLPSolutionRevision, MLPSolutionRevision> revisionEntry : peerToLocalRevisions.entrySet()) {
501                                 MLPSolutionRevision peerRevision = revisionEntry.getKey(), localRevision = revisionEntry.getValue();
502
503                                 boolean isRevisionNew = false,
504                                                                 hasRevisionChanged = false;
505
506                                 //revision related information (artifacts/documents/description/..) is now embedded in the revision details
507                                 //federation api call so one call is all is needed
508                                 JsonResponse<MLPSolutionRevision> peerRevisionResponse = null;
509                                 try {
510                                         peerRevisionResponse = thePeerClient.getSolutionRevision(peerSolution.getSolutionId(), peerRevision.getRevisionId());
511                                 }
512                                 catch (FederationException fx) {
513                                         isComplete = false; //try the next revision but mark the overall processing as incomplete
514                                         continue;
515                                 }
516
517                                 peerRevision = peerRevisionResponse.getContent();
518                                 if (peerRevision == null) {
519                                         isComplete = false; //try the next revision but mark the overall processing as incomplete
520                                         continue;
521                                 }
522
523                                 if (localRevision == null) {
524                                         try {
525                                                 localRevision = catalog.putSolutionRevision(
526                                                                                                                                                         SolutionRevision.buildFrom(peerRevision)
527                                                                                                                                                                 .withCreated(TimestampedEntity.ORIGIN)
528                                                                                                                                                                 .withModified(TimestampedEntity.ORIGIN)
529                                                                                                                                                                 .withUser(getUserId(this.sub))
530                                                                                                                                                                 .withSource(this.peer.getPeerId())
531                                                                                                                                                                 .withAccessTypeCode(this.sub.getAccessType())
532                                                                                                                                                                 .build(), theContext);
533                                         }
534                                         catch (ServiceException sx) {
535                                                 log.error("Failed to put revision " + theSolution.getSolutionId() + "/" + peerRevision.getRevisionId() + " into catalog", sx);
536                                                 isComplete = false; //try procecessing the next revision but mark the processing as incomplete
537                                                 continue;
538                                         }
539                                         isRevisionNew = true;
540                                 }
541
542                                 List<Artifact> peerArtifacts = (List)((SolutionRevision)peerRevision).getArtifacts();
543                                 List<Document> peerDocuments = (List)((SolutionRevision)peerRevision).getDocuments();
544
545                                 List<Artifact> catalogArtifacts = (List)((SolutionRevision)localRevision).getArtifacts();
546                                 List<Document> catalogDocuments = (List)((SolutionRevision)localRevision).getDocuments();
547
548                                 final List<Artifact> localArtifacts = catalogArtifacts;
549                                 // map the artifacts
550                                 // TODO: track deleted artifacts
551                                 Map<Artifact, Artifact> peerToLocalArtifacts = new HashMap<Artifact, Artifact>();
552                                 peerArtifacts.forEach(peerArtifact -> peerToLocalArtifacts.put(peerArtifact, localArtifacts.stream()
553                                                 .filter(localArtifact -> localArtifact.getArtifactId().equals(peerArtifact.getArtifactId()))
554                                                 .findFirst().orElse(null)));
555
556                                 for (Map.Entry<Artifact, Artifact> artifactEntry : peerToLocalArtifacts.entrySet()) {
557                                         Artifact peerArtifact = artifactEntry.getKey(),
558                                                                          localArtifact = artifactEntry.getValue();
559                                         boolean doCatalog = false;
560                                         
561                                         log.info("Processing peer artifact {} against local artifact {}", peerArtifact, localArtifact);
562
563                                         if (localArtifact == null) {
564                                                 localArtifact = copyArtifact(peerArtifact);
565                                                 doCatalog = true;
566                                         }
567                                         else {
568                                                 if (hasChanged(peerArtifact, localArtifact)) {
569                                                         // update local artifact
570                                                         localArtifact = copyArtifact(peerArtifact, localArtifact);
571                                                         doCatalog = true;
572                                                 }
573                                         }
574
575                                         boolean doContent = doCatalog &&
576                                                                                                                         (peerArtifact.getUri() != null) &&
577                                                                                                                         (SubscriptionScope.Full == SubscriptionScope.forCode(this.sub.getScopeType()));
578                                         if (doContent) {
579                                                 log.info("Processing content for artifact {}", peerArtifact); 
580                                                 // TODO: we are trying to access the artifact by its identifier which
581                                                 // is fine in the common case but the uri specified in the artifact
582                                                 // data is the right approach (as it does not rely on the E5 definition).
583                                                 Resource artifactContent = null;
584                                                 try {
585                                                         artifactContent = thePeerClient.getArtifactContent(
586                                                                 peerSolution.getSolutionId(), peerRevision.getRevisionId(), peerArtifact.getArtifactId());
587                                                         log.info("Received {} bytes of artifact content", artifactContent.contentLength()); 
588                                                 }
589                                                 catch (FederationException x) {
590                                                         log.error("Failed to retrieve acumos artifact content", x);
591                                                         doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
592                                                         isComplete = false;
593                                                 }
594
595                                                 if (artifactContent != null) {
596                                                         try {
597                                                                 content.putArtifactContent(
598                                                                         localSolution.getSolutionId(), localRevision.getRevisionId(), localArtifact, artifactContent);
599                                                                 doCatalog = true;
600                                                         }
601                                                         catch (ServiceException sx) {
602                                                                 log.error("Failed to store artifact content to local repo", sx);
603                                                                 doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
604                                                                 isComplete = false;
605                                                         }
606                                                         finally {
607                                                                 if (artifactContent instanceof Closeable) {
608                                                                         ((Closeable)artifactContent).close();
609                                                                 }
610                                                         }
611                                                 }
612                                         }
613
614                                         if (doCatalog) {
615                                                 try {
616                                                         putArtifact(localSolution.getSolutionId(), localRevision.getRevisionId(), localArtifact, theContext);
617                                                 }
618                                                 catch (ServiceException sx) {
619                                                         log.error("Artifact processing failed.", sx);
620                                                         isComplete = false;
621                                                 }
622                                                 hasRevisionChanged = true;
623                                         }
624                                 } //end map artifacts loop
625
626
627                                 final List<Document> localDocuments = catalogDocuments;
628                                 // map the documents
629                                 // TODO: track deleted documents
630                                 Map<Document, Document> peerToLocalDocuments = new HashMap<Document, Document>();
631                                 peerDocuments.forEach(peerDocument -> peerToLocalDocuments.put(peerDocument, localDocuments.stream()
632                                                 .filter(localDocument -> localDocument.getDocumentId().equals(peerDocument.getDocumentId()))
633                                                 .findFirst().orElse(null)));
634
635                                 for (Map.Entry<Document, Document> documentEntry : peerToLocalDocuments.entrySet()) {
636                                         Document peerDocument = documentEntry.getKey(),
637                                                                          localDocument = documentEntry.getValue();
638                                         boolean doCatalog = false;
639
640                                         log.info("Processing peer document {} against local version {}", peerDocument, localDocument);
641                                         if (localDocument == null) {
642                                                 localDocument = copyDocument(peerDocument);
643                                                 doCatalog = true;
644                                         }
645                                         else {
646                                                 //version strings are not standard so comparing them is not necessarly safe
647                                                 if (hasChanged(peerDocument, localDocument)) {
648                                                         // update local doc
649                                                         localDocument = copyDocument(peerDocument, localDocument);
650                                                         doCatalog = true;
651                                                 }
652                                         }
653
654                                         boolean doContent = doCatalog &&
655                                                                                                                         (peerDocument.getUri() != null) &&
656                                                                                                                         (SubscriptionScope.Full == SubscriptionScope.forCode(this.sub.getScopeType()));
657                                         if (doContent) {
658                                                 log.info("Processing content for document {}", peerDocument); 
659                                                 // TODO: we are trying to access the document by its identifier which
660                                                 // is fine in the common case but the uri specified in the document
661                                                 // data is a more flexible approach.
662                                                 Resource documentContent = null;
663                                                 try {
664                                                         documentContent = thePeerClient.getDocumentContent(
665                                                                 peerSolution.getSolutionId(), peerRevision.getRevisionId(), peerDocument.getDocumentId());
666                                                         log.info("Received {} bytes of document content", documentContent.contentLength()); 
667                                                 }
668                                                 catch (FederationException x) {
669                                                         log.error("Failed to retrieve acumos document content", x);
670                                                         doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
671                                                         isComplete = false;
672                                                 }
673
674                                                 if (documentContent != null) {
675                                                         try {
676                                                                 content.putDocumentContent(
677                                                                         localSolution.getSolutionId(), localRevision.getRevisionId(), localDocument, documentContent);
678                                                                 doCatalog = true;
679                                                         }
680                                                         catch (ServiceException sx) {
681                                                                 log.error("Failed to store document content to local repo", sx);
682                                                                 doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
683                                                                 isComplete = false;
684                                                         }
685                                                 }
686                                         }
687
688                                         if (doCatalog) {
689                                                 try {
690                                                         putDocument(localSolution.getSolutionId(), localRevision.getRevisionId(), localDocument, theContext);
691                                                 }
692                                                 catch (ServiceException sx) {
693                                                         log.error("Document processing failed", sx);
694                                                         isComplete = false;
695                                                 }
696                                                 hasRevisionChanged = true;
697                                         }
698         
699                                 } // end map documents loop
700                                 
701                                 MLPRevisionDescription localDescription = ((SolutionRevision)localRevision).getRevisionDescription();
702                                 MLPRevisionDescription peerDescription = ((SolutionRevision)peerRevision).getRevisionDescription();
703
704                                 if (peerDescription != null) {
705                                         boolean doCatalog = false;
706
707                                         if (localDescription == null) {
708                                                 localDescription = copyRevisionDescription(peerDescription);
709                                                 doCatalog = true;
710                                         }
711                                         else {
712                                                 //is this a good enough test ?? it implies time sync ..
713                                                 if (peerDescription.getModified().isAfter(localDescription.getModified())) {
714                                                         localDescription = copyRevisionDescription(peerDescription, localDescription);
715                                                         doCatalog = true;
716                                                 }
717                                         }
718
719                                         if (doCatalog) {
720                                                 try {
721                                                         putRevisionDescription(localDescription, theContext);
722                                                 }
723                                                 catch (ServiceException sx) {
724                                                         log.error("Description processing failed",      sx);
725                                                         isComplete = false;
726                                                 }
727                                                 hasRevisionChanged = true;
728                                         }
729                                 } //end revision processing
730
731                                 if (!isRevisionNew && hasRevisionChanged) {
732                                         try {
733                                                 //we do not actually update any properties, just give CDS a chance to update the timestamps as to mark it as updated.
734                                                 catalog.putSolutionRevision(SolutionRevision.buildFrom(localRevision).build(),
735                                                                                                                                                                 theContext);
736                                         }
737                                         catch (ServiceException sx) {
738                                                 log.error("Failed to update local revision",    sx);
739                                                 isComplete = false;
740                                         }
741                                 }       
742
743                                 hasSolutionChanged |= (isRevisionNew || hasRevisionChanged);
744                         } //end revisions processing
745
746                         if (!isSolutionNew && hasSolutionChanged) {
747                                 try {
748                                         catalog.putSolution(copySolution(peerSolution, localSolution), theContext);
749                                 }
750                                 catch (ServiceException sx) {
751                                         log.error("Failed to update local solution",    sx);
752                                         isComplete = false;
753                                 }
754                         }
755
756                         return isComplete;
757                 } // mapSolution
758         }
759 }