Fix origin encoding and provisioning, add doc 87/1187/2
authorSerban Jora <sj2381@att.com>
Tue, 6 Mar 2018 06:01:14 +0000 (01:01 -0500)
committerSerban Jora <sj2381@att.com>
Tue, 6 Mar 2018 15:29:27 +0000 (10:29 -0500)
Change-Id: If5a04118e3c8e787ddfd06d684a26c5b7dc2d410
Signed-off-by: Serban Jora <sj2381@att.com>
Issue-ID: ACUMOS-276
Signed-off-by: Serban Jora <sj2381@att.com>
docs/design-notes.rst [new file with mode: 0644]
gateway/src/main/java/org/acumos/federation/gateway/adapter/PeerGateway.java
gateway/src/main/java/org/acumos/federation/gateway/controller/CatalogController.java
gateway/src/main/java/org/acumos/federation/gateway/task/PeerSubscriptionTask.java
gateway/src/main/java/org/acumos/federation/gateway/task/PeerSubscriptionTaskScheduler.java
gateway/src/test/java/org/acumos/federation/gateway/test/TestAdapter.java

diff --git a/docs/design-notes.rst b/docs/design-notes.rst
new file mode 100644 (file)
index 0000000..0c39203
--- /dev/null
@@ -0,0 +1,93 @@
+..  ===============LICENSE_START=======================================================
+..  Acumos
+..  ===================================================================================
+..  Copyright (C) 2017 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
+..  ===================================================================================
+..  This Acumos software file is distributed by AT&T and Tech Mahindra
+..  under the Apache License, Version 2.0 (the "License");
+..  you may not use this file except in compliance with the License.
+..  You may obtain a copy of the License at
+..   
+..       http://www.apache.org/licenses/LICENSE-2.0
+..   
+..  This file is distributed on an "AS IS" BASIS,
+..  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+..  See the License for the specific language governing permissions and
+..  limitations under the License.
+..  ===============LICENSE_END=========================================================
+
+
+The federation gateway is an optional component of an Acumos system whose role
+is to facilitate communication with other Acumos systems (i.e. with their gateways)
+or compatible systems (through adapters). Its role is to facilitate the exchange
+of models and their related information between Acumos instances.
+The federation gateway occupies the borderline of an Acumos system, from a logical
+and deployment perspective. From a logical perspective it is the point of control
+for the flow of model information in and out of an Acumos instance. From a deployment
+perspective (within an enterprise environment), the federation gateway will be deployed
+at the edge of the network (DMZ) with communication interfaces towards the enterprise
+network (where the rest of the Acumos instance components are deployed) and towards
+the outside world (where other Acumos instances are deployed).
+We call the external interface (towards the gateways of other Acumos instances) the
+*federation interface* and we call the internal interface (towards the other components
+of the same Acumos instance) the *local interface*.
+The design of the gateway reflects this duality: the gateway defines/offers a set of
+REST APIs on its federation interface for gateway-2-gateway (or gateway-2-adapter)
+communication and another set on the local interface for component-2-gateway communication.
+
+Federation concepts
+
+The federation gateway behaviour is driven by the peer and subscription information provisioned
+in the CDS. Through the local interface API other components can trigger gateway
+behaviour, i.e. trigger interactions with peers. 
+The peer information represents all other Acumos systems (or other through adapters) this system
+has agreed to communicate/exchange information with. The 'handshake' procedure by which 2 systems
+agreed to communicate and provision the required information can take place 'out-of-band' (email,etc
++ provisining) or 'in-band' (a combination of federation REST API and provisioning actions).
+
+When enabling federation an Acumos system agrees to share its public, validated models (their
+revisions) with its peers. (a discussion on ACL driven/selective sharing control will come here later).
+Establishing a relationship does not in itself imply that any exchange of information takes place.
+Information exchange (models) is driven by subscriptions provisioned in the local CDS. In Acumos every
+peer is responsible for pulling from its peers the models it is interested in (such an interaction
+.goes through the peers' federation gateway who controls/filters access to its local models).
+A subscription towards a peer represents a subset of that peers' model set that this Acumos is interested in.
+The subscription information is there to drive the behaviour of the federation gateway (who does
+the actual peer polling and local provisining of the retrieved information); there is no subscription
+information shared between peers. An Acumos instance can have multiple subscriptions towards another
+peer. A subscription can range from one specific model to all the models a peer exposes (with any
+combination of model level selection criteria in between). A subscription further specifies
+options such as the frequency with each the federation gateway should check for updates, how much
+model information should be retrieved every time,etc.
+
+It is important to notice that the federation gateway mechanisms for model information exchange
+does not impose an overall peer organization/deployment architecture: tree like structures, fully or sparse
+connected graphs, etc are all possible.
+
+
+
+Federation mechanisms
+
+Before any interaction with a peer can take place the peer information needs to be provisoned
+in the local CDS. A federation gateway has a dual role, as a server when responding to requests
+from its peers and as a client when requesting information from them. The federation gateway
+uses mutual authentication (https,tls), i.e. when a connection is established between 2 gateways
+both sides need to present their certificates (signed by accepred CAs and so on). The subjectName
+entry in a certificate received from a peer serves to identify the peer against the locally (CDS)
+provisioned peer collection (the are no passwords or other credentials provisioned/exchanged).
+
+The gateway periodically processes the list of locally provisioned peers; where subscriptions
+towards a peer are found they are assigned to tasks who will query the peer with the given
+subscription selector. Each resulting model will be compared against locally available
+model information (in CDS) and new model/new revisions+artifacts will be fetched and provisioned.
+
+In addition to the model information exchange APIs the federation gateway offers APIs for:
+
+- status information (ping)
+- in-band registration
+- peer information sharing
+
+Other
+
+At this point the federation gateway relies on only one Acumos component, the CDS.
index 4c3e473..4313ab8 100644 (file)
@@ -185,24 +185,26 @@ public class PeerGateway {
                        }
                }
 
-               private MLPSolution createMLPSolution(MLPSolution peerMLPSolution, ICommonDataServiceRestClient cdsClient) {
+               private MLPSolution createMLPSolution(MLPSolution peerSolution, ICommonDataServiceRestClient cdsClient) {
                        log.info(EELFLoggerDelegate.debugLogger,
-                                       "Creating Local MLP Solution for peer solution " + peerMLPSolution);
+                                       "Creating Local MLP Solution for peer solution " + peerSolution);
                        MLPSolution localSolution = new MLPSolution();
-                       localSolution.setSolutionId(peerMLPSolution.getSolutionId());
-                       localSolution.setName(peerMLPSolution.getName());
-                       localSolution.setDescription(peerMLPSolution.getDescription());
-                       localSolution.setAccessTypeCode(AccessTypeCode.PB.toString());
-                       localSolution.setMetadata(peerMLPSolution.getMetadata());
-                       localSolution.setModelTypeCode(peerMLPSolution.getModelTypeCode());
-                       localSolution.setProvider("ATTAcumosInc");
-                       localSolution.setActive(peerMLPSolution.isActive());
-                       localSolution.setToolkitTypeCode(peerMLPSolution.getToolkitTypeCode());
-                       localSolution.setValidationStatusCode(ValidationStatusCode.NV.toString());
-                       localSolution.setCreated(peerMLPSolution.getCreated());
-                       localSolution.setModified(peerMLPSolution.getModified());
+                       localSolution.setSolutionId(peerSolution.getSolutionId());
+                       localSolution.setName(peerSolution.getName());
+                       localSolution.setDescription(peerSolution.getDescription());
+                       localSolution.setAccessTypeCode(this.sub.getAccessType());
+                       localSolution.setMetadata(peerSolution.getMetadata());
+                       localSolution.setModelTypeCode(peerSolution.getModelTypeCode());
+                       localSolution.setProvider(this.peer.getName());
+                       localSolution.setActive(peerSolution.isActive());
+                       localSolution.setToolkitTypeCode(peerSolution.getToolkitTypeCode());
+                       localSolution.setValidationStatusCode(this.peer.getValidationStatusCode());
+                       //should the creted/modified reflect this information or the information we got from the peer ?
+                       localSolution.setCreated(peerSolution.getCreated());
+                       localSolution.setModified(peerSolution.getModified());
                        localSolution.setOwnerId(getOwnerId(this.sub));
                        localSolution.setSourceId(this.peer.getPeerId());
+                       localSolution.setOrigin(peerSolution.getOrigin());
                        try {
                                cdsClient.createSolution(localSolution);
                                return localSolution;
@@ -218,20 +220,21 @@ public class PeerGateway {
                        }
                }
 
-               private MLPSolutionRevision createMLPSolutionRevision(MLPSolutionRevision mlpSolutionRevision,
+               private MLPSolutionRevision createMLPSolutionRevision(MLPSolutionRevision peerSolutionRevision,
                                ICommonDataServiceRestClient cdsClient) {
-                       MLPSolutionRevision solutionRevision = new MLPSolutionRevision();
-                       solutionRevision.setSolutionId(mlpSolutionRevision.getSolutionId());
-                       solutionRevision.setRevisionId(mlpSolutionRevision.getRevisionId());
-                       solutionRevision.setVersion(mlpSolutionRevision.getVersion());
-                       solutionRevision.setDescription(mlpSolutionRevision.getDescription());
-                       solutionRevision.setOwnerId(getOwnerId(this.sub));
-                       solutionRevision.setMetadata(mlpSolutionRevision.getMetadata());
-                       solutionRevision.setCreated(mlpSolutionRevision.getCreated());
-                       solutionRevision.setModified(mlpSolutionRevision.getModified());
+                       MLPSolutionRevision localSolutionRevision = new MLPSolutionRevision();
+                       localSolutionRevision.setSolutionId(peerSolutionRevision.getSolutionId());
+                       localSolutionRevision.setRevisionId(peerSolutionRevision.getRevisionId());
+                       localSolutionRevision.setVersion(peerSolutionRevision.getVersion());
+                       localSolutionRevision.setDescription(peerSolutionRevision.getDescription());
+                       localSolutionRevision.setOwnerId(getOwnerId(this.sub));
+                       localSolutionRevision.setMetadata(peerSolutionRevision.getMetadata());
+                       localSolutionRevision.setCreated(peerSolutionRevision.getCreated());
+                       localSolutionRevision.setModified(peerSolutionRevision.getModified());
+                       localSolutionRevision.setSourceId(this.peer.getPeerId());
                        try {
-                               cdsClient.createSolutionRevision(solutionRevision);
-                               return solutionRevision;
+                               cdsClient.createSolutionRevision(localSolutionRevision);
+                               return localSolutionRevision;
                        }
                        catch (HttpStatusCodeException restx) {
                                log.error(EELFLoggerDelegate.errorLogger,
@@ -292,48 +295,55 @@ public class PeerGateway {
                        return localMLPArtifact;
                }
 
-               private MLPSolution updateMLPSolution(MLPSolution peerMLPSolution, MLPSolution localMLPSolution,
+               private MLPSolution updateMLPSolution(MLPSolution peerSolution, MLPSolution localSolution,
                                ICommonDataServiceRestClient cdsClient) {
                        log.info(EELFLoggerDelegate.debugLogger,
-                                       "Updating Local MLP Solution for peer solution " + peerMLPSolution);
+                                       "Updating Local MLP Solution for peer solution " + peerSolution);
 
-                       if (!peerMLPSolution.getSolutionId().equals(localMLPSolution.getSolutionId()))
+                       if (!peerSolution.getSolutionId().equals(localSolution.getSolutionId()))
                                throw new IllegalArgumentException("Local and Peer identifier mismatch");
 
-                       localMLPSolution.setSolutionId(peerMLPSolution.getSolutionId());
-                       localMLPSolution.setName(peerMLPSolution.getName());
-                       localMLPSolution.setDescription(peerMLPSolution.getDescription());
-                       localMLPSolution.setAccessTypeCode(peerMLPSolution.getAccessTypeCode());
-                       localMLPSolution.setMetadata(peerMLPSolution.getMetadata());
-                       localMLPSolution.setModelTypeCode(peerMLPSolution.getModelTypeCode());
-                       localMLPSolution.setProvider(peerMLPSolution.getProvider());
-                       localMLPSolution.setActive(peerMLPSolution.isActive());
-                       localMLPSolution.setToolkitTypeCode(peerMLPSolution.getToolkitTypeCode());
-                       // an update needs to be re-validated
-                       localMLPSolution.setValidationStatusCode(ValidationStatusCode.NV.toString());
+                       //localSolution.setSolutionId(peerSolution.getSolutionId());
+                       localSolution.setName(peerSolution.getName());
+                       localSolution.setDescription(peerSolution.getDescription());
+                       localSolution.setAccessTypeCode(peerSolution.getAccessTypeCode());
+                       localSolution.setMetadata(peerSolution.getMetadata());
+                       localSolution.setModelTypeCode(peerSolution.getModelTypeCode());
+                       localSolution.setProvider(peerSolution.getProvider());
+                       localSolution.setActive(peerSolution.isActive());
+                       localSolution.setToolkitTypeCode(peerSolution.getToolkitTypeCode());
+                       // reset validation status to its default
+                       localSolution.setValidationStatusCode(this.peer.getValidationStatusCode());
                        {
                                String newOwnerId = getOwnerId(this.sub);
-                               if (!newOwnerId.equals(localMLPSolution.getOwnerId())) {
+                               if (!newOwnerId.equals(localSolution.getOwnerId())) {
                                        // is this solution being updated as part of different/new subscription?
-                                       log.warn(EELFLoggerDelegate.errorLogger, "updating solution " + localMLPSolution.getSolutionId()
+                                       log.warn(EELFLoggerDelegate.errorLogger, "updating solution " + localSolution.getSolutionId()
                                                        + " as part of subscription " + this.sub.getSubId() + " triggers an ownership change");
                                }
-                               localMLPSolution.setOwnerId(newOwnerId);
+                               localSolution.setOwnerId(newOwnerId);
                        }
 
                        {
-                               String newSourceId = this.peer.getPeerId();
-                               if (!newSourceId.equals(localMLPSolution.getSourceId())) {
-                                       // we will see this if a solution is available in more than one peer
-                                       log.warn(EELFLoggerDelegate.errorLogger, "updating solution " + localMLPSolution.getSolutionId()
-                                                       + " as part of subscription " + this.sub.getSubId() + " triggers a source change");
+                               if (localSolution.getSourceId() == null) {
+                                       //this is a local solution that made its way back
+                                       log.info(EELFLoggerDelegate.debugLogger, "Solution " + localSolution.getSolutionId()
+                                                       + " as part of subscription " + this.sub.getSubId() + " was originally provisioned locally");
+                               }
+                               else {
+                                       String newSourceId = this.peer.getPeerId();
+                                       if (!newSourceId.equals(localSolution.getSourceId())) {
+                                               // we will see this if a solution is available in more than one peer
+                                               log.warn(EELFLoggerDelegate.errorLogger, "updating solution " + localSolution.getSolutionId()
+                                                               + " as part of subscription " + this.sub.getSubId() + " triggers a source change");
+                                       }
+                                       localSolution.setSourceId(newSourceId);
                                }
-                               localMLPSolution.setSourceId(newSourceId);
                        }
 
                        try {
-                               cdsClient.updateSolution(localMLPSolution);
-                               return localMLPSolution;
+                               cdsClient.updateSolution(localSolution);
+                               return localSolution;
                        }
                        catch (HttpStatusCodeException restx) {
                                log.error(EELFLoggerDelegate.errorLogger,
index 37e6b5a..f7dfaf4 100644 (file)
@@ -21,6 +21,8 @@
 package org.acumos.federation.gateway.controller;
 
 import java.net.URI;
+import java.net.URISyntaxException;
+
 import java.util.List;
 import java.util.Map;
 
@@ -81,7 +83,7 @@ public class CatalogController extends AbstractController {
        @RequestMapping(value = { API.Paths.SOLUTIONS }, method = RequestMethod.GET, produces = APPLICATION_JSON)
        @ResponseBody
        public JsonResponse<List<MLPSolution>> getSolutions(
-                       /* HttpServletRequest theHttpRequest, */
+                       HttpServletRequest theHttpRequest,
                        HttpServletResponse theHttpResponse,
                        @RequestParam(value = API.QueryParameters.SOLUTIONS_SELECTOR, required = false) String theSelector) {
                JsonResponse<List<MLPSolution>> response = null;
@@ -95,6 +97,12 @@ public class CatalogController extends AbstractController {
                                selector = Utils.jsonStringToMap(new String(Base64Utils.decodeFromString(theSelector), "UTF-8"));
 
                        solutions = catalogService.getSolutions(selector, new ControllerContext());
+                       if (solutions != null) {
+                               for (MLPSolution solution: solutions) {
+                                       encodeSolution(solution, theHttpRequest);
+                               }
+                       }
+
                        response = JsonResponse.<List<MLPSolution>> buildResponse()
                                                                                                                .withMessage("available public solution for given filter")
                                                                                                                .withContent(solutions)
@@ -117,13 +125,16 @@ public class CatalogController extends AbstractController {
        @ApiOperation(value = "Invoked by Peer Acumos to get a list detailed solution information from the Catalog of the local Acumos Instance .", response = MLPSolution.class)
        @RequestMapping(value = { API.Paths.SOLUTION_DETAILS }, method = RequestMethod.GET, produces = APPLICATION_JSON)
        @ResponseBody
-       public JsonResponse<MLPSolution> getSolutionDetails(HttpServletResponse theHttpResponse,
+       public JsonResponse<MLPSolution> getSolutionDetails(
+                       HttpServletRequest theHttpRequest,
+                       HttpServletResponse theHttpResponse,
                        @PathVariable(value = "solutionId") String theSolutionId) {
                JsonResponse<MLPSolution> response = null;
                MLPSolution solution = null;
                log.debug(EELFLoggerDelegate.debugLogger, API.Paths.SOLUTION_DETAILS + ": " + theSolutionId);
                try {
                        solution = catalogService.getSolution(theSolutionId, new ControllerContext());
+                       encodeSolution(solution, theHttpRequest);
                        response = JsonResponse.<MLPSolution> buildResponse()
                                                                                                                .withMessage("solution details")
                                                                                                                .withContent(solution)
@@ -250,15 +261,7 @@ public class CatalogController extends AbstractController {
                                        !context.getPeer().getPeerInfo().isLocal()) {
                                // re-encode the artifact uri
                                for (MLPArtifact artifact : solutionRevisionArtifacts) {
-                                       // sooo cumbersome
-                                       URI requestUri = new URI(theHttpRequest.getRequestURL().toString());
-                                       URI artifactUri = API.ARTIFACT_DOWNLOAD
-                                                       .buildUri(
-                                                                       new URI(requestUri.getScheme(), null, requestUri.getHost(),
-                                                                                       requestUri.getPort(), null, null, null).toString(),
-                                                                       artifact.getArtifactId());
-                                       log.debug(EELFLoggerDelegate.debugLogger,       "getSolutionRevisionArtifacts: content uri " + artifactUri);
-                                       artifact.setUri(artifactUri.toString());
+                                       encodeArtifact(artifact, theHttpRequest);
                                }
                        }
                        response = JsonResponse.<List<MLPArtifact>> buildResponse()
@@ -313,4 +316,29 @@ public class CatalogController extends AbstractController {
                return inputStreamResource;
        }
 
+       /** */
+       private void encodeSolution(MLPSolution theSolution, HttpServletRequest theRequest) throws URISyntaxException {
+               //encode the 'origin'
+               if (null == theSolution.getOrigin()) {
+                       URI requestUri = new URI(theRequest.getRequestURL().toString());
+                       URI solutionUri = API.SOLUTION_DETAIL
+                                                                                               .buildUri(
+                                                                                                       new URI(requestUri.getScheme(), null, requestUri.getHost(),
+                                                                                                                                       requestUri.getPort(), null, null, null).toString(),
+                                                                                                       theSolution.getSolutionId());
+                       theSolution.setOrigin(solutionUri.toString());  
+               }
+       }
+       
+       /** */
+       private void encodeArtifact(MLPArtifact theArtifact, HttpServletRequest theRequest) throws URISyntaxException {
+               URI requestUri = new URI(theRequest.getRequestURL().toString());
+               URI artifactUri = API.ARTIFACT_DOWNLOAD
+                                                                                       .buildUri(
+                                                                                               new URI(requestUri.getScheme(), null, requestUri.getHost(),
+                                                                                                                               requestUri.getPort(), null, null, null).toString(),
+                                                                                               theArtifact.getArtifactId());
+               log.debug(EELFLoggerDelegate.debugLogger,       "getSolutionRevisionArtifacts: content uri " + artifactUri);
+               theArtifact.setUri(artifactUri.toString());
+       }
 }
index 08771da..838eb4c 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.acumos.federation.gateway.task;
 
+import java.util.Date;
 import java.util.List;
 
 import org.acumos.cds.domain.MLPPeer;
@@ -51,8 +52,8 @@ public class PeerSubscriptionTask implements Runnable {
        @Autowired
        private ApplicationEventPublisher eventPublisher;
 
-       private MLPPeer mlpPeer;
-       private MLPPeerSubscription mlpSubscription;
+       private MLPPeer peer;
+       private MLPPeerSubscription subscription;
 
        @Autowired
        private Clients clients;
@@ -61,44 +62,46 @@ public class PeerSubscriptionTask implements Runnable {
        }
 
        public PeerSubscriptionTask handle(MLPPeer peer, MLPPeerSubscription subscription) {
-               this.mlpPeer = peer;
-               this.mlpSubscription = subscription;
+               this.peer = peer;
+               this.subscription = subscription;
                return this;
        }
 
        public MLPPeer getPeer() {
-               return this.mlpPeer;
+               return this.peer;
        }
 
        public MLPPeerSubscription getSubscription() {
-               return this.mlpSubscription;
+               return this.subscription;
        }
 
        @Override
        public void run() {
 
-               if (this.mlpPeer == null || this.mlpSubscription == null) {
+               if (this.peer == null || this.subscription == null) {
                        log.info(EELFLoggerDelegate.debugLogger, "Peer task has no peer subscription info");
                        return;
                }
 
                try {
-                       log.info(EELFLoggerDelegate.debugLogger, "Peer task for " + mlpPeer.getName() + ", " + mlpPeer.getApiUrl() + ", " + mlpSubscription.getSelector());
-                       FederationClient fedClient = clients.getFederationClient(this.mlpPeer.getApiUrl());
+                       log.info(EELFLoggerDelegate.debugLogger, "Peer task for peer {}, subscription {}", this.peer.getName(), this.subscription.getSubId());
+                       FederationClient fedClient = clients.getFederationClient(this.peer.getApiUrl());
                        JsonResponse<List<MLPSolution>> response =
-                               fedClient.getSolutions(Utils.jsonStringToMap(mlpSubscription.getSelector()));
-                       log.debug(EELFLoggerDelegate.debugLogger,
-                                               "Peer task got response " + response + " for " + mlpPeer.getName() + ", " + mlpPeer.getApiUrl() + ", " + mlpSubscription.getSelector());
+                               fedClient.getSolutions(Utils.jsonStringToMap(this.subscription.getSelector()));
+                       log.info(EELFLoggerDelegate.debugLogger, "Peer task for peer {}, subscription {} got response {}", this.peer.getName(), this.subscription.getSubId(), response);
                        if (response != null && response.getContent() != null) {
-                               List<MLPSolution> mlpSolutions = response.getContent();
-                               if (mlpSolutions.size() > 0) {
+                               List<MLPSolution> solutions = response.getContent();
+                               if (solutions.size() > 0) {
                                        this.eventPublisher.publishEvent(
-                                                       new PeerSubscriptionEvent(this, this.mlpPeer, this.mlpSubscription, mlpSolutions));
+                                                       new PeerSubscriptionEvent(this, this.peer, this.subscription, solutions));
                                }
                        }
+
+                       this.subscription.setProcessed(new Date());
+                       this.clients.getCDSClient().updatePeerSubscription(this.subscription);
                }
                catch (Exception x) {
-                       log.error(EELFLoggerDelegate.errorLogger, "Peer task failed for " + mlpPeer.getName() + ", " + mlpPeer.getApiUrl() + ", " + mlpSubscription.getSelector(), x);
+                       log.error(EELFLoggerDelegate.errorLogger, "Peer task failed for " + peer.getName() + ", " + peer.getApiUrl() + ", " + subscription.getSelector(), x);
                }
        }
 }
index 7ed4b3c..dbd7d16 100644 (file)
@@ -124,33 +124,49 @@ public class PeerSubscriptionTaskScheduler {
                new PeerTaskHandler().runTask(thePeer, theSub);
        }
 
+       /** */
+       private boolean shouldRun(MLPPeerSubscription theSub) {
+               if (theSub.getRefreshInterval() == null) {
+                       //on demand only subscription
+                       return false;
+               }
+
+               if (theSub.getRefreshInterval().longValue() == 0 &&
+                               theSub.getProcessed() != null) {
+                       //one timer that has already been processed
+                       return false;
+               }
+
+               return true;
+       }
+
        @Scheduled(initialDelay = 5000, fixedRateString = "${peer.jobchecker.interval:400}000")
        public void checkPeerJobs() {
 
                log.debug(EELFLoggerDelegate.debugLogger, "checkPeerSubscriptionJobs");
                // Get the List of MLP Peers
-               List<MLPPeer> mlpPeers = peerService.getPeers();
-               if (Utils.isEmptyList(mlpPeers)) {
+               List<MLPPeer> peers = peerService.getPeers();
+               if (Utils.isEmptyList(peers)) {
                        log.info(EELFLoggerDelegate.debugLogger, "no peers from " + peerService);
                        return;
                }
 
-               for (MLPPeer mlpPeer : mlpPeers) {
-                       log.info(EELFLoggerDelegate.debugLogger, "checkPeer : " + mlpPeer);
+               for (MLPPeer peer : peers) {
+                       log.info(EELFLoggerDelegate.debugLogger, "check peer {}", peer);
 
-                       if (mlpPeer.isSelf())
+                       if (peer.isSelf())
                                continue;
 
                        // cancel peer tasks for inactive peers
-                       if (PeerStatus.Active != PeerStatus.forCode(mlpPeer.getStatusCode())) {
+                       if (PeerStatus.Active != PeerStatus.forCode(peer.getStatusCode())) {
                                // cancel all peer sub tasks for this peer
                                log.debug(EELFLoggerDelegate.debugLogger,
-                                               "checkPeer : peer " + mlpPeer + " no longer active, removing active tasks");
-                               Map<Long, PeerTaskHandler> subsTask = this.peersSubsTask.row(mlpPeer.getPeerId());
+                                               "peer {} no longer active, removing active tasks", peer);
+                               Map<Long, PeerTaskHandler> subsTask = this.peersSubsTask.row(peer.getPeerId());
                                if (subsTask != null) {
                                        for (Map.Entry<Long, PeerTaskHandler> subTaskEntry : subsTask.entrySet()) {
                                                subTaskEntry.getValue().stopTask();
-                                               this.peersSubsTask.remove(mlpPeer.getPeerId(), subTaskEntry.getKey());
+                                               this.peersSubsTask.remove(peer.getPeerId(), subTaskEntry.getKey());
                                        }
                                }
 
@@ -159,22 +175,22 @@ public class PeerSubscriptionTaskScheduler {
                                continue;
                        }
 
-                       List<MLPPeerSubscription> mlpSubs = peerSubscriptionService.getPeerSubscriptions(mlpPeer.getPeerId());
-                       if (Utils.isEmptyList(mlpSubs)) {
+                       List<MLPPeerSubscription> subs = peerSubscriptionService.getPeerSubscriptions(peer.getPeerId());
+                       if (Utils.isEmptyList(subs)) {
                                // the peer is still there but has no subscriptions: cancel any ongoing tasks
 
                                continue;
                        }
 
-                       for (MLPPeerSubscription mlpSub : mlpSubs) {
-                               log.info(EELFLoggerDelegate.debugLogger, "checkSub " + mlpSub);
-                               PeerTaskHandler peerSubTask = peersSubsTask.get(mlpPeer.getPeerId(), mlpSub.getSubId());
+                       for (MLPPeerSubscription sub : subs) {
+                               log.info(EELFLoggerDelegate.debugLogger, "checkSub " + sub);
+                               PeerTaskHandler peerSubTask = peersSubsTask.get(peer.getPeerId(), sub.getSubId());
                                if (peerSubTask != null) {
                                        // was the subscription updated? if yes, cancel current task.
-                                       MLPPeerSubscription mlpCurrentSub = peerSubTask.getSubscription();
-                                       if (!same(mlpSub, mlpCurrentSub)) {
+                                       MLPPeerSubscription currentSub = peerSubTask.getSubscription();
+                                       if (!same(sub, currentSub)) {
                                                log.debug(EELFLoggerDelegate.debugLogger,
-                                                               "checkSub: subscription was updated, stopping current task");
+                                                               "subscription {} was updated, stopping current task", sub.getSubId());
                                                peerSubTask.stopTask();
                                                peerSubTask = null; // in order to trigger its reset below: no need to remove the entry as we
                                                                                        // are about
@@ -182,10 +198,12 @@ public class PeerSubscriptionTaskScheduler {
                                        }
                                }
 
-                               if (peerSubTask == null) {
-                                       log.info(EELFLoggerDelegate.debugLogger, "Scheduled peer sub task for " + mlpPeer.getApiUrl());
-                                       this.peersSubsTask.put(mlpPeer.getPeerId(), mlpSub.getSubId(),
-                                                       new PeerTaskHandler().startTask(mlpPeer, mlpSub));
+                               if (peerSubTask == null && shouldRun(sub)) {
+                                       log.info(EELFLoggerDelegate.debugLogger, "Scheduling peer sub task for peer {}, subscription {}", peer.getName(), sub.getSubId());
+                                       PeerTaskHandler hnd = new PeerTaskHandler().startTask(peer, sub);
+                                       if (hnd != null) {
+                                               this.peersSubsTask.put(peer.getPeerId(), sub.getSubId(), hnd);
+                                       }
                                }
                        }
                }
@@ -202,16 +220,16 @@ public class PeerSubscriptionTaskScheduler {
 
                public synchronized PeerTaskHandler startTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
                        Long refreshInterval = theSub.getRefreshInterval();
-                       if (refreshInterval == null)
-                               return null;
-                       
+       
                        this.task = (PeerSubscriptionTask) PeerSubscriptionTaskScheduler.this.appCtx.getBean("peerSubscriptionTask");
-                       if (refreshInterval.longValue() == 0)
+                       if (refreshInterval.longValue() == 0) {
                                this.future = PeerSubscriptionTaskScheduler.this.threadPoolTaskScheduler
                                                                                                .schedule(this.task.handle(thePeer, theSub), new Date(System.currentTimeMillis() + 5000));
-                       else
+                       }
+                       else {
                                this.future = PeerSubscriptionTaskScheduler.this.threadPoolTaskScheduler
                                                                                                .scheduleAtFixedRate(this.task.handle(thePeer, theSub), 1000 * refreshInterval.longValue() );
+                       }
                        return this;
                }
                
index 491c6ad..843e68a 100644 (file)
@@ -114,16 +114,17 @@ public class TestAdapter {
 
                                        MLPSolution peerImport = peerImports.get(solution.getSolutionId());
                                        if (peerImport == null) {
-                                               log.debug(EELFLoggerDelegate.debugLogger, "New solution");
+                                               log.info(EELFLoggerDelegate.debugLogger, "New solution {}", solution.getSolutionId());
                                                peerImports.put(solution.getSolutionId(), solution);
                                        }
                                        else {
-                                               log.debug(EELFLoggerDelegate.debugLogger, "Existing solution");
+                                               log.info(EELFLoggerDelegate.debugLogger, "Existing solution {}", solution.getSolutionId());
                                                if (peerImport.getModified().equals(solution.getModified())) {
-                                                       log.debug(EELFLoggerDelegate.debugLogger, "No updates");
+                                                       log.info(EELFLoggerDelegate.debugLogger, "No updates to solution {}", solution.getSolutionId());
+                                                       continue;
                                                }
                                                else {
-                                                       log.debug(EELFLoggerDelegate.debugLogger, "Has updates");
+                                                       log.info(EELFLoggerDelegate.debugLogger, "Solution {} has updates", solution.getSolutionId());
                                                }
                                        }
 
@@ -138,8 +139,8 @@ public class TestAdapter {
                                                log.error(EELFLoggerDelegate.errorLogger, "Failed to retrieve revisions", x);
                                                continue;
                                        }
-                                       log.debug(EELFLoggerDelegate.debugLogger,
-                                                       "Received " + revisions.size() + " revisions " + revisions);
+                                       log.info(EELFLoggerDelegate.debugLogger,
+                                                       "Received {} revisions {}", revisions.size(), revisions);
 
                                        List<MLPArtifact> artifacts = null;
                                        try {
@@ -151,7 +152,7 @@ public class TestAdapter {
                                                continue;
                                        }
                                        log.info(EELFLoggerDelegate.debugLogger,
-                                                       "Received " + artifacts.size() + " artifacts " + artifacts);
+                                                       "Received {} artifacts {}", artifacts.size(), artifacts);
 
                                        for (MLPArtifact artifact : artifacts) {
                                                Resource artifactContent = null;