Fix subscription task early cancellation 74/3274/2
authorSerban Jora <sj2381@att.com>
Tue, 30 Oct 2018 04:53:58 +0000 (00:53 -0400)
committerLott, Christopher (cl778h) <cl778h@att.com>
Tue, 30 Oct 2018 10:07:17 +0000 (06:07 -0400)
Change-Id: I4335c638317fc87130bf93a47f5894f5f9d043ed
Signed-off-by: Serban Jora <sj2381@att.com>
Issue-ID: ACUMOS-1937, ACUMOS-1952

19 files changed:
docs/release-notes.rst
gateway/pom.xml
gateway/src/main/java/org/acumos/federation/gateway/adapter/PeerGateway.java
gateway/src/main/java/org/acumos/federation/gateway/adapter/onap/ONAP.java
gateway/src/main/java/org/acumos/federation/gateway/cds/PeerSubscription.java
gateway/src/main/java/org/acumos/federation/gateway/common/API.java
gateway/src/main/java/org/acumos/federation/gateway/common/FederationClient.java
gateway/src/main/java/org/acumos/federation/gateway/common/FederationException.java [new file with mode: 0644]
gateway/src/main/java/org/acumos/federation/gateway/common/PeerException.java [new file with mode: 0644]
gateway/src/main/java/org/acumos/federation/gateway/config/NexusConfiguration.java
gateway/src/main/java/org/acumos/federation/gateway/event/PeerSubscriptionEvent.java
gateway/src/main/java/org/acumos/federation/gateway/service/impl/ContentServiceImpl.java
gateway/src/main/java/org/acumos/federation/gateway/service/impl/ServiceImpl.java
gateway/src/main/java/org/acumos/federation/gateway/task/PeerSubscriptionTask.java [deleted file]
gateway/src/main/java/org/acumos/federation/gateway/task/PeerSubscriptionTaskScheduler.java
gateway/src/main/java/org/acumos/federation/gateway/task/TaskConfiguration.java
gateway/src/test/java/org/acumos/federation/gateway/test/TaskTest.java
gateway/src/test/java/org/acumos/federation/gateway/test/TestAdapter.java
gateway/src/test/resources/task-test-peers.json

index da71d9f..2bb7200 100644 (file)
@@ -22,6 +22,13 @@ Federation Gateway Release Notes
 
 This server is available as a Docker image in a Docker registry at the Linux Foundation.
 
+--------------------------
+Version 1.18.7, 2018-10-30
+--------------------------
+
+* Fix the subscription task early cancellation (`ACUMOS-1937 <https://jira.acumos.org/browse/ACUMOS-1937>`_)
+* Fix the preemptive authentication (`ACUMOS-1952 <https://jira.acumos.org/browse/ACUMOS-1952>`_)
+
 --------------------------
 Version 1.18.6, 2018-10-08
 --------------------------
index 091dbfe..20976c8 100644 (file)
@@ -17,7 +17,7 @@
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.acumos.federation</groupId>
        <artifactId>gateway</artifactId>
-       <version>1.18.6-SNAPSHOT</version>
+       <version>1.18.7-SNAPSHOT</version>
        <name>Federation Gateway</name>
        <description>Federated Acumos Interface for inter-acumos and ONAP communication</description>
 
index 483b462..9bb84ea 100644 (file)
@@ -45,8 +45,11 @@ import org.acumos.federation.gateway.cds.Solution;
 import org.acumos.federation.gateway.cds.SolutionRevision;
 import org.acumos.federation.gateway.cds.SubscriptionScope;
 import org.acumos.federation.gateway.cds.TimestampedEntity;
+import org.acumos.federation.gateway.util.Utils;
 import org.acumos.federation.gateway.common.Clients;
 import org.acumos.federation.gateway.common.FederationClient;
+import org.acumos.federation.gateway.common.FederationException;
+import org.acumos.federation.gateway.common.JsonResponse;
 import org.acumos.federation.gateway.config.EELFLoggerDelegate;
 import org.acumos.federation.gateway.config.GatewayCondition;
 import org.acumos.federation.gateway.event.PeerSubscriptionEvent;
@@ -132,7 +135,7 @@ public class PeerGateway {
        public void handlePeerSubscriptionUpdate(PeerSubscriptionEvent theEvent) {
                log.info(EELFLoggerDelegate.debugLogger, "received peer subscription update event {}", theEvent);
                taskExecutor.execute(
-                               new PeerGatewayUpdateTask(theEvent.getPeer(), theEvent.getSubscription(), theEvent.getSolutions()));
+                               new PeerGatewayUpdateTask(theEvent.getPeer(), theEvent.getSubscription()));
        }
 
        /**
@@ -144,28 +147,59 @@ public class PeerGateway {
 
                private MLPPeer peer;
                private PeerSubscription sub;
-               private List<MLPSolution> solutions;
 
-               public PeerGatewayUpdateTask(MLPPeer thePeer, MLPPeerSubscription theSub, List<MLPSolution> theSolutions) {
+               public PeerGatewayUpdateTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
                        this.peer = thePeer;
                        this.sub = new PeerSubscription(theSub);
-                       this.solutions = theSolutions;
-               
-                       //remember when we processed the subscription
-                       this.sub.setProcessed(new Date());
                }
 
                public void run() {
 
-                       log.info(EELFLoggerDelegate.debugLogger, "Received peer " + this.peer + " solutions: " + this.solutions);
+                       Map selector = null;
+                       try {
+                               selector = Utils.jsonStringToMap(this.sub.getSelector());
+                       }
+                       catch(Exception x) {
+                               log.error(EELFLoggerDelegate.errorLogger, "Failed to parse selector for subscription {}", this.sub);
+                               return;
+                       }
+                       Date lastProcessed = this.sub.getProcessed();
+                       if (lastProcessed != null) {
+                               selector.put("modified", lastProcessed);
+                       }
+                       lastProcessed = new Date();
+                       
+                       FederationClient peerClient = clients.getFederationClient(this.peer.getApiUrl());
+                       if (peerClient == null) {
+                               log.error(EELFLoggerDelegate.errorLogger, "Failed to get client for peer {}", this.peer);
+                               return;
+                       }
+
+                       JsonResponse<List<MLPSolution>> peerSolutionsResponse = null;
+                       try {
+                               peerSolutionsResponse = peerClient.getSolutions(selector);
+                       }
+                       catch (FederationException fx) {
+                               log.info(EELFLoggerDelegate.errorLogger, "Processing peer " + this.peer + " subscription " + this.sub.getSubId() + " error.", fx);
+                               return;
+                       }
+
+                       List<MLPSolution> peerSolutions = peerSolutionsResponse.getContent();
+                       log.info(EELFLoggerDelegate.debugLogger, "Processing peer {} subscription {}, {} yielded solutions {}", this.peer, this.sub.getSubId(), selector, peerSolutions);
+                       if (peerSolutions == null) {
+                               log.warn(EELFLoggerDelegate.debugLogger, "No solutions available for peer {} subscription {} in {}", this.peer, this.sub.getSubId(), peerSolutionsResponse);
+                               peerSolutions = Collections.EMPTY_LIST;
+                               //and let it proceed so we end up marking it as processed
+                       }
+
                        ServiceContext ctx = catalog.selfService();
                        boolean isComplete = true;
 
-                       for (MLPSolution peerSolution : this.solutions) {
+                       for (MLPSolution peerSolution : peerSolutions) {
                                log.info(EELFLoggerDelegate.debugLogger, "Processing peer solution {}", peerSolution);
 
                                try {
-                                       isComplete &= mapSolution(peerSolution, ctx);
+                                       isComplete &= mapSolution(peerSolution, peerClient, ctx);
                                }
                                catch (Throwable t) {
                                        log.error(EELFLoggerDelegate.errorLogger,
@@ -177,6 +211,7 @@ public class PeerGateway {
                        //only commit the last processed date if we completed succesfully
                        if (isComplete) {
                                try {
+                                       this.sub.setProcessed(lastProcessed);
                                        peerSubscriptionService.updatePeerSubscription(this.sub);
                                }
                                catch (ServiceException sx) {
@@ -307,8 +342,31 @@ public class PeerGateway {
                                throw new ServiceException("Revision description handling unexpected failure", x);
                        }
                }
-       
 
+               private boolean hasChanged(Artifact thePeerArtifact, Artifact theLocalArtifact) {
+                       if (thePeerArtifact.getVersion() != null && theLocalArtifact.getVersion() != null) { 
+                               return !thePeerArtifact.getVersion().equals(theLocalArtifact.getVersion());
+                       }
+
+                       if (thePeerArtifact.getSize() != null && theLocalArtifact.getSize() != null) { 
+                               return !thePeerArtifact.getSize().equals(theLocalArtifact.getSize());
+                       }
+
+                       return true;
+               }       
+
+               private boolean hasChanged(Document thePeerDoc, Document theLocalDoc) {
+                       if (thePeerDoc.getVersion() != null && theLocalDoc.getVersion() != null) { 
+                               return !thePeerDoc.getVersion().equals(theLocalDoc.getVersion());
+                       }
+
+                       if (thePeerDoc.getSize() != null && theLocalDoc.getSize() != null) { 
+                               return !thePeerDoc.getSize().equals(theLocalDoc.getSize());
+                       }
+
+                       return true;
+               }
+       
                /**
                 * Here comes the core process of updating a local solution's related
                 * information with what is available from a peer.
@@ -322,16 +380,29 @@ public class PeerGateway {
                 * @throws Exception
                 *             any error related to CDS and peer interaction
                 */
-               protected boolean mapSolution(MLPSolution theSolution, ServiceContext theContext) throws Exception {
+               protected boolean mapSolution(MLPSolution theSolution, FederationClient thePeerClient, ServiceContext theContext) throws Exception {
 
                        boolean isComplete = true;
-                       FederationClient fedClient = clients.getFederationClient(this.peer.getApiUrl());
 
                        Solution localSolution = null,
                                                         peerSolution = null;
 
                        //retrieve the full representation from the peer
-                       peerSolution = (Solution)fedClient.getSolution(theSolution.getSolutionId()).getContent();
+                       JsonResponse<MLPSolution> peerSolutionResponse = null;
+                       try {
+                               peerSolutionResponse = thePeerClient.getSolution(theSolution.getSolutionId());
+                       }
+                       catch (FederationException fx) {
+                               log.warn(EELFLoggerDelegate.errorLogger, "Failed to retrieve peer solution details for " + theSolution, fx);
+                               return false;
+                       }
+
+                       peerSolution = (Solution)peerSolutionResponse.getContent();
+                       if (peerSolution == null) {
+                               log.warn(EELFLoggerDelegate.debugLogger, "No solution details available for {} in {}", theSolution, peerSolutionResponse);
+                               return false;
+                       }
+
                        localSolution = catalog.putSolution(
                                                                                                                                        Solution.buildFrom(peerSolution)
                                                                                                                                                .withUser(getUserId(this.sub))
@@ -373,16 +444,21 @@ public class PeerGateway {
 
                        for (Map.Entry<MLPSolutionRevision, MLPSolutionRevision> revisionEntry : peerToLocalRevisions.entrySet()) {
                                MLPSolutionRevision peerRevision = revisionEntry.getKey(), localRevision = revisionEntry.getValue();
-                       
+
                                //revision related information (artifacts/documents/description/..) is now embedded in the revision details
-                               //federation api call so one call is all is needed      
+                               //federation api call so one call is all is needed
+                               JsonResponse<MLPSolutionRevision> peerRevisionResponse = null;
                                try {
-                                       peerRevision = fedClient.getSolutionRevision(peerSolution.getSolutionId(), peerRevision.getRevisionId())
-                                                                                                                                               .getContent();
+                                       peerRevisionResponse = thePeerClient.getSolutionRevision(peerSolution.getSolutionId(), peerRevision.getRevisionId());
                                }
-                               catch (Exception x) {
-                                       log.error(EELFLoggerDelegate.errorLogger, "Failed to retrieve peer acumos artifact details", x);
-                                       isComplete = false; //try procecessing the next revision but mark the processing as incomplete
+                               catch (FederationException fx) {
+                                       isComplete = false; //try the next revision but mark the overall processing as incomplete
+                                       continue;
+                               }
+
+                               peerRevision = peerRevisionResponse.getContent();
+                               if (peerRevision == null) {
+                                       isComplete = false; //try the next revision but mark the overall processing as incomplete
                                        continue;
                                }
 
@@ -420,13 +496,15 @@ public class PeerGateway {
                                        Artifact peerArtifact = artifactEntry.getKey(),
                                                                         localArtifact = artifactEntry.getValue();
                                        boolean doCatalog = false;
+                                       
+                                       log.info(EELFLoggerDelegate.debugLogger, "Processing peer artifact {} against local artifact {}", peerArtifact, localArtifact);
 
                                        if (localArtifact == null) {
                                                localArtifact = copyArtifact(peerArtifact);
                                                doCatalog = true;
                                        }
                                        else {
-                                               if (!peerArtifact.getVersion().equals(localArtifact.getVersion())) {
+                                               if (hasChanged(peerArtifact, localArtifact)) {
                                                        // update local artifact
                                                        localArtifact = copyArtifact(peerArtifact, localArtifact);
                                                        doCatalog = true;
@@ -443,11 +521,11 @@ public class PeerGateway {
                                                // data is the right approach (as it does not rely on the E5 definition).
                                                Resource artifactContent = null;
                                                try {
-                                                       artifactContent = fedClient.getArtifactContent(
+                                                       artifactContent = thePeerClient.getArtifactContent(
                                                                peerSolution.getSolutionId(), peerRevision.getRevisionId(), peerArtifact.getArtifactId());
                                                        log.info(EELFLoggerDelegate.debugLogger, "Received {} bytes of artifact content", artifactContent.contentLength()); 
                                                }
-                                               catch (Exception x) {
+                                               catch (FederationException x) {
                                                        log.error(EELFLoggerDelegate.errorLogger, "Failed to retrieve acumos artifact content", x);
                                                        doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
                                                        isComplete = false;
@@ -498,14 +576,14 @@ public class PeerGateway {
                                                                         localDocument = documentEntry.getValue();
                                        boolean doCatalog = false;
 
+                                       log.info(EELFLoggerDelegate.debugLogger, "Processing peer document {} against local version {}", peerDocument, localDocument);
                                        if (localDocument == null) {
                                                localDocument = copyDocument(peerDocument);
                                                doCatalog = true;
                                        }
                                        else {
                                                //version strings are not standard so comparing them is not necessarly safe
-                                               if (peerDocument.getVersion() != null && localDocument.getVersion() != null &&
-                                                               !peerDocument.getVersion().equals(localDocument.getVersion())) {
+                                               if (hasChanged(peerDocument, localDocument)) {
                                                        // update local doc
                                                        localDocument = copyDocument(peerDocument, localDocument);
                                                        doCatalog = true;
@@ -522,11 +600,11 @@ public class PeerGateway {
                                                // data is a more flexible approach.
                                                Resource documentContent = null;
                                                try {
-                                                       documentContent = fedClient.getDocumentContent(
+                                                       documentContent = thePeerClient.getDocumentContent(
                                                                peerSolution.getSolutionId(), localRevision.getRevisionId(), peerDocument.getDocumentId());
                                                        log.info(EELFLoggerDelegate.debugLogger, "Received {} bytes of document content", documentContent.contentLength()); 
                                                }
-                                               catch (Exception x) {
+                                               catch (FederationException x) {
                                                        log.error(EELFLoggerDelegate.errorLogger, "Failed to retrieve acumos document content", x);
                                                        doCatalog = this.sub.getSubscriptionOptions().alwaysUpdateCatalog();
                                                        isComplete = false;
index 07acf3d..4fb51f9 100644 (file)
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
 import java.net.URI;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -37,6 +38,7 @@ import javax.annotation.PreDestroy;
 import org.acumos.cds.ArtifactTypeCode;
 import org.acumos.cds.domain.MLPArtifact;
 import org.acumos.cds.domain.MLPPeer;
+import org.acumos.cds.domain.MLPPeerSubscription;
 import org.acumos.cds.domain.MLPSolution;
 import org.acumos.cds.domain.MLPSolutionRevision;
 import org.acumos.federation.gateway.adapter.onap.sdc.ASDC;
@@ -45,6 +47,7 @@ import org.acumos.federation.gateway.adapter.onap.sdc.ASDC.ArtifactType;
 import org.acumos.federation.gateway.adapter.onap.sdc.ASDC.AssetType;
 import org.acumos.federation.gateway.adapter.onap.sdc.ASDC.LifecycleState;
 import org.acumos.federation.gateway.adapter.onap.sdc.ASDCException;
+import org.acumos.federation.gateway.util.Utils;
 import org.acumos.federation.gateway.common.Clients;
 import org.acumos.federation.gateway.common.FederationClient;
 import org.acumos.federation.gateway.config.EELFLoggerDelegate;
@@ -123,17 +126,17 @@ public class ONAP {
        @EventListener
        public void handlePeerSubscriptionUpdate(PeerSubscriptionEvent theEvent) {
                log.info(EELFLoggerDelegate.debugLogger, "received peer subscription update event " + theEvent);
-               taskExecutor.execute(new ONAPPushTask(theEvent.getPeer(), theEvent.getSolutions()));
+               taskExecutor.execute(new ONAPPushTask(theEvent.getPeer(), theEvent.getSubscription()));
        }
 
        public class ONAPPushTask implements Runnable {
 
                private MLPPeer peer;
-               private List<MLPSolution> solutions;
+               private MLPPeerSubscription sub;
 
-               public ONAPPushTask(MLPPeer thePeer, List<MLPSolution> theSolutions) {
+               public ONAPPushTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
                        this.peer = thePeer;
-                       this.solutions = theSolutions;
+                       this.sub = theSub;
                }
 
                public void run() {
@@ -141,7 +144,36 @@ public class ONAP {
                        // list with category and subcategory currently used for onap
                        // more dynamic mapping to come: based on solution information it will provide
                        // sdc assettype, category and subcategory
-                       log.info(EELFLoggerDelegate.debugLogger, "Processing {} Acumos solutions received from {}", solutions.size(), peer);
+
+                       Map selector = null;
+                       try {
+                               selector = Utils.jsonStringToMap(this.sub.getSelector());
+                       }
+                       catch(Exception x) {
+                               log.error(EELFLoggerDelegate.errorLogger, "Failed to parse selector for subscription {}", this.sub);
+                               return;
+                       }
+                       Date lastProcessed = this.sub.getProcessed();
+                       if (lastProcessed != null) {
+                               selector.put("modified", lastProcessed);
+                       }
+                       lastProcessed = new Date();
+                       
+                       FederationClient acumosClient = clients.getFederationClient(this.peer.getApiUrl());
+                       if (acumosClient == null) {
+                               log.error(EELFLoggerDelegate.errorLogger, "Failed to get client for peer {}", this.peer);
+                               return;
+                       }
+
+                       List<MLPSolution> acumosSolutions = null;
+                       try {
+                               acumosSolutions = (List)acumosClient.getSolutions(selector).getContent();
+                       }
+                       catch(Exception x) {
+                               log.error(EELFLoggerDelegate.errorLogger, "Processing peer " + this.peer + " subscription " + this.sub.getSubId() + ": getSolutions failed.", x);
+                               return;
+                       }
+                       log.info(EELFLoggerDelegate.debugLogger, "Processing peer {} subscription {}, {} yielded solutions {}", this.peer, this.sub.getSubId(), selector, acumosSolutions);
 
                        JSONArray sdcAssets = null;
                        try {
@@ -156,20 +188,18 @@ public class ONAP {
                                        return;
                        }
                        log.info(EELFLoggerDelegate.debugLogger, "Mapping received Acumos solutions \n{}\n to retrieved ONAP SDC assets \n{}",
-                       this.solutions, sdcAssets);
-
-                       for (MLPSolution acumosSolution : this.solutions) {
+                       acumosSolutions, sdcAssets);
 
-                               FederationClient fedClient = clients.getFederationClient(this.peer.getApiUrl());
+                       for (MLPSolution acumosSolution : acumosSolutions) {
 
                                List<MLPSolutionRevision> acumosRevisions = null;
                                try {
-                                       acumosRevisions = (List<MLPSolutionRevision>) fedClient
+                                       acumosRevisions = (List<MLPSolutionRevision>) acumosClient
                                                        .getSolutionRevisions(acumosSolution.getSolutionId()).getContent();
                                }
                                catch (Exception x) {
                                        log.error(EELFLoggerDelegate.errorLogger, "Failed to retrieve acumos revisions", x);
-                                       throw x;
+                                       return;
                                }
                                sortAcumosSolutionRevisions(acumosRevisions);
 
index 434b464..f0b7cf1 100644 (file)
@@ -96,5 +96,41 @@ public class PeerSubscription extends MLPPeerSubscription {
                always
        }
 
+       /**
+        * Detect changes in a peer subscription. The 'modified' timestamp is not a reliable test as we
+        * we modify it outselves, so instead we look at the content.
+        * TODO: selector and options are json string, we should compare teh actual json structure.
+        */
+       public static boolean isModified(MLPPeerSubscription theCurrentSub, MLPPeerSubscription theNewSub) {
+               boolean res = true;
+
+               String taskSelector = theCurrentSub.getSelector(),
+                                        peerSelector = theNewSub.getSelector();
+               res &= ((taskSelector != null && peerSelector == null) ||
+                                               (taskSelector == null && peerSelector != null) ||
+                                         (taskSelector != null && peerSelector != null && !taskSelector.equals(peerSelector)));
+
+               String taskOptions = theCurrentSub.getOptions(),
+                                        peerOptions = theNewSub.getOptions();
+               res &= ((taskOptions != null && peerOptions == null) ||
+                                               (taskOptions == null && peerOptions != null) ||
+                                         (taskOptions != null && peerOptions != null && !taskOptions.equals(peerOptions)));
+
+               Long taskRefresh = theCurrentSub.getRefreshInterval(),
+                                peerRefresh = theNewSub.getRefreshInterval();
+               res &= ((taskRefresh != null && peerRefresh == null) ||
+                                               (taskRefresh == null && peerRefresh != null) ||
+                                         (taskRefresh != null && peerRefresh != null && !taskRefresh.equals(peerRefresh)));
+
+               //cannot be null
+               res &= !theCurrentSub.getScopeType().equals(theNewSub.getScopeType());
+               res &= !theCurrentSub.getUserId().equals(theNewSub.getUserId());
+               res &= !theCurrentSub.getPeerId().equals(theNewSub.getPeerId());
+               res &= !theCurrentSub.getAccessType().equals(theNewSub.getAccessType());
+
+               return res;
+       }
+
+
 }
 
index b274229..bfc42a9 100644 (file)
@@ -124,7 +124,9 @@ public enum API {
         * the params include both path and query params.
         */
        public URI buildUri(String theHttpUrl, Map<String, ?> theParams) {
-               return uriBuilder(theHttpUrl, theParams.keySet()).buildAndExpand(theParams).encode().toUri();
+               /* While encoding seems like a good/safe idea no API URI actually requires encoding and this causes
+               problems when encoding base64 encoded selectors */
+               return uriBuilder(theHttpUrl, theParams.keySet()).buildAndExpand(theParams)/*.encode()*/.toUri();
        }
 
        /**
index 530b130..f3f6fd5 100644 (file)
@@ -82,11 +82,11 @@ public class FederationClient extends AbstractClient {
 
        /**
         * @return Ping information from/for Remote Acumos
-        * @throws HttpStatusCodeException
-        *             Throws HttpStatusCodeException if remote acumos interaction has failed.
+        * @throws FederationException
+        *             Throws FederationException if remote acumos interaction has failed.
         */
        public JsonResponse<MLPPeer> ping()
-                       throws HttpStatusCodeException {
+                       throws FederationException {
                URI uri = API.PING.buildUri(this.baseUrl);
                log.info(EELFLoggerDelegate.debugLogger, "Query for " + uri);
                ResponseEntity<JsonResponse<MLPPeer>> response = null;
@@ -95,23 +95,24 @@ public class FederationClient extends AbstractClient {
                                        new ParameterizedTypeReference<JsonResponse<MLPPeer>>() {
                                        });
                }
-               catch (HttpStatusCodeException x) {
-                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", x);
-                       throw x;
+               catch (HttpStatusCodeException scx) {
+                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", scx);
+                       throw new PeerException(uri, scx);
                }
                catch (Throwable t) {
                        log.error(EELFLoggerDelegate.errorLogger, uri + " unexpected failure.", t);
+                       throw new FederationException(uri, t);
                }
                finally {
                        log.info(EELFLoggerDelegate.debugLogger, uri + " response " + response);
                }
-               return response == null ? null : response.getBody();
+               return response.getBody();
        }
 
        /**
         */
        public JsonResponse<List<MLPPeer>> getPeers()
-                       throws HttpStatusCodeException {
+                       throws FederationException {
                URI uri = API.PEERS.buildUri(this.baseUrl);
                log.info(EELFLoggerDelegate.debugLogger, "Query for " + uri);
                ResponseEntity<JsonResponse<List<MLPPeer>>> response = null;
@@ -120,17 +121,18 @@ public class FederationClient extends AbstractClient {
                                        new ParameterizedTypeReference<JsonResponse<List<MLPPeer>>>() {
                                        });
                }
-               catch (HttpStatusCodeException x) {
-                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", x);
-                       throw x;
+               catch (HttpStatusCodeException scx) {
+                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", scx);
+                       throw new PeerException(uri, scx);
                }
                catch (Throwable t) {
                        log.error(EELFLoggerDelegate.errorLogger, uri + " unexpected failure.", t);
+                       throw new FederationException(uri, t);
                }
                finally {
                        log.info(EELFLoggerDelegate.debugLogger, uri + " response " + response);
                }
-               return response == null ? null : response.getBody();
+               return response.getBody();
        }       
 
 
@@ -144,13 +146,15 @@ public class FederationClient extends AbstractClient {
         *             Throws HttpStatusCodeException is remote acumos is not available
         */
        public JsonResponse<List<MLPSolution>> getSolutions(Map<String, Object> theSelection)
-                       throws HttpStatusCodeException {
+                       throws FederationException {
 
                String selectorParam = null;
                try {
+                       log.info(EELFLoggerDelegate.debugLogger, "getSolutions selector {}", Utils.mapToJsonString(theSelection));
                        selectorParam = theSelection == null ? null
                                        // : UriUtils.encodeQueryParam(Utils.mapToJsonString(theSelection),"UTF-8");
                                        : Base64Utils.encodeToString(Utils.mapToJsonString(theSelection).getBytes("UTF-8"));
+                       log.info(EELFLoggerDelegate.debugLogger, "getSolutions encoded selector {}", selectorParam);
                }
                catch (Exception x) {
                        throw new IllegalArgumentException("Cannot process the selection argument", x);
@@ -165,17 +169,18 @@ public class FederationClient extends AbstractClient {
                                        new ParameterizedTypeReference<JsonResponse<List<MLPSolution>>>() {
                                        });
                }
-               catch (HttpStatusCodeException x) {
-                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", x);
-                       throw x;
+               catch (HttpStatusCodeException scx) {
+                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", scx);
+                       throw new PeerException(uri, scx);
                }
                catch (Throwable t) {
                        log.error(EELFLoggerDelegate.errorLogger, uri + " unexpected failure.", t);
+                       throw new FederationException(uri, t);
                }
                finally {
                        log.info(EELFLoggerDelegate.debugLogger, uri + " response " + response);
                }
-               return response == null ? null : response.getBody();
+               return response.getBody();
        }
 
        /**
@@ -185,7 +190,7 @@ public class FederationClient extends AbstractClient {
         *             Throws HttpStatusCodeException if remote acumos interaction has failed.
         */
        public JsonResponse<MLPSolution> getSolution(String theSolutionId)
-                       throws HttpStatusCodeException {
+                       throws FederationException {
 
                URI uri = API.SOLUTION_DETAIL.buildUri(this.baseUrl, theSolutionId);
                log.info(EELFLoggerDelegate.debugLogger, "Query for " + uri);
@@ -195,17 +200,18 @@ public class FederationClient extends AbstractClient {
                                        new ParameterizedTypeReference<JsonResponse<MLPSolution>>() {
                                        });
                }
-               catch (HttpStatusCodeException x) {
-                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", x);
-                       throw x;
+               catch (HttpStatusCodeException scx) {
+                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", scx);
+                       throw new PeerException(uri, scx);
                }
                catch (Throwable t) {
                        log.error(EELFLoggerDelegate.errorLogger, uri + " unexpected failure.", t);
+                       throw new FederationException(uri, t);
                }
                finally {
                        log.info(EELFLoggerDelegate.debugLogger, uri + " response " + response);
                }
-               return response == null ? null : response.getBody();
+               return response.getBody();
        }
 
        /**
@@ -216,7 +222,7 @@ public class FederationClient extends AbstractClient {
         *             Throws HttpStatusCodeException is remote acumos is not available
         */
        public JsonResponse<List<MLPSolutionRevision>> getSolutionRevisions(String theSolutionId)
-                       throws HttpStatusCodeException {
+                       throws FederationException {
 
                URI uri = API.SOLUTION_REVISIONS.buildUri(this.baseUrl, theSolutionId);
                log.info(EELFLoggerDelegate.debugLogger, "Query for " + uri);
@@ -226,17 +232,18 @@ public class FederationClient extends AbstractClient {
                                        new ParameterizedTypeReference<JsonResponse<List<MLPSolutionRevision>>>() {
                                        });
                }
-               catch (HttpStatusCodeException x) {
-                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", x);
-                       throw x;
+               catch (HttpStatusCodeException scx) {
+                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", scx);
+                       throw new PeerException(uri, scx);
                }
                catch (Throwable t) {
                        log.info(EELFLoggerDelegate.errorLogger, uri + " unexpected failure.", t);
+                       throw new FederationException(uri, t);
                }
                finally {
                        log.info(EELFLoggerDelegate.debugLogger, uri + " response " + response);
                }
-               return response == null ? null : response.getBody();
+               return response.getBody();
        }
 
        /**
@@ -249,7 +256,7 @@ public class FederationClient extends AbstractClient {
         *             Throws HttpStatusCodeException is remote acumos is not available
         */
        public JsonResponse<MLPSolutionRevision> getSolutionRevision(String theSolutionId, String theRevisionId)
-                       throws HttpStatusCodeException {
+                       throws FederationException {
 
                URI uri = API.SOLUTION_REVISION_DETAILS.buildUri(this.baseUrl, theSolutionId, theRevisionId);
                log.info(EELFLoggerDelegate.debugLogger, "Query for " + uri);
@@ -259,17 +266,18 @@ public class FederationClient extends AbstractClient {
                                        new ParameterizedTypeReference<JsonResponse<MLPSolutionRevision>>() {
                                        });
                }
-               catch (HttpStatusCodeException x) {
-                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", x);
-                       throw x;
+               catch (HttpStatusCodeException scx) {
+                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", scx);
+                       throw new PeerException(uri, scx);
                }
                catch (Throwable t) {
                        log.info(EELFLoggerDelegate.errorLogger, uri + " unexpected failure.", t);
+                       throw new FederationException(uri, t);
                }
                finally {
                        log.info(EELFLoggerDelegate.debugLogger, uri + " response " + response);
                }
-               return response == null ? null : response.getBody();
+               return response.getBody();
        }
 
        /**
@@ -283,7 +291,7 @@ public class FederationClient extends AbstractClient {
         *             Throws HttpStatusCodeException is remote acumos is not available
         */
        public JsonResponse<List<MLPArtifact>> getArtifacts(String theSolutionId, String theRevisionId)
-                       throws HttpStatusCodeException {
+                       throws FederationException {
                URI uri = API.SOLUTION_REVISION_ARTIFACTS.buildUri(this.baseUrl, theSolutionId, theRevisionId);
                log.info(EELFLoggerDelegate.debugLogger, "Query for " + uri);
                ResponseEntity<JsonResponse<List<MLPArtifact>>> response = null;
@@ -292,17 +300,18 @@ public class FederationClient extends AbstractClient {
                                        new ParameterizedTypeReference<JsonResponse<List<MLPArtifact>>>() {
                                        });
                }
-               catch (HttpStatusCodeException x) {
-                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", x);
-                       throw x;
+               catch (HttpStatusCodeException scx) {
+                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", scx);
+                       throw new PeerException(uri, scx);
                }
                catch (Throwable t) {
                        log.error(EELFLoggerDelegate.errorLogger, uri + " unexpected failure.", t);
+                       throw new FederationException(uri, t);
                }
                finally {
                        log.info(EELFLoggerDelegate.debugLogger, uri + " response " + response);
                }
-               return response == null ? null : response.getBody();
+               return response.getBody();
        }
 
        /**
@@ -317,7 +326,7 @@ public class FederationClient extends AbstractClient {
         *             On failure
         */
        public Resource getArtifactContent(String theSolutionId, String theRevisionId, String theArtifactId)
-                                                                                                                                                                                                                                                                                                                                                       throws HttpStatusCodeException {
+                                                                                                                                                                                                                                                                                                                                                       throws FederationException {
                return download2(API.ARTIFACT_CONTENT.buildUri(this.baseUrl, theSolutionId, theRevisionId, theArtifactId));
        }
 
@@ -332,7 +341,7 @@ public class FederationClient extends AbstractClient {
         *             Throws HttpStatusCodeException is remote acumos is not available
         */
        public JsonResponse<List<MLPDocument>> getDocuments(String theSolutionId, String theRevisionId)
-                       throws HttpStatusCodeException {
+                       throws FederationException {
                URI uri = API.SOLUTION_REVISION_DOCUMENTS.buildUri(this.baseUrl, theSolutionId, theRevisionId);
                log.info(EELFLoggerDelegate.debugLogger, "Query for " + uri);
                ResponseEntity<JsonResponse<List<MLPDocument>>> response = null;
@@ -341,12 +350,13 @@ public class FederationClient extends AbstractClient {
                                        new ParameterizedTypeReference<JsonResponse<List<MLPDocument>>>() {
                                        });
                }
-               catch (HttpStatusCodeException x) {
-                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", x);
-                       throw x;
+               catch (HttpStatusCodeException scx) {
+                       log.error(EELFLoggerDelegate.errorLogger, uri + " failed", scx);
+                       throw new PeerException(uri, scx);
                }
                catch (Throwable t) {
                        log.error(EELFLoggerDelegate.errorLogger, uri + " unexpected failure.", t);
+                       throw new FederationException(uri, t);
                }
                finally {
                        log.info(EELFLoggerDelegate.debugLogger, uri + " response " + response);
@@ -366,11 +376,11 @@ public class FederationClient extends AbstractClient {
         *             On failure
         */
        public Resource getDocumentContent(String theSolutionId, String theRevisionId, String theDocumentId)
-                                                                                                                                                                                                                                                                                                                                               throws HttpStatusCodeException {
+                                                                                                                                                                                                                                                                                                                                               throws FederationException {
                return download2(API.DOCUMENT_CONTENT.buildUri(this.baseUrl, theSolutionId, theRevisionId, theDocumentId));
        }
 
-       protected Resource download(URI theUri) throws HttpStatusCodeException {
+       protected Resource download(URI theUri) throws FederationException {
                log.info(EELFLoggerDelegate.debugLogger, "Query for download {}", theUri);
                ResponseEntity<Resource> response = null;
                RequestEntity<Void> request = RequestEntity
@@ -380,32 +390,25 @@ public class FederationClient extends AbstractClient {
                try {
                        response = restTemplate.exchange(request, Resource.class);
                }
-               catch (HttpStatusCodeException x) {
-                       log.error(EELFLoggerDelegate.errorLogger, theUri + " failed", x);
-                       throw x;
+               catch (HttpStatusCodeException scx) {
+                       log.error(EELFLoggerDelegate.errorLogger, theUri + " failed", scx);
+                       throw new PeerException(theUri, scx);
                }
                catch (Throwable t) {
                        log.error(EELFLoggerDelegate.errorLogger, theUri + " unexpected failure.", t);
-                       //not very clean
-                       throw new HttpClientErrorException(HttpStatus.BAD_REQUEST, theUri + " unexpected failure: " + t);
+                       throw new FederationException(theUri, t);
                }
                finally {
                        log.info(EELFLoggerDelegate.debugLogger, theUri + " response " + response);
                }
 
-               if (response == null) {
-                       //should never get here         
-                       return null;
-               }
-               else {
-                       return response.getBody();
-               }
+               return response.getBody();
        }
 
        /**
         * Important: the Resource returned by this method MUST BE CLOSED by whoever uses it.
         */
-       protected StreamingResource download2(URI theUri) throws HttpStatusCodeException {
+       protected StreamingResource download2(URI theUri) throws FederationException {
                log.info(EELFLoggerDelegate.debugLogger, "Query for download {}", theUri);
                ClientHttpResponse response = null;
                try {
@@ -421,8 +424,8 @@ public class FederationClient extends AbstractClient {
        
                        return new StreamingResource(response);
                }
-               catch (IOException ex) {
-                       throw new ResourceAccessException("I/O error for " + theUri + ": " + ex.getMessage(), ex);
+               catch (IOException iox) {
+                       throw new FederationException(theUri, iox);
                }
        }
 
diff --git a/gateway/src/main/java/org/acumos/federation/gateway/common/FederationException.java b/gateway/src/main/java/org/acumos/federation/gateway/common/FederationException.java
new file mode 100644 (file)
index 0000000..339e372
--- /dev/null
@@ -0,0 +1,45 @@
+/*-
+ * ===============LICENSE_START=======================================================
+ * Acumos
+ * ===================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
+ * ===================================================================================
+ * This Acumos software file is distributed by AT&T and Tech Mahindra
+ * under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *  
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * This file is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ===============LICENSE_END=========================================================
+ */
+
+package org.acumos.federation.gateway.common;
+
+import java.net.URI;
+
+/**
+ * Common root for errors occuring during federation interactions (with the a peer)
+ */
+public class FederationException extends Exception {
+
+       public FederationException(URI theUri, Throwable theCause) {
+               this(theUri.toString(), theCause);
+       }
+
+       public FederationException(String theUri, Throwable theCause) {
+               super(theUri + " failed", theCause);
+       }
+
+       public FederationException(URI theUri) {
+               this(theUri.toString());
+       }
+
+       public FederationException(String theUri) {
+               super(theUri + " failed");
+       }
+}
diff --git a/gateway/src/main/java/org/acumos/federation/gateway/common/PeerException.java b/gateway/src/main/java/org/acumos/federation/gateway/common/PeerException.java
new file mode 100644 (file)
index 0000000..5ea0ed1
--- /dev/null
@@ -0,0 +1,40 @@
+/*-
+ * ===============LICENSE_START=======================================================
+ * Acumos
+ * ===================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
+ * ===================================================================================
+ * This Acumos software file is distributed by AT&T and Tech Mahindra
+ * under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *  
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * This file is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ===============LICENSE_END=========================================================
+ */
+
+package org.acumos.federation.gateway.common;
+
+import java.net.URI;
+
+import org.springframework.web.client.HttpStatusCodeException;
+
+/**
+ * Federation error as explcitly reported by a peer
+ */
+public class PeerException extends FederationException {
+
+       public PeerException(URI theUri, HttpStatusCodeException theCause) {
+               super(theUri, theCause);
+       }
+
+       public PeerException(String theUri, HttpStatusCodeException theCause) {
+               super(theUri, theCause);
+       }
+
+}
index f318a42..d40af6c 100644 (file)
 
 package org.acumos.federation.gateway.config;
 
+import java.net.URI;
+import java.net.URL;
+import java.net.MalformedURLException;
+
 import java.lang.invoke.MethodHandles;
 
+import org.apache.http.HttpHost;
+import org.apache.http.client.AuthCache;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.BasicAuthCache;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.BasicHttpContext;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.http.HttpMethod;
 import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.http.client.support.BasicAuthorizationInterceptor;
 import org.springframework.stereotype.Component;
 import org.springframework.web.client.RestTemplate;
 
+
 /**
  * 
  */
@@ -41,7 +61,7 @@ public class NexusConfiguration {
        private String          proxy;
        private String    groupId;
        private String    id;
-       private String          url;
+       private URL                             url;
        private String          username;
        private String          password;
        private String          nameSeparator;
@@ -63,12 +83,12 @@ public class NexusConfiguration {
                this.id = theId;
        }
 
-       public void setUrl(String theUrl) {
-               this.url = theUrl;
+       public void setUrl(String theSpec) throws MalformedURLException {
+               this.url = new URL(theSpec);
   }
        
        public String getUrl() {
-               return this.url;
+               return this.url.toString();
   }
 
        public void setUsername(String theUsername) {
@@ -100,28 +120,38 @@ public class NexusConfiguration {
                return this.nameSeparator;
        }
 
+       /**
+        * Prepare a RestTemplate fitted for Nexus interactions, in particular ready to perform preemptive basic authentication.
+        */
        public RestTemplate getNexusClient() {
 
-               //cannot use the localIfConfig straightup as it does not carry the Nexus specific client authentication info
-               //but this only need to be built once
-               InterfaceConfiguration nexusIfConfig = InterfaceConfigurationBuilder.buildFrom(this.localIfConfig)
-                                                                                                                                                                                       .withClient(new InterfaceConfiguration.Client(this.username, this.password))
-                                                                                                                                                                                       .buildConfig();
-
-               
-               log.info(EELFLoggerDelegate.debugLogger, "Nexus config: {}", nexusIfConfig);
-
                RestTemplateBuilder builder =
                        new RestTemplateBuilder()
-                               .requestFactory(new HttpComponentsClientHttpRequestFactory( 
-                                                                                                       nexusIfConfig.buildClient()));
-               if (this.url != null) {
-                       builder.rootUri(this.url);
-               }
-               if (this.username != null) {
-                       builder.basicAuthorization(this.username, this.password);
-               }
+                               .requestFactory(
+                                       new HttpComponentsClientHttpRequestFactory(this.localIfConfig.buildClient()) {
+
+                                               protected HttpContext createHttpContext(HttpMethod theMethod, URI theUri) {
+                                                       HttpHost nexusHost = new HttpHost(NexusConfiguration.this.url.getHost(), NexusConfiguration.this.url.getPort());
+
+                                                       CredentialsProvider nexusCreds = new BasicCredentialsProvider();
+                                                       nexusCreds.setCredentials(
+                                               new AuthScope(nexusHost.getHostName(), nexusHost.getPort()),
+                                       new UsernamePasswordCredentials(NexusConfiguration.this.username, NexusConfiguration.this.password));
+
+                                                       AuthCache authCache = new BasicAuthCache();
+                                                       BasicScheme basicAuth = new BasicScheme();
+                                                       authCache.put(nexusHost, basicAuth);
+                                                       HttpClientContext nexusContext = HttpClientContext.create();
+                                                       nexusContext.setAuthCache(authCache);
+                                                       nexusContext.setCredentialsProvider(nexusCreds);
+                                                       return nexusContext;
+                                               }
+                                       });
 
                return builder.build();
        }
+
+       
+
 }
index 97697c5..b2fbddf 100644 (file)
@@ -25,7 +25,6 @@ import java.util.List;
 
 import org.acumos.cds.domain.MLPPeer;
 import org.acumos.cds.domain.MLPPeerSubscription;
-import org.acumos.cds.domain.MLPSolution;
 
 /**
  * Carries event information related to a peer subscription check
@@ -34,14 +33,11 @@ public class PeerSubscriptionEvent extends EventObject {
 
        private MLPPeer peer;
        private MLPPeerSubscription subscription;
-       private List<MLPSolution> solutions;
 
-       public PeerSubscriptionEvent(Object theSource, MLPPeer thePeer, MLPPeerSubscription theSubscription,
-                       List<MLPSolution> theSolutions) {
+       public PeerSubscriptionEvent(Object theSource, MLPPeer thePeer, MLPPeerSubscription theSubscription) {
                super(theSource);
                this.peer = thePeer;
                this.subscription = theSubscription;
-               this.solutions = theSolutions;
        }
 
        public MLPPeer getPeer() {
@@ -52,8 +48,4 @@ public class PeerSubscriptionEvent extends EventObject {
                return this.subscription;
        }
 
-       public List<MLPSolution> getSolutions() {
-               return this.solutions;
-       }
-
 }
index 9d96ef9..ce954f2 100644 (file)
@@ -245,6 +245,7 @@ public class ContentServiceImpl extends AbstractServiceImpl
                try {
                        String path = nexusPath(theGroupId, theContentId, theVersion, thePackaging);
                        URI contentUri = new URI(this.nexusConfig.getUrl() + UriUtils.encodePath(path, "UTF-8"));
+                       //URI contentUri = new URI(this.nexusConfig.getUrl() + path);
                        log.info(EELFLoggerDelegate.debugLogger, "Writing artifact content to nexus at {}", path);
                        RequestEntity<Resource> request = RequestEntity
                                                                                                                                                                        .put(contentUri)
@@ -298,6 +299,9 @@ public class ContentServiceImpl extends AbstractServiceImpl
         * @return a string array containing the 2 part or null if the part was missing
         */
        private String[] splitName(String theName) {
+               log.info(EELFLoggerDelegate.debugLogger,        "Splitting name {}", theName);
+               if (null == theName)
+                       return new String[] {"", ""};
                int pos = theName.lastIndexOf('.');
                return (pos < 0) ?
                        new String[] {theName, "" /*null: better coding but does not facilitate callers*/} :
index 702c93a..ff2f554 100644 (file)
@@ -46,11 +46,13 @@ public abstract class ServiceImpl {
        public static boolean isSelectable(MLPSolution theSolution, Map<String, ?> theSelector) /*throws ServiceException*/ {
                boolean res = true;
 
+               log.trace(EELFLoggerDelegate.debugLogger, "isSelectable {}", theSolution);
                if (theSelector == null || theSelector.isEmpty())
                        return true;
 
                Object solutionId = theSelector.get("solutionId");
                if (solutionId != null) {
+                       log.trace(EELFLoggerDelegate.debugLogger, "using solutionId based selection {}", solutionId);
                        if (solutionId instanceof String) {
                                res &= theSolution.getSolutionId().equals(solutionId);
                        }
@@ -62,6 +64,7 @@ public abstract class ServiceImpl {
 
                Object modelTypeCode = theSelector.get("modelTypeCode");
                if (modelTypeCode != null) {
+                       log.trace(EELFLoggerDelegate.debugLogger, "using modelTypeCode based selection {}", modelTypeCode);
                        String solutionModelTypeCode = theSolution.getModelTypeCode();
                        if (solutionModelTypeCode == null) {
                                return false;
@@ -82,6 +85,7 @@ public abstract class ServiceImpl {
 
                Object toolkitTypeCode = theSelector.get("toolkitTypeCode");
                if (toolkitTypeCode != null) {
+                       log.trace(EELFLoggerDelegate.debugLogger, "using toolkitTypeCode based selection {}", toolkitTypeCode);
                        String solutionToolkitTypeCode = theSolution.getToolkitTypeCode();
                        if (solutionToolkitTypeCode == null) {
                                return false;
@@ -102,6 +106,7 @@ public abstract class ServiceImpl {
 
                Object tags = theSelector.get("tags");
                if (tags != null) {
+                       log.trace(EELFLoggerDelegate.debugLogger, "using tags based selection {}", tags);
                        Set<MLPTag> solutionTags = theSolution.getTags();
                        if (solutionTags == null) {
                                return false;
@@ -122,6 +127,7 @@ public abstract class ServiceImpl {
 
                Object name = theSelector.get("name");
                if (name != null) {
+                       log.debug(EELFLoggerDelegate.debugLogger, "using name based selection {}", name);
                        String solutionName = theSolution.getName();
                        if (solutionName == null) {
                                return false;
@@ -133,6 +139,7 @@ public abstract class ServiceImpl {
 
                Object desc = theSelector.get("description");
                if (desc != null) {
+                       log.debug(EELFLoggerDelegate.debugLogger, "using description based selection {}", desc);
                        String solutionDesc = theSolution.getDescription();
                        if (solutionDesc == null) {
                                return false;
diff --git a/gateway/src/main/java/org/acumos/federation/gateway/task/PeerSubscriptionTask.java b/gateway/src/main/java/org/acumos/federation/gateway/task/PeerSubscriptionTask.java
deleted file mode 100644 (file)
index 543131c..0000000
+++ /dev/null
@@ -1,125 +0,0 @@
-/*-
- * ===============LICENSE_START=======================================================
- * Acumos
- * ===================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
- * ===================================================================================
- * This Acumos software file is distributed by AT&T and Tech Mahindra
- * under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *  
- *      http://www.apache.org/licenses/LICENSE-2.0
- *  
- * This file is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ===============LICENSE_END=========================================================
- */
-
-package org.acumos.federation.gateway.task;
-
-import java.lang.invoke.MethodHandles;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.acumos.cds.domain.MLPPeer;
-import org.acumos.cds.domain.MLPPeerSubscription;
-import org.acumos.cds.domain.MLPSolution;
-import org.acumos.federation.gateway.common.Clients;
-import org.acumos.federation.gateway.common.FederationClient;
-import org.acumos.federation.gateway.common.JsonResponse;
-import org.acumos.federation.gateway.config.EELFLoggerDelegate;
-import org.acumos.federation.gateway.event.PeerSubscriptionEvent;
-import org.acumos.federation.gateway.service.PeerSubscriptionService;
-import org.acumos.federation.gateway.util.Utils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationEventPublisher;
-import org.springframework.context.annotation.Scope;
-import org.springframework.stereotype.Component;
-
-/**
- * 
- * Peer Acumos Task to Communicate to Remote Acumos and fetch Solutions and
- * Catalogs.
- * This is a Component/Bean so that it can be autowired.
- */
-@Component
-@Scope("prototype")
-public class PeerSubscriptionTask implements Runnable {
-
-       private static final EELFLoggerDelegate log = EELFLoggerDelegate.getLogger(MethodHandles.lookup().lookupClass());
-
-       private MLPPeer peer;
-       private MLPPeerSubscription subscription;
-
-       @Autowired
-       private ApplicationEventPublisher eventPublisher;
-       @Autowired
-       private Clients clients;
-       @Autowired
-       private PeerSubscriptionService peerSubscriptionService;
-
-       public PeerSubscriptionTask() {
-       }
-
-       public PeerSubscriptionTask handle(MLPPeer peer, MLPPeerSubscription subscription) {
-               this.peer = peer;
-               this.subscription = subscription;
-               return this;
-       }
-
-       public MLPPeer getPeer() {
-               return this.peer;
-       }
-
-       public MLPPeerSubscription getSubscription() {
-               return this.subscription;
-       }
-
-       @Override
-       public void run() {
-
-               if (this.peer == null || this.subscription == null) {
-                       log.info(EELFLoggerDelegate.debugLogger, "Peer task has no peer subscription info");
-                       return;
-               }
-
-               Map selector = Utils.jsonStringToMap(this.subscription.getSelector());
-               Date lastProcessed = this.subscription.getProcessed();
-               if (lastProcessed != null) {
-                       selector.put("modified", lastProcessed);
-               }
-
-               try {
-                       log.info(EELFLoggerDelegate.debugLogger, "Peer task for peer {}, subscription {}", this.peer.getName(), this.subscription.getSubId());
-                       FederationClient fedClient = clients.getFederationClient(this.peer.getApiUrl());
-                       if (fedClient == null)
-                               throw new IllegalArgumentException("run: failed to get client for peer URL " + this.peer.getApiUrl());
-                       JsonResponse<List<MLPSolution>> response = fedClient.getSolutions(selector);
-                       log.info(EELFLoggerDelegate.debugLogger, "Peer task for peer {}, subscription {} got response {}", this.peer.getName(), this.subscription.getSubId(), response);
-                       if (response != null) {
-                               List<MLPSolution> solutions = response.getContent();
-                               if (solutions != null) {
-                                       //keep only those updated since last processed
-                                       //this assumes we can trust the last processed to be maintained correctly by the peer .. unreliable
-                                       //solutions = solutions.stream()
-                                       //                                                      .filter(solution -> solution.getLastUpdate().after(this.subscription.getProcessed()))
-                                       //                                                      .collect(Collectors.toList());
-                                       if (solutions.size() > 0) {
-                                               this.eventPublisher.publishEvent(
-                                                       new PeerSubscriptionEvent(this, this.peer, this.subscription, solutions));
-                                       }
-                               }
-                       }
-
-                       //moved this to PeerGateway, not good design but practical ..
-                       //this.peerSubscriptionService.updatePeerSubscription(this.subscription);
-               }
-               catch (Exception x) {
-                       log.error(EELFLoggerDelegate.errorLogger, "Peer task failed for " + peer.getName() + ", " + peer.getApiUrl() + ", " + subscription, x);
-               }
-       }
-}
index ce26dd7..a085611 100644 (file)
@@ -33,12 +33,15 @@ import javax.annotation.PreDestroy;
 import org.acumos.cds.domain.MLPPeer;
 import org.acumos.cds.domain.MLPPeerSubscription;
 import org.acumos.federation.gateway.cds.PeerStatus;
+import org.acumos.federation.gateway.cds.PeerSubscription;
 import org.acumos.federation.gateway.config.EELFLoggerDelegate;
 import org.acumos.federation.gateway.service.PeerService;
 import org.acumos.federation.gateway.service.PeerSubscriptionService;
 import org.acumos.federation.gateway.util.Utils;
+import org.acumos.federation.gateway.event.PeerSubscriptionEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.core.env.Environment;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
@@ -68,6 +71,8 @@ public class PeerSubscriptionTaskScheduler {
        private ApplicationContext appCtx;
        @Autowired
        private TaskScheduler taskScheduler = null;
+       @Autowired
+       private ApplicationEventPublisher eventPublisher;
 
        private Table<String, Long, PeerTaskHandler> peersSubsTask = HashBasedTable.create();
 
@@ -142,7 +147,6 @@ public class PeerSubscriptionTaskScheduler {
 
        @Scheduled(initialDelay = 5000, fixedRateString = "${peer.jobchecker.interval:400}000")
        public void checkPeerJobs() {
-
                log.debug(EELFLoggerDelegate.debugLogger, "checkPeerSubscriptionJobs");
                // Get the List of MLP Peers
                List<MLPPeer> peers = peerService.getPeers();
@@ -213,10 +217,7 @@ public class PeerSubscriptionTaskScheduler {
                                if (peerSubTask != null) {
                                        MLPPeerSubscription taskSub = peerSubTask.getSubscription();
                                        // was the subscription updated? if yes, cancel current task.
-                                       //TODO: this does not correctly identify one time executions that were completed
-                                       if (!((peerSub.getModified() == null && taskSub.getModified() == null) ||
-                                                         (peerSub.getModified() != null && taskSub.getModified() != null &&
-                                                                peerSub.getModified().equals(taskSub.getModified())))) {
+                                       if (PeerSubscription.isModified(peerSub, taskSub)) {
                                                log.debug(EELFLoggerDelegate.debugLogger,
                                                                "peer {} subscription {} was updated, terminating current task", peer.getPeerId(), peerSub.getSubId());
                                                peerSubTask.stopTask();
@@ -243,28 +244,35 @@ public class PeerSubscriptionTaskScheduler {
         */
        private class PeerTaskHandler {
 
-               private ScheduledFuture future;
-               private PeerSubscriptionTask task;
+               private ScheduledFuture                         future;
+               private PeerSubscriptionTask    task;
 
                public synchronized PeerTaskHandler startTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
+                       if (this.task != null)
+                               throw new IllegalStateException("Already scheduled");
+                       else
+                               this.task = new PeerSubscriptionTask(thePeer, theSub);
+
                        Long refreshInterval = theSub.getRefreshInterval();
-       
-                       this.task = (PeerSubscriptionTask) PeerSubscriptionTaskScheduler.this.appCtx.getBean("peerSubscriptionTask");
                        if (refreshInterval.longValue() == 0) {
                                this.future = PeerSubscriptionTaskScheduler.this.taskScheduler
-                                                                                               .schedule(this.task.handle(thePeer, theSub), new Date(System.currentTimeMillis() + 5000));
+                                                                                               .schedule(this.task, new Date(System.currentTimeMillis() + 5000));
                        }
                        else {
                                this.future = PeerSubscriptionTaskScheduler.this.taskScheduler
-                                                                                               .scheduleAtFixedRate(this.task.handle(thePeer, theSub), 1000 * refreshInterval.longValue() );
+                                                                                               .scheduleAtFixedRate(this.task, 1000 * refreshInterval.longValue() );
                        }
                        return this;
                }
                
                public synchronized PeerTaskHandler runTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
-                       this.task = (PeerSubscriptionTask) PeerSubscriptionTaskScheduler.this.appCtx.getBean("peerSubscriptionTask");
+                       if (this.task != null)
+                               throw new IllegalStateException("Already scheduled");
+                       else
+                               this.task = new PeerSubscriptionTask(thePeer, theSub);
+
                        this.future = PeerSubscriptionTaskScheduler.this.taskScheduler
-                                                                                               .schedule(this.task.handle(thePeer, theSub), new Date(System.currentTimeMillis() + 5000));
+                                                                                               .schedule(this.task, new Date(System.currentTimeMillis() + 5000));
                        return this;
                }
 
@@ -272,16 +280,39 @@ public class PeerSubscriptionTaskScheduler {
                        if (this.future == null)
                                throw new IllegalStateException("Not started");
 
-                       this.future.cancel(true);
+                       this.future.cancel(false);
                        return this;
                }
 
-               public synchronized MLPPeerSubscription getSubscription() {
-                       if (this.task == null)
-                               throw new IllegalStateException("Not started");
-
-                       return this.task.getSubscription();
+               public MLPPeerSubscription getSubscription() {
+                       return (this.task == null) ? null : this.task.getSubscription();
                }
        }       
 
+       public class PeerSubscriptionTask implements Runnable {
+
+               private MLPPeer peer;
+               private MLPPeerSubscription subscription;
+
+               PeerSubscriptionTask(MLPPeer peer, MLPPeerSubscription subscription) {
+                       this.peer = peer;
+                       this.subscription = subscription;
+               }
+
+               public MLPPeer getPeer() {
+                       return this.peer;
+               }
+
+               public MLPPeerSubscription getSubscription() {
+                       return this.subscription;
+               }
+
+               @Override
+               public void run() {
+
+                       //tell whoever needs to know that this subscription is to be processed
+                       PeerSubscriptionTaskScheduler.this.eventPublisher.publishEvent(
+                                       new PeerSubscriptionEvent(this, this.peer, this.subscription));
+               }
+       }
 }
index b7d4cec..1b23c2e 100644 (file)
@@ -100,9 +100,4 @@ public class TaskConfiguration implements AsyncConfigurer {
                return new PeerSubscriptionTaskScheduler();
        }
 
-       @Bean
-       @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-       public PeerSubscriptionTask peerSubscriptionTask() {
-               return new PeerSubscriptionTask();
-       }
 }
index 546202c..ec44b32 100644 (file)
@@ -96,10 +96,6 @@ import org.springframework.test.context.junit4.SpringRunner;
 public class TaskTest {
 
        private final EELFLoggerDelegate log = EELFLoggerDelegate.getLogger(getClass().getName());
-       @MockBean(name = "clients")
-       private Clients clients;
-       @MockBean(name = "federationClient")
-       private HttpClient      federationClient;
        @Autowired
        private ApplicationContext context;
 
@@ -114,130 +110,6 @@ public class TaskTest {
                PeerSubscriptionListener listener =
                        (PeerSubscriptionListener)this.context.getBean("testListener");
 
-               try {
-                       BasicHttpResponse mockResponse = 
-                               new BasicHttpResponse(
-                                       new BasicStatusLine(
-                                               new ProtocolVersion("HTTP",1,1), 200, "Success"));
-/*
-                       String mockContent = "{" +
-                                       "\"status\": true," +
-                                       "\"response_code\": 200," +
-                                       "\"response_detail\": \"Success\"," +
-                                       "\"response_body\": [{" +
-                                       "\"solutionId\":\"6793411f-c7a1-4e93-85bc-f91d267541d8\"," +
-                                       "\"name\":\"mock model\"," +
-                               "\"description\":\"Test mock model\"," +
-                               "\"ownerId\":\"admin\"," +
-                               "\"active\":\"true\"," +
-                               "\"modelTypeCode\":\"CL\"," +
-                               "\"toolkitTypeCode\":\"\"," +
-                               "\"validationStatusCode\":\"\"," +
-                               "\"metadata\":\"acumosa\"," +
-                               "\"created\":\"2017-08-10\"," +
-                               "\"modified\":\"2017-08-11\"" +
-                               "}]}";
-
-                       byte[] mockContentBytes = mockContent.getBytes("UTF-8");
-
-                       mockResponse.setEntity(
-                               new ByteArrayEntity(mockContentBytes));
-                       mockResponse.addHeader("Content-Type", ContentType.APPLICATION_JSON.toString());
-                       mockResponse.addHeader("Content-Length", String.valueOf(mockContentBytes.length));
-*/
-                       ClassPathResource mockResource = new ClassPathResource("mockPeerSolutionsResponse.json");
-
-                       mockResponse.setEntity(
-                               new InputStreamEntity(mockResource.getInputStream()));
-                       mockResponse.addHeader("Content-Type", ContentType.APPLICATION_JSON.toString());
-                       mockResponse.addHeader("Content-Length", String.valueOf(mockResource.contentLength()));
-
-                       MLPSolution mockSolution = new MLPSolution();
-                       mockSolution.setSolutionId("1");
-                       mockSolution.setName("mock model");
-                       mockSolution.setDescription("mock model test");
-                       mockSolution.setActive(true);
-
-                       JsonResponse<List<MLPSolution>> mockPayload = new JsonResponse();
-                       mockPayload.setMessage("Success");
-                       mockPayload.setContent(Collections.singletonList(mockSolution));
-
-                       when(
-                               this.federationClient.execute(
-                                       any(HttpHost.class), any(HttpRequest.class)
-                               )
-                       ).thenReturn(mockResponse);
-
-                       when(
-                               this.federationClient.execute(
-                                       any(HttpHost.class), any(HttpRequest.class), any(HttpContext.class)
-                               )
-                       ).thenReturn(mockResponse);
-                       
-                       when(
-                               this.federationClient.execute(
-                                       any(HttpUriRequest.class)
-                               )
-                       ).thenReturn(mockResponse);
-
-//this one gets called!
-                       when(
-                               this.federationClient.execute(
-                                       any(HttpUriRequest.class), any(HttpContext.class)
-                               )
-                       //).thenReturn(mockResponse);
-                       ).thenAnswer(new Answer<HttpResponse>() {
-                                       public HttpResponse answer(InvocationOnMock theInvocation) {
-                                               return mockResponse;
-                                       }
-                               });
-
-                       when(
-                               this.federationClient.execute(
-                                       any(HttpHost.class), any(HttpRequest.class), any(ResponseHandler.class)
-                               )
-                       ).thenReturn(mockPayload);
-                                               
-                       when(
-                               this.federationClient.execute(
-                                       any(HttpHost.class), any(HttpRequest.class), any(ResponseHandler.class), any(HttpContext.class)
-                               )
-                       ).thenReturn(mockPayload);
-                       
-                       when(
-                               this.federationClient.execute(
-                                       any(HttpUriRequest.class), any(ResponseHandler.class)
-                               )
-                       ).thenReturn(mockPayload);
-                                               
-                       when(
-                               this.federationClient.execute(
-                                       any(HttpUriRequest.class), any(ResponseHandler.class), any(HttpContext.class)
-                               )
-                       ).thenReturn(mockPayload);
-
-                       //prepare the clients
-                       when(
-                               this.clients.getFederationClient(
-                                       any(String.class)
-                               )
-                       )
-                       .thenAnswer(new Answer<FederationClient>() {
-                                       public FederationClient answer(InvocationOnMock theInvocation) {
-                                               //this ends up providing a client based on the mocked http client
-                                         return new FederationClient(
-                  (String)theInvocation.getArguments()[0]/*the URI*/,
-                  federationClient);
-                                       }
-                               });
-
-       
-               }
-               catch(Exception x) {
-                       log.error(EELFLoggerDelegate.errorLogger, "Failed to setup mock", x);
-                       assertTrue(1 == 0);
-               }
-
                try {
                        boolean complete = listener.peerEventLatch.await(10, TimeUnit.SECONDS);
                        log.info(EELFLoggerDelegate.debugLogger, "event: " + complete + "/" + listener.peerEventLatch.getCount());
@@ -249,8 +121,7 @@ public class TaskTest {
                }
                //
                assertTrue(listener.event != null);
-               assertTrue(listener.event.getSolutions().size() == 1);
-               assertTrue(listener.event.getSolutions().get(0).getName().equals("mock model"));
+               assertTrue(listener.event.getSubscription().getPeerId().equals("11111111-1111-1111-1111-111111111111"));
        }
 
        public static class TaskTestConfiguration {
index 3f7a7a7..60d19a3 100644 (file)
@@ -32,13 +32,16 @@ import javax.annotation.PreDestroy;
 
 import org.acumos.cds.domain.MLPArtifact;
 import org.acumos.cds.domain.MLPPeer;
+import org.acumos.cds.domain.MLPPeerSubscription;
 import org.acumos.cds.domain.MLPSolution;
 import org.acumos.cds.domain.MLPSolutionRevision;
 import org.acumos.federation.gateway.cds.Solution;
 import org.acumos.federation.gateway.common.Clients;
 import org.acumos.federation.gateway.common.FederationClient;
+import org.acumos.federation.gateway.common.FederationException;
 import org.acumos.federation.gateway.config.EELFLoggerDelegate;
 import org.acumos.federation.gateway.event.PeerSubscriptionEvent;
+import org.acumos.federation.gateway.util.Utils;
 import org.springframework.beans.factory.annotation.Autowired;
 //import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -88,17 +91,17 @@ public class TestAdapter {
        @EventListener
        public void handlePeerSubscriptionUpdate(PeerSubscriptionEvent theEvent) {
                log.info(EELFLoggerDelegate.debugLogger, "received peer subscription update event {}", theEvent);
-               taskExecutor.execute(new TestTask(theEvent.getPeer(), theEvent.getSolutions()));
+               taskExecutor.execute(new TestTask(theEvent.getPeer(), theEvent.getSubscription()));
        }
 
        public class TestTask implements Runnable {
 
                private MLPPeer peer;
-               private List<MLPSolution> solutions;
+               private MLPPeerSubscription sub;
 
-               public TestTask(MLPPeer thePeer, List<MLPSolution> theSolutions) {
+               public TestTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
                        this.peer = thePeer;
-                       this.solutions = theSolutions;
+                       this.sub = theSub;
                }
 
                public void run() {
@@ -110,7 +113,20 @@ public class TestAdapter {
                        }
 
                        synchronized (peerImports) {
-                               for (MLPSolution solution : this.solutions) {
+
+                               List<MLPSolution> peerSolutions = null;
+                               try {
+                                       peerSolutions = (List)clients.getFederationClient(this.peer.getApiUrl())
+                                               .getSolutions(Utils.jsonStringToMap(this.sub.getSelector())).getContent();
+                               }
+                               catch (FederationException fx) {
+                                       log.warn(EELFLoggerDelegate.debugLogger, "Failed to retrieve solutions", fx);
+                                       return;
+                               }
+
+                               log.info(EELFLoggerDelegate.debugLogger, "Processing peer {} subscription {} yielded solutions {}", this.peer, this.sub.getSubId(), peerSolutions);
+
+                               for (MLPSolution solution : peerSolutions) {
 
                                        MLPSolution peerImport = peerImports.get(solution.getSolutionId());
                                        if (peerImport == null) {
index 16ff3e7..ec81fc0 100644 (file)
@@ -20,7 +20,7 @@
                "subscriptions":[
                        {
         "subId":1,
-                               "peerId":"5d2f3b51-3d38-46cd-b171-7e0f3718087a",
+                               "peerId":"11111111-1111-1111-1111-111111111111",
                                "selector":"{\"modelTypeCode\":\"CL,PR\"}",
                                "refreshInterval":"2000",
                                "maxArtifactSize":"2048",