Render modified timestamp as seconds since Epoch
[federation.git] / gateway / src / main / java / org / acumos / federation / gateway / adapter / PeerGateway.java
1 /*-
2  * ===============LICENSE_START=======================================================
3  * Acumos
4  * ===================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6  * ===================================================================================
7  * This Acumos software file is distributed by AT&T and Tech Mahindra
8  * under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *  
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  * This file is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ===============LICENSE_END=========================================================
19  */
20
21 package org.acumos.federation.gateway.adapter;
22
23 import java.io.Closeable;
24 import java.lang.invoke.MethodHandles;
25 import java.time.Instant;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.LinkedHashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32
33 import javax.annotation.PostConstruct;
34 import javax.annotation.PreDestroy;
35
36 import org.acumos.cds.AccessTypeCode;
37 import org.acumos.cds.client.ICommonDataServiceRestClient;
38 import org.acumos.cds.domain.MLPPeer;
39 import org.acumos.cds.domain.MLPPeerSubscription;
40 import org.acumos.cds.domain.MLPRevisionDescription;
41 import org.acumos.cds.domain.MLPSolution;
42 import org.acumos.cds.domain.MLPSolutionRevision;
43 import org.acumos.cds.domain.MLPTag;
44 import org.acumos.federation.gateway.cds.Artifact;
45 import org.acumos.federation.gateway.cds.Document;
46 import org.acumos.federation.gateway.cds.PeerSubscription;
47 import org.acumos.federation.gateway.cds.Solution;
48 import org.acumos.federation.gateway.cds.SolutionRevision;
49 import org.acumos.federation.gateway.cds.SubscriptionScope;
50 import org.acumos.federation.gateway.cds.TimestampedEntity;
51 import org.acumos.federation.gateway.common.Clients;
52 import org.acumos.federation.gateway.common.FederationClient;
53 import org.acumos.federation.gateway.common.FederationException;
54 import org.acumos.federation.gateway.common.JsonResponse;
55 import org.acumos.federation.gateway.config.GatewayCondition;
56 import org.acumos.federation.gateway.event.PeerSubscriptionEvent;
57 import org.acumos.federation.gateway.service.CatalogService;
58 import org.acumos.federation.gateway.service.ContentService;
59 import org.acumos.federation.gateway.service.PeerSubscriptionService;
60 import org.acumos.federation.gateway.service.ServiceContext;
61 import org.acumos.federation.gateway.service.ServiceException;
62 import org.acumos.federation.gateway.util.Utils;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import org.springframework.beans.factory.BeanInitializationException;
66 import org.springframework.beans.factory.annotation.Autowired;
67 import org.springframework.beans.factory.annotation.Qualifier;
68 import org.springframework.context.annotation.Conditional;
69 import org.springframework.context.annotation.Scope;
70 import org.springframework.context.event.EventListener;
71 import org.springframework.core.env.Environment;
72 import org.springframework.core.io.Resource;
73 import org.springframework.core.task.TaskExecutor;
74 import org.springframework.stereotype.Component;
75 import org.springframework.web.client.HttpStatusCodeException;
76
77
78 @Component("peergateway")
79 @Scope("singleton")
80 @Conditional({GatewayCondition.class})
81 public class PeerGateway {
82
83         private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
84         @Autowired
85         @Qualifier("acumos")
86         private TaskExecutor taskExecutor;
87         @Autowired
88         private Environment env;
89         @Autowired
90         private Clients clients;
91         @Autowired
92         private ContentService content;
93         @Autowired
94         private CatalogService catalog;
95         @Autowired
96         private PeerSubscriptionService peerSubscriptionService;
97
98         private static final String federationDotOperator = "federation.operator";
99
/**
 * Creates the gateway bean. Nothing is set up here beyond tracing the
 * instantiation; configuration checks are done in {@link #initGateway()}.
 */
public PeerGateway() {
    log.trace("PeerGateway::new");
}
103
104         @PostConstruct
105         public void initGateway() {
106                 log.trace("initPeerGateway");
107
108                 /* make sure an operator was specified and that it is a declared user */
109                 if (null == this.env.getProperty(federationDotOperator)) {
110                         throw new BeanInitializationException("Missing configuration key " + federationDotOperator);
111                 } 
112                 else {
113                         try {
114                                 if (null == this.clients.getCDSClient().getUser(this.env.getProperty(federationDotOperator))) {
115                                         log.warn(federationDotOperator + 
116                                                         " does not point to an existing user");
117                                 }
118                         }
119                         catch (/* HttpStatusCode */Exception dx) {
120                                 log.warn("failed to verify value " + federationDotOperator, dx);
121                         }
122                 }
123
124                 // Done
125                 log.debug("PeerGateway available");
126         }
127
128         @PreDestroy
129         public void cleanupGateway() {
130                 log.debug("PeerGateway destroyed");
131         }
132
133         protected String getUserId(MLPPeerSubscription theSubscription/*
134                                                                                                                                          * , MLPSolution theSolution
135                                                                                                                                          */) {
136                 String userId = theSubscription.getUserId();
137                 return userId != null ? userId : this.env.getProperty(federationDotOperator);
138         }
139
140         @EventListener
141         public void handlePeerSubscriptionUpdate(PeerSubscriptionEvent theEvent) {
142                 log.info("received peer subscription update event {}", theEvent);
143                 taskExecutor.execute(
144                                 new PeerGatewayUpdateTask(theEvent.getPeer(), theEvent.getSubscription()));
145         }
146
147         /**
148          * The list of solutions processed here represents the solutions (with respect
149          * to the subscription filter definition) that were reported by the peer as
150          * being updated since the last check.
151          */
152         public class PeerGatewayUpdateTask implements Runnable {
153
154                 private MLPPeer peer;
155                 private PeerSubscription sub;
156
/**
 * Builds the task for one peer/subscription pair.
 *
 * @param thePeer
 *            the peer the subscription points to
 * @param theSub
 *            the subscription to process; it is wrapped in a
 *            {@link PeerSubscription}
 */
public PeerGatewayUpdateTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
    this.sub = new PeerSubscription(theSub);
    this.peer = thePeer;
}
161
162                 public void run() {
163
164                         Map selector = null;
165                         try {
166                                 selector = Utils.jsonStringToMap(this.sub.getSelector());
167                         }
168                         catch(Exception x) {
169                                 log.error("Failed to parse selector for subscription {}", this.sub);
170                                 return;
171                         }
172                         Instant lastProcessed = this.sub.getProcessed();
173                         if (lastProcessed != null) {
174                                 selector.put("modified", lastProcessed.getEpochSecond());
175                         }
176                         lastProcessed = Instant.now();
177                         
178                         FederationClient peerClient = clients.getFederationClient(this.peer.getApiUrl());
179                         if (peerClient == null) {
180                                 log.error("Failed to get client for peer {}", this.peer);
181                                 return;
182                         }
183
184                         JsonResponse<List<MLPSolution>> peerSolutionsResponse = null;
185                         try {
186                                 peerSolutionsResponse = peerClient.getSolutions(selector);
187                         }
188                         catch (FederationException fx) {
189                                 log.info("Processing peer " + this.peer + " subscription " + this.sub.getSubId() + " error.", fx);
190                                 return;
191                         }
192
193                         List<MLPSolution> peerSolutions = peerSolutionsResponse.getContent();
194                         log.info("Processing peer {} subscription {}, {} yielded solutions {}", this.peer, this.sub.getSubId(), selector, peerSolutions);
195                         if (peerSolutions == null) {
196                                 log.warn("No solutions available for peer {} subscription {} in {}", this.peer, this.sub.getSubId(), peerSolutionsResponse);
197                                 peerSolutions = Collections.EMPTY_LIST;
198                                 //and let it proceed so we end up marking it as processed
199                         }
200
201                         ServiceContext ctx = catalog.selfService();
202                         boolean isComplete = true;
203
204                         for (MLPSolution peerSolution : peerSolutions) {
205                                 log.info("Processing peer solution {}", peerSolution);
206
207                                 try {
208                                         isComplete &= mapSolution(peerSolution, peerClient, ctx);
209                                 }
210                                 catch (Throwable t) {
211                                         log.error("Mapping of acumos solution failed for " + peerSolution, t);
212                                 }
213                         }
214                                         
215                         log.info("Processing of subscription {} completed succesfully: {}", this.sub, isComplete);
216                         //only commit the last processed date if we completed succesfully
217                         if (isComplete) {
218                                 try {
219                                         this.sub.setProcessed(lastProcessed);
220                                         peerSubscriptionService.updatePeerSubscription(this.sub);
221                                 }
222                                 catch (ServiceException sx) {
223                                         log.error("Failed to update subscription information", sx);
224                                 }
225                         }
226                 }
227
228                 //this should go away once the move to service interface based operations is complete
229                 //as ugly as they come
230                 private ICommonDataServiceRestClient getCDSClient(ServiceContext theContext) {
231                         return PeerGateway.this.clients.getCDSClient();
232                 }
233
234                 private Artifact copyArtifact(Artifact peerArtifact) {
235                         return Artifact.buildFrom(peerArtifact)
236                                                                 .withUser(getUserId(this.sub))
237                                                                 .withCreated(TimestampedEntity.ORIGIN)
238                                                                 .withModified(TimestampedEntity.ORIGIN)
239                                                                 .build();
240                 }
241
242                 /* we create a new one as nothing is preserved. assumes matching ids. */
243                 private Artifact copyArtifact(Artifact peerArtifact, Artifact localArtifact) {
244                         return Artifact.buildFrom(peerArtifact)
245                                                                 .withId(localArtifact.getArtifactId())
246                                                                 .withUser(getUserId(this.sub))
247                                                                 .build();
248                 }
249
250                 private void putArtifact(String theSolutionId, String theRevisionId, Artifact theArtifact,
251                                 ServiceContext theContext) throws ServiceException {
252
253                         assert(getCDSClient(theContext) != null);
254
255                         try {
256                                 if (theArtifact.getCreated() == Instant.MIN) {
257                                         getCDSClient(theContext).createArtifact(theArtifact);
258                                         getCDSClient(theContext).addSolutionRevisionArtifact(theSolutionId, theRevisionId, theArtifact.getArtifactId());
259                                         log.info("Local artifact created: {}", theArtifact);
260                                 }
261                                 else {
262                                         getCDSClient(theContext).updateArtifact(theArtifact);
263                                         log.info("Local artifact updated: {}", theArtifact);
264                                 }
265         
266                         }
267                         catch (HttpStatusCodeException restx) {
268                                 log.error("Artifact CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
269                                 throw new ServiceException("Artifact CDS call failed.", restx);
270                         }
271                         catch (Exception x) {
272                                 log.error("Artifact unexpected failure", x);
273                                 throw new ServiceException("Artifact CDS call failed.", x);
274                         }
275                 }
276
277                 private Document copyDocument(Document peerDocument) {
278                         return Document.buildFrom(peerDocument)
279                                                                 .withUser(getUserId(this.sub))
280                                                                 .withCreated(TimestampedEntity.ORIGIN)
281                                                                 .withModified(TimestampedEntity.ORIGIN)
282                                                                 .build();
283                 }
284
285                 private Document copyDocument(Document peerDocument, Document localDocument) {
286                         return Document.buildFrom(peerDocument)
287                                                                 .withId(localDocument.getDocumentId())
288                                                                 .withUser(getUserId(this.sub))
289                                                                 .build();
290                 }
291
292                 private void putDocument(String theSolutionId, String theRevisionId, Document theDocument,
293                                 ServiceContext theContext) throws ServiceException {
294
295                         try {
296                                 if (theDocument.getCreated() == Instant.MIN) {
297                                         getCDSClient(theContext).createDocument(theDocument);
298                                         getCDSClient(theContext).addSolutionRevisionDocument(theRevisionId, AccessTypeCode.PB.name(), theDocument.getDocumentId());
299                                         log.info("Local document created: {}", theDocument);
300                                 }
301                                 else {
302                                         getCDSClient(theContext).updateDocument(theDocument);
303                                         log.info("Local document updated: {}", theDocument);
304                                 }
305                         }
306                         catch (HttpStatusCodeException restx) {
307                                 log.error("Document CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
308                                 throw new ServiceException("Document CDS call failed.", restx);
309                         }
310                         catch (Exception x) {
311                                 log.error("Document handling unexpected failure", x);
312                                 throw new ServiceException("Document handling unexpected failure", x);
313                         }
314                 }
315         
316                 private MLPRevisionDescription copyRevisionDescription(MLPRevisionDescription peerDescription) {
317                         MLPRevisionDescription localDescription = new MLPRevisionDescription(peerDescription);
318                         localDescription.setCreated(TimestampedEntity.ORIGIN);
319                         localDescription.setModified(TimestampedEntity.ORIGIN);
320                         return localDescription;
321                 }
322
323                 private MLPRevisionDescription copyRevisionDescription(MLPRevisionDescription peerDescription, MLPRevisionDescription localDescription) {
324                         localDescription.setDescription(peerDescription.getDescription());
325                         return localDescription;
326                 }
327
328                 private void putRevisionDescription(MLPRevisionDescription theDescription,ServiceContext theContext) throws ServiceException {
329                         
330                         try {
331                                 if (theDescription.getCreated() == Instant.MIN) {
332                                         getCDSClient(theContext).createRevisionDescription(theDescription);
333                                         log.info("Local description created: {}", theDescription);
334                                 }
335                                 else {
336                                         getCDSClient(theContext).updateRevisionDescription(theDescription);
337                                 }
338                         }
339                         catch (HttpStatusCodeException restx) {
340                                 log.error("Revision description CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
341                                 throw new ServiceException("Revision description CDS call failed.", restx);
342                         }
343                         catch (Exception x) {
344                                 log.error("Revision description handling unexpected failure", x);
345                                 throw new ServiceException("Revision description handling unexpected failure", x);
346                         }
347                 }
348
349                 private boolean hasChanged(Artifact thePeerArtifact, Artifact theLocalArtifact) {
350                         if (thePeerArtifact.getVersion() != null && theLocalArtifact.getVersion() != null) { 
351                                 return !thePeerArtifact.getVersion().equals(theLocalArtifact.getVersion());
352                         }
353
354                         if (thePeerArtifact.getSize() != null && theLocalArtifact.getSize() != null) { 
355                                 return !thePeerArtifact.getSize().equals(theLocalArtifact.getSize());
356                         }
357
358                         return true;
359                 }       
360
361                 private boolean hasChanged(Document thePeerDoc, Document theLocalDoc) {
362                         if (thePeerDoc.getVersion() != null && theLocalDoc.getVersion() != null) { 
363                                 return !thePeerDoc.getVersion().equals(theLocalDoc.getVersion());
364                         }
365
366                         if (thePeerDoc.getSize() != null && theLocalDoc.getSize() != null) { 
367                                 return !thePeerDoc.getSize().equals(theLocalDoc.getSize());
368                         }
369
370                         return true;
371                 }
372
373                 private Solution copySolution(Solution thePeerSolution) {
374                         return Solution.buildFrom(thePeerSolution)
375                                                                 .withCreated(TimestampedEntity.ORIGIN)
376                                                                 .withModified(TimestampedEntity.ORIGIN)
377                                                                 .withUser(getUserId(this.sub))
378                                                                 .withSource(this.peer.getPeerId())
379                                                                 .resetStats()
380                                                                 .build();
381                 }
382         
383                 private Solution copySolution(Solution thePeerSolution, Solution theLocalSolution) {    
384                         String newUserId = getUserId(this.sub),
385                                                  newSourceId = this.peer.getPeerId();
386
387                         //some basic warnings
388                         if (!theLocalSolution.getUserId().equals(newUserId)) {
389                                 // is this solution being updated as part of different/new subscription?
390                                 log.warn("Updating solution {} triggers a user change", theLocalSolution.getSolutionId());
391                                 //but make the change anyway
392                                 theLocalSolution.setUserId(newUserId);
393                         }
394
395                         if (theLocalSolution.getSourceId() == null) {
396                                 //this is a local solution that made its way back
397                                 log.info("Solution {} was originally provisioned locally, avoiding user update", theLocalSolution.getSolutionId());
398                         }
399                         else {
400                                 if (!theLocalSolution.getSourceId().equals(newSourceId)) {
401                                         // we will see this if a solution is available in more than one peer
402                                         log.warn("Solution {} triggers a source change", theLocalSolution.getSolutionId());
403                                         //but make the change anyway
404                                         theLocalSolution.setSourceId(newSourceId);
405                                 }
406                         }
407
408                         //tags, keep only the delta
409                         Set<MLPTag> tags = thePeerSolution.getTags();
410                         tags.removeAll(theLocalSolution.getTags());
411                         theLocalSolution.setTags(tags);                 
412
413                         return theLocalSolution;
414                 }
415
416                 private boolean hasChanged(Solution thePeerSolution, Solution theLocalSolution) {
417                         if (!theLocalSolution.getTags().containsAll(thePeerSolution.getTags()))
418                                 return false;
419
420                         return true;
421                 }
422
423                 /**
424                  * Here comes the core process of updating a local solution's related
425                  * information with what is available from a peer.
426                  * 
427                  * @param theSolution
428                  *            the local solution whose related information (revisions and
429                  *            artifacts) we are trying to sync
430                  * @param thePeerClient
431                  *            client
432                  * @param theContext
433                  *            the context in which we perform the catalog operations
434                  * @return true if mapping was successful, false otherwise
435                  * @throws Exception
436                  *             any error related to CDS and peer interaction
437                  */
438                 protected boolean mapSolution(MLPSolution theSolution, FederationClient thePeerClient, ServiceContext theContext) throws Exception {
439
440                         boolean isComplete = true,
441                                                         isSolutionNew = false,
442                                                         hasSolutionChanged = false;
443
444                         Solution localSolution = null,
445                                                          peerSolution = null;
446
447                         //retrieve the full representation from the peer
448                         JsonResponse<MLPSolution> peerSolutionResponse = null;
449                         try {
450                                 peerSolutionResponse = thePeerClient.getSolution(theSolution.getSolutionId());
451                         }
452                         catch (FederationException fx) {
453                                 log.warn("Failed to retrieve peer solution details for " + theSolution, fx);
454                                 return false;
455                         }
456
457                         peerSolution = (Solution)peerSolutionResponse.getContent();
458                         if (peerSolution == null) {
459                                 log.warn("No solution details available for {} in {}", theSolution, peerSolutionResponse);
460                                 return false;
461                         }
462
463                         localSolution = catalog.getSolution(peerSolution.getSolutionId());
464                         if (localSolution == null) {
465                                 localSolution = catalog.putSolution(copySolution(peerSolution), theContext);
466                         isSolutionNew = true;
467                         }
468                         else {
469                                 hasSolutionChanged = hasChanged(peerSolution, localSolution);
470                         }
471                         
472                         List<MLPSolutionRevision> peerRevisions = (List)peerSolution.getRevisions();
473                         Collections.sort(peerRevisions, (arev, brev) -> arev.getModified().compareTo(brev.getModified()));
474
475                         // this should not happen as any solution should have at least one
476                         // revision (but that's an assumption on how on-boarding works)
477                         if (peerRevisions == null || peerRevisions.size() == 0) {
478                                 log.warn("No peer revisions were retrieved");
479                                 return true;
480                         }
481
482                         // check if we have locally the latest revision available on the peer
483                         List<MLPSolutionRevision> catalogRevisions = (List)localSolution.getRevisions();
484                         final List<MLPSolutionRevision> localRevisions = catalogRevisions == null ? Collections.EMPTY_LIST : catalogRevisions;
485
486                         // map peer revisions to local ones; new peer revisions have a null mapping
487                         Map<MLPSolutionRevision, MLPSolutionRevision> peerToLocalRevisions =
488                                         new LinkedHashMap<MLPSolutionRevision, MLPSolutionRevision>();
489                         peerRevisions.forEach(peerRevision -> peerToLocalRevisions.put(peerRevision,
490                                         localRevisions.stream()
491                                                         .filter(localRevision -> localRevision.getRevisionId().equals(peerRevision.getRevisionId()))
492                                                         .findFirst().orElse(null)));
493
494                         for (Map.Entry<MLPSolutionRevision, MLPSolutionRevision> revisionEntry : peerToLocalRevisions.entrySet()) {
495                                 MLPSolutionRevision peerRevision = revisionEntry.getKey(), localRevision = revisionEntry.getValue();
496
497                                 boolean isRevisionNew = false,
498                                                                 hasRevisionChanged = false;
499
500                                 //revision related information (artifacts/documents/description/..) is now embedded in the revision details
501                                 //federation api call so one call is all is needed
502                                 JsonResponse<MLPSolutionRevision> peerRevisionResponse = null;
503                                 try {
504                                         peerRevisionResponse = thePeerClient.getSolutionRevision(peerSolution.getSolutionId(), peerRevision.getRevisionId());
505                                 }
506                                 catch (FederationException fx) {
507                                         isComplete = false; //try the next revision but mark the overall processing as incomplete
508                                         continue;
509                                 }
510
511                                 peerRevision = peerRevisionResponse.getContent();
512                                 if (peerRevision == null) {
513                                         isComplete = false; //try the next revision but mark the overall processing as incomplete
514                                         continue;
515                                 }
516
517                                 if (localRevision == null) {
518                                         try {
519                                                 localRevision = catalog.putSolutionRevision(
520                                                                                                                                                         SolutionRevision.buildFrom(peerRevision)
521                                                                                                                                                                 .withCreated(TimestampedEntity.ORIGIN)
522                                                                                                                                                                 .withModified(TimestampedEntity.ORIGIN)
523                                                                                                                                                                 .withUser(getUserId(this.sub))
524                                                                                                                                                                 .withSource(this.peer.getPeerId())
525                                                                                                                                                                 .withAccessTypeCode(this.sub.getAccessType())
526                                                                                                                                                                 .build(), theContext);
527                                         }
528                                         catch (ServiceException sx) {
529                                                 log.error("Failed to put revision " + theSolution.getSolutionId() + "/" + peerRevision.getRevisionId() + " into catalog", sx);
530                                                 isComplete = false; //try procecessing the next revision but mark the processing as incomplete
531                                                 continue;
532                                         }
533                                         isRevisionNew = true;
534                                 }
535
536                                 List<Artifact> peerArtifacts = (List)((SolutionRevision)peerRevision).getArtifacts();
537                                 List<Document> peerDocuments = (List)((SolutionRevision)peerRevision).getDocuments();
538
539                                 List<Artifact> catalogArtifacts = (List)((SolutionRevision)localRevision).getArtifacts();
540                                 List<Document> catalogDocuments = (List)((SolutionRevision)localRevision).getDocuments();
541
542                                 final List<Artifact> localArtifacts = catalogArtifacts;
543                                 // map the artifacts
544                                 // TODO: track deleted artifacts
545                                 Map<Artifact, Artifact> peerToLocalArtifacts = new HashMap<Artifact, Artifact>();
546                                 peerArtifacts.forEach(peerArtifact -> peerToLocalArtifacts.put(peerArtifact, localArtifacts.stream()
547                                                 .filter(localArtifact -> localArtifact.getArtifactId().equals(peerArtifact.getArtifactId()))
548                                                 .findFirst().orElse(null)));
549
550                                 for (Map.Entry<Artifact, Artifact> artifactEntry : peerToLocalArtifacts.entrySet()) {
551                                         Artifact peerArtifact = artifactEntry.getKey(),
552                                                                          localArtifact = artifactEntry.getValue();
553                                         boolean doCatalog = false;
554                                         
555                                         log.info("Processing peer artifact {} against local artifact {}", peerArtifact, localArtifact);
556
557                                         if (localArtifact == null) {
558                                                 localArtifact = copyArtifact(peerArtifact);
559                                                 doCatalog = true;
560                                         }
561                                         else {
562                                                 if (hasChanged(peerArtifact, localArtifact)) {
563                                                         // update local artifact
564                                                         localArtifact = copyArtifact(peerArtifact, localArtifact);
565                                                         doCatalog = true;
566                                                 }
567                                         }
568
569                                         boolean doContent = doCatalog &&
570                                                                                                                         (peerArtifact.getUri() != null) &&
571                                                                                                                         (SubscriptionScope.Full == SubscriptionScope.forCode(this.sub.getScopeType()));
572                                         if (doContent) {
573                                                 log.info("Processing content for artifact {}", peerArtifact); 
574                                                 // TODO: we are trying to access the artifact by its identifier which
575                                                 // is fine in the common case but the uri specified in the artifact
576                                                 // data is the right approach (as it does not rely on the E5 definition).
577                                                 Resource artifactContent = null;
578                                                 try {
579                                                         artifactContent = thePeerClient.getArtifactContent(
580                                                                 peerSolution.getSolutionId(), peerRevision.getRevisionId(), peerArtifact.getArtifactId());
581                                                         log.info("Received {} bytes of artifact content", artifactContent.contentLength()); 
582                                                 }
583                                                 catch (FederationException x) {
584                                                         log.error("Failed to retrieve acumos artifact content", x);
585                                                         doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
586                                                         isComplete = false;
587                                                 }
588
589                                                 if (artifactContent != null) {
590                                                         try {
591                                                                 content.putArtifactContent(
592                                                                         localSolution.getSolutionId(), localRevision.getRevisionId(), localArtifact, artifactContent);
593                                                                 doCatalog = true;
594                                                         }
595                                                         catch (ServiceException sx) {
596                                                                 log.error("Failed to store artifact content to local repo", sx);
597                                                                 doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
598                                                                 isComplete = false;
599                                                         }
600                                                         finally {
601                                                                 if (artifactContent instanceof Closeable) {
602                                                                         ((Closeable)artifactContent).close();
603                                                                 }
604                                                         }
605                                                 }
606                                         }
607
608                                         if (doCatalog) {
609                                                 try {
610                                                         putArtifact(localSolution.getSolutionId(), localRevision.getRevisionId(), localArtifact, theContext);
611                                                 }
612                                                 catch (ServiceException sx) {
613                                                         log.error("Artifact processing failed.", sx);
614                                                         isComplete = false;
615                                                 }
616                                                 hasRevisionChanged = true;
617                                         }
618                                 } //end map artifacts loop
619
620
621                                 final List<Document> localDocuments = catalogDocuments;
622                                 // map the documents
623                                 // TODO: track deleted documents
624                                 Map<Document, Document> peerToLocalDocuments = new HashMap<Document, Document>();
625                                 peerDocuments.forEach(peerDocument -> peerToLocalDocuments.put(peerDocument, localDocuments.stream()
626                                                 .filter(localDocument -> localDocument.getDocumentId().equals(peerDocument.getDocumentId()))
627                                                 .findFirst().orElse(null)));
628
629                                 for (Map.Entry<Document, Document> documentEntry : peerToLocalDocuments.entrySet()) {
630                                         Document peerDocument = documentEntry.getKey(),
631                                                                          localDocument = documentEntry.getValue();
632                                         boolean doCatalog = false;
633
634                                         log.info("Processing peer document {} against local version {}", peerDocument, localDocument);
635                                         if (localDocument == null) {
636                                                 localDocument = copyDocument(peerDocument);
637                                                 doCatalog = true;
638                                         }
639                                         else {
640                                                 //version strings are not standard so comparing them is not necessarily safe
641                                                 if (hasChanged(peerDocument, localDocument)) {
642                                                         // update local doc
643                                                         localDocument = copyDocument(peerDocument, localDocument);
644                                                         doCatalog = true;
645                                                 }
646                                         }
647
648                                         boolean doContent = doCatalog &&
649                                                                                                                         (peerDocument.getUri() != null) &&
650                                                                                                                         (SubscriptionScope.Full == SubscriptionScope.forCode(this.sub.getScopeType()));
651                                         if (doContent) {
652                                                 log.info("Processing content for document {}", peerDocument); 
653                                                 // TODO: we are trying to access the document by its identifier which
654                                                 // is fine in the common case but the uri specified in the document
655                                                 // data is a more flexible approach.
656                                                 Resource documentContent = null;
657                                                 try {
658                                                         documentContent = thePeerClient.getDocumentContent(
659                                                                 peerSolution.getSolutionId(), localRevision.getRevisionId(), peerDocument.getDocumentId());
660                                                         log.info("Received {} bytes of document content", documentContent.contentLength()); 
661                                                 }
662                                                 catch (FederationException x) {
663                                                         log.error("Failed to retrieve acumos document content", x);
664                                                         doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
665                                                         isComplete = false;
666                                                 }
667
668                                                 if (documentContent != null) {
669                                                         try {
670                                                                 content.putDocumentContent(
671                                                                         localSolution.getSolutionId(), localRevision.getRevisionId(), localDocument, documentContent);
672                                                                 doCatalog = true;
673                                                         }
674                                                         catch (ServiceException sx) {
675                                                                 log.error("Failed to store document content to local repo", sx);
676                                                                 doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
677                                                                 isComplete = false;
678                                                         }
679                                                 }
680                                         }
681
682                                         if (doCatalog) {
683                                                 try {
684                                                         putDocument(localSolution.getSolutionId(), localRevision.getRevisionId(), localDocument, theContext);
685                                                 }
686                                                 catch (ServiceException sx) {
687                                                         log.error("Document processing failed", sx);
688                                                         isComplete = false;
689                                                 }
690                                                 hasRevisionChanged = true;
691                                         }
692         
693                                 } // end map documents loop
694                                 
695                                 MLPRevisionDescription localDescription = ((SolutionRevision)localRevision).getRevisionDescription();
696                                 MLPRevisionDescription peerDescription = ((SolutionRevision)peerRevision).getRevisionDescription();
697
698                                 if (peerDescription != null) {
699                                         boolean doCatalog = false;
700
701                                         if (localDescription == null) {
702                                                 localDescription = copyRevisionDescription(peerDescription);
703                                                 doCatalog = true;
704                                         }
705                                         else {
706                                                 //is this a good enough test ?? it implies time sync ..
707                                                 if (peerDescription.getModified().isAfter(localDescription.getModified())) {
708                                                         localDescription = copyRevisionDescription(peerDescription, localDescription);
709                                                         doCatalog = true;
710                                                 }
711                                         }
712
713                                         if (doCatalog) {
714                                                 try {
715                                                         putRevisionDescription(localDescription, theContext);
716                                                 }
717                                                 catch (ServiceException sx) {
718                                                         log.error("Description processing failed",      sx);
719                                                         isComplete = false;
720                                                 }
721                                                 hasRevisionChanged = true;
722                                         }
723                                 } //end revision processing
724
725                                 if (!isRevisionNew && hasRevisionChanged) {
726                                         try {
727                                                 //we do not actually update any properties, just give CDS a chance to update the timestamps as to mark it as updated.
728                                                 catalog.putSolutionRevision(SolutionRevision.buildFrom(localRevision).build(),
729                                                                                                                                                                 theContext);
730                                         }
731                                         catch (ServiceException sx) {
732                                                 log.error("Failed to update local revision",    sx);
733                                                 isComplete = false;
734                                         }
735                                 }       
736
737                                 hasSolutionChanged |= (isRevisionNew || hasRevisionChanged);
738                         } //end revisions processing
739
740                         if (!isSolutionNew && hasSolutionChanged) {
741                                 try {
742                                         catalog.putSolution(copySolution(peerSolution, localSolution), theContext);
743                                 }
744                                 catch (ServiceException sx) {
745                                         log.error("Failed to update local solution",    sx);
746                                         isComplete = false;
747                                 }
748                         }
749
750                         return isComplete;
751                 } // mapSolution
752         }
753 }