--- /dev/null
+.. ===============LICENSE_START=======================================================
+.. Acumos
+.. ===================================================================================
+.. Copyright (C) 2017 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
+.. ===================================================================================
+.. This Acumos software file is distributed by AT&T and Tech Mahindra
+.. under the Apache License, Version 2.0 (the "License");
+.. you may not use this file except in compliance with the License.
+.. You may obtain a copy of the License at
+..
+.. http://www.apache.org/licenses/LICENSE-2.0
+..
+.. This file is distributed on an "AS IS" BASIS,
+.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+.. See the License for the specific language governing permissions and
+.. limitations under the License.
+.. ===============LICENSE_END=========================================================
+
+
+The federation gateway is an optional component of an Acumos system whose role
+is to facilitate communication with other Acumos systems (i.e. with their gateways)
+or compatible systems (through adapters). Its role is to facilitate the exchange
+of models and their related information between Acumos instances.
+The federation gateway occupies the borderline of an Acumos system, from a logical
+and deployment perspective. From a logical perspective it is the point of control
+for the flow of model information in and out of an Acumos instance. From a deployment
+perspective (within an enterprise environment), the federation gateway will be deployed
+at the edge of the network (DMZ) with communication interfaces towards the enterprise
+network (where the rest of the Acumos instance components are deployed) and towards
+the outside world (where other Acumos instances are deployed).
+We call the external interface (towards the gateways of other Acumos instances) the
+*federation interface* and we call the internal interface (towards the other components
+of the same Acumos instance) the *local interface*.
+The design of the gateway reflects this duality: the gateway defines/offers a set of
+REST APIs on its federation interface for gateway-2-gateway (or gateway-2-adapter)
+communication and another set on the local interface for component-2-gateway communication.
+
+Federation concepts
+
+The federation gateway behaviour is driven by the peer and subscription information provisioned
+in the CDS. Through the local interface API other components can trigger gateway
+behaviour, i.e. trigger interactions with peers.
+The peer information represents all other Acumos systems (or other through adapters) this system
+has agreed to communicate/exchange information with. The 'handshake' procedure by which 2 systems
+agreed to communicate and provision the required information can take place 'out-of-band' (email,etc
++ provisining) or 'in-band' (a combination of federation REST API and provisioning actions).
+
+When enabling federation an Acumos system agrees to share its public, validated models (their
+revisions) with its peers. (a discussion on ACL driven/selective sharing control will come here later).
+Establishing a relationship does not in itself imply that any exchange of information takes place.
+Information exchange (models) is driven by subscriptions provisioned in the local CDS. In Acumos every
+peer is responsible for pulling from its peers the models it is interested in (such an interaction
+.goes through the peers' federation gateway who controls/filters access to its local models).
+A subscription towards a peer represents a subset of that peers' model set that this Acumos is interested in.
+The subscription information is there to drive the behaviour of the federation gateway (who does
+the actual peer polling and local provisining of the retrieved information); there is no subscription
+information shared between peers. An Acumos instance can have multiple subscriptions towards another
+peer. A subscription can range from one specific model to all the models a peer exposes (with any
+combination of model level selection criteria in between). A subscription further specifies
+options such as the frequency with each the federation gateway should check for updates, how much
+model information should be retrieved every time,etc.
+
+It is important to notice that the federation gateway mechanisms for model information exchange
+does not impose an overall peer organization/deployment architecture: tree like structures, fully or sparse
+connected graphs, etc are all possible.
+
+
+
+Federation mechanisms
+
+Before any interaction with a peer can take place the peer information needs to be provisoned
+in the local CDS. A federation gateway has a dual role, as a server when responding to requests
+from its peers and as a client when requesting information from them. The federation gateway
+uses mutual authentication (https,tls), i.e. when a connection is established between 2 gateways
+both sides need to present their certificates (signed by accepred CAs and so on). The subjectName
+entry in a certificate received from a peer serves to identify the peer against the locally (CDS)
+provisioned peer collection (the are no passwords or other credentials provisioned/exchanged).
+
+The gateway periodically processes the list of locally provisioned peers; where subscriptions
+towards a peer are found they are assigned to tasks who will query the peer with the given
+subscription selector. Each resulting model will be compared against locally available
+model information (in CDS) and new model/new revisions+artifacts will be fetched and provisioned.
+
+In addition to the model information exchange APIs the federation gateway offers APIs for:
+
+- status information (ping)
+- in-band registration
+- peer information sharing
+
+Other
+
+At this point the federation gateway relies on only one Acumos component, the CDS.
+
}
}
- private MLPSolution createMLPSolution(MLPSolution peerMLPSolution, ICommonDataServiceRestClient cdsClient) {
+ private MLPSolution createMLPSolution(MLPSolution peerSolution, ICommonDataServiceRestClient cdsClient) {
log.info(EELFLoggerDelegate.debugLogger,
- "Creating Local MLP Solution for peer solution " + peerMLPSolution);
+ "Creating Local MLP Solution for peer solution " + peerSolution);
MLPSolution localSolution = new MLPSolution();
- localSolution.setSolutionId(peerMLPSolution.getSolutionId());
- localSolution.setName(peerMLPSolution.getName());
- localSolution.setDescription(peerMLPSolution.getDescription());
- localSolution.setAccessTypeCode(AccessTypeCode.PB.toString());
- localSolution.setMetadata(peerMLPSolution.getMetadata());
- localSolution.setModelTypeCode(peerMLPSolution.getModelTypeCode());
- localSolution.setProvider("ATTAcumosInc");
- localSolution.setActive(peerMLPSolution.isActive());
- localSolution.setToolkitTypeCode(peerMLPSolution.getToolkitTypeCode());
- localSolution.setValidationStatusCode(ValidationStatusCode.NV.toString());
- localSolution.setCreated(peerMLPSolution.getCreated());
- localSolution.setModified(peerMLPSolution.getModified());
+ localSolution.setSolutionId(peerSolution.getSolutionId());
+ localSolution.setName(peerSolution.getName());
+ localSolution.setDescription(peerSolution.getDescription());
+ localSolution.setAccessTypeCode(this.sub.getAccessType());
+ localSolution.setMetadata(peerSolution.getMetadata());
+ localSolution.setModelTypeCode(peerSolution.getModelTypeCode());
+ localSolution.setProvider(this.peer.getName());
+ localSolution.setActive(peerSolution.isActive());
+ localSolution.setToolkitTypeCode(peerSolution.getToolkitTypeCode());
+ localSolution.setValidationStatusCode(this.peer.getValidationStatusCode());
+ //should the creted/modified reflect this information or the information we got from the peer ?
+ localSolution.setCreated(peerSolution.getCreated());
+ localSolution.setModified(peerSolution.getModified());
localSolution.setOwnerId(getOwnerId(this.sub));
localSolution.setSourceId(this.peer.getPeerId());
+ localSolution.setOrigin(peerSolution.getOrigin());
try {
cdsClient.createSolution(localSolution);
return localSolution;
}
}
- private MLPSolutionRevision createMLPSolutionRevision(MLPSolutionRevision mlpSolutionRevision,
+ private MLPSolutionRevision createMLPSolutionRevision(MLPSolutionRevision peerSolutionRevision,
ICommonDataServiceRestClient cdsClient) {
- MLPSolutionRevision solutionRevision = new MLPSolutionRevision();
- solutionRevision.setSolutionId(mlpSolutionRevision.getSolutionId());
- solutionRevision.setRevisionId(mlpSolutionRevision.getRevisionId());
- solutionRevision.setVersion(mlpSolutionRevision.getVersion());
- solutionRevision.setDescription(mlpSolutionRevision.getDescription());
- solutionRevision.setOwnerId(getOwnerId(this.sub));
- solutionRevision.setMetadata(mlpSolutionRevision.getMetadata());
- solutionRevision.setCreated(mlpSolutionRevision.getCreated());
- solutionRevision.setModified(mlpSolutionRevision.getModified());
+ MLPSolutionRevision localSolutionRevision = new MLPSolutionRevision();
+ localSolutionRevision.setSolutionId(peerSolutionRevision.getSolutionId());
+ localSolutionRevision.setRevisionId(peerSolutionRevision.getRevisionId());
+ localSolutionRevision.setVersion(peerSolutionRevision.getVersion());
+ localSolutionRevision.setDescription(peerSolutionRevision.getDescription());
+ localSolutionRevision.setOwnerId(getOwnerId(this.sub));
+ localSolutionRevision.setMetadata(peerSolutionRevision.getMetadata());
+ localSolutionRevision.setCreated(peerSolutionRevision.getCreated());
+ localSolutionRevision.setModified(peerSolutionRevision.getModified());
+ localSolutionRevision.setSourceId(this.peer.getPeerId());
try {
- cdsClient.createSolutionRevision(solutionRevision);
- return solutionRevision;
+ cdsClient.createSolutionRevision(localSolutionRevision);
+ return localSolutionRevision;
}
catch (HttpStatusCodeException restx) {
log.error(EELFLoggerDelegate.errorLogger,
return localMLPArtifact;
}
- private MLPSolution updateMLPSolution(MLPSolution peerMLPSolution, MLPSolution localMLPSolution,
+ private MLPSolution updateMLPSolution(MLPSolution peerSolution, MLPSolution localSolution,
ICommonDataServiceRestClient cdsClient) {
log.info(EELFLoggerDelegate.debugLogger,
- "Updating Local MLP Solution for peer solution " + peerMLPSolution);
+ "Updating Local MLP Solution for peer solution " + peerSolution);
- if (!peerMLPSolution.getSolutionId().equals(localMLPSolution.getSolutionId()))
+ if (!peerSolution.getSolutionId().equals(localSolution.getSolutionId()))
throw new IllegalArgumentException("Local and Peer identifier mismatch");
- localMLPSolution.setSolutionId(peerMLPSolution.getSolutionId());
- localMLPSolution.setName(peerMLPSolution.getName());
- localMLPSolution.setDescription(peerMLPSolution.getDescription());
- localMLPSolution.setAccessTypeCode(peerMLPSolution.getAccessTypeCode());
- localMLPSolution.setMetadata(peerMLPSolution.getMetadata());
- localMLPSolution.setModelTypeCode(peerMLPSolution.getModelTypeCode());
- localMLPSolution.setProvider(peerMLPSolution.getProvider());
- localMLPSolution.setActive(peerMLPSolution.isActive());
- localMLPSolution.setToolkitTypeCode(peerMLPSolution.getToolkitTypeCode());
- // an update needs to be re-validated
- localMLPSolution.setValidationStatusCode(ValidationStatusCode.NV.toString());
+ //localSolution.setSolutionId(peerSolution.getSolutionId());
+ localSolution.setName(peerSolution.getName());
+ localSolution.setDescription(peerSolution.getDescription());
+ localSolution.setAccessTypeCode(peerSolution.getAccessTypeCode());
+ localSolution.setMetadata(peerSolution.getMetadata());
+ localSolution.setModelTypeCode(peerSolution.getModelTypeCode());
+ localSolution.setProvider(peerSolution.getProvider());
+ localSolution.setActive(peerSolution.isActive());
+ localSolution.setToolkitTypeCode(peerSolution.getToolkitTypeCode());
+ // reset validation status to its default
+ localSolution.setValidationStatusCode(this.peer.getValidationStatusCode());
{
String newOwnerId = getOwnerId(this.sub);
- if (!newOwnerId.equals(localMLPSolution.getOwnerId())) {
+ if (!newOwnerId.equals(localSolution.getOwnerId())) {
// is this solution being updated as part of different/new subscription?
- log.warn(EELFLoggerDelegate.errorLogger, "updating solution " + localMLPSolution.getSolutionId()
+ log.warn(EELFLoggerDelegate.errorLogger, "updating solution " + localSolution.getSolutionId()
+ " as part of subscription " + this.sub.getSubId() + " triggers an ownership change");
}
- localMLPSolution.setOwnerId(newOwnerId);
+ localSolution.setOwnerId(newOwnerId);
}
{
- String newSourceId = this.peer.getPeerId();
- if (!newSourceId.equals(localMLPSolution.getSourceId())) {
- // we will see this if a solution is available in more than one peer
- log.warn(EELFLoggerDelegate.errorLogger, "updating solution " + localMLPSolution.getSolutionId()
- + " as part of subscription " + this.sub.getSubId() + " triggers a source change");
+ if (localSolution.getSourceId() == null) {
+ //this is a local solution that made its way back
+ log.info(EELFLoggerDelegate.debugLogger, "Solution " + localSolution.getSolutionId()
+ + " as part of subscription " + this.sub.getSubId() + " was originally provisioned locally");
+ }
+ else {
+ String newSourceId = this.peer.getPeerId();
+ if (!newSourceId.equals(localSolution.getSourceId())) {
+ // we will see this if a solution is available in more than one peer
+ log.warn(EELFLoggerDelegate.errorLogger, "updating solution " + localSolution.getSolutionId()
+ + " as part of subscription " + this.sub.getSubId() + " triggers a source change");
+ }
+ localSolution.setSourceId(newSourceId);
}
- localMLPSolution.setSourceId(newSourceId);
}
try {
- cdsClient.updateSolution(localMLPSolution);
- return localMLPSolution;
+ cdsClient.updateSolution(localSolution);
+ return localSolution;
}
catch (HttpStatusCodeException restx) {
log.error(EELFLoggerDelegate.errorLogger,
package org.acumos.federation.gateway.controller;
import java.net.URI;
+import java.net.URISyntaxException;
+
import java.util.List;
import java.util.Map;
@RequestMapping(value = { API.Paths.SOLUTIONS }, method = RequestMethod.GET, produces = APPLICATION_JSON)
@ResponseBody
public JsonResponse<List<MLPSolution>> getSolutions(
- /* HttpServletRequest theHttpRequest, */
+ HttpServletRequest theHttpRequest,
HttpServletResponse theHttpResponse,
@RequestParam(value = API.QueryParameters.SOLUTIONS_SELECTOR, required = false) String theSelector) {
JsonResponse<List<MLPSolution>> response = null;
selector = Utils.jsonStringToMap(new String(Base64Utils.decodeFromString(theSelector), "UTF-8"));
solutions = catalogService.getSolutions(selector, new ControllerContext());
+ if (solutions != null) {
+ for (MLPSolution solution: solutions) {
+ encodeSolution(solution, theHttpRequest);
+ }
+ }
+
response = JsonResponse.<List<MLPSolution>> buildResponse()
.withMessage("available public solution for given filter")
.withContent(solutions)
@ApiOperation(value = "Invoked by Peer Acumos to get a list detailed solution information from the Catalog of the local Acumos Instance .", response = MLPSolution.class)
@RequestMapping(value = { API.Paths.SOLUTION_DETAILS }, method = RequestMethod.GET, produces = APPLICATION_JSON)
@ResponseBody
- public JsonResponse<MLPSolution> getSolutionDetails(HttpServletResponse theHttpResponse,
+ public JsonResponse<MLPSolution> getSolutionDetails(
+ HttpServletRequest theHttpRequest,
+ HttpServletResponse theHttpResponse,
@PathVariable(value = "solutionId") String theSolutionId) {
JsonResponse<MLPSolution> response = null;
MLPSolution solution = null;
log.debug(EELFLoggerDelegate.debugLogger, API.Paths.SOLUTION_DETAILS + ": " + theSolutionId);
try {
solution = catalogService.getSolution(theSolutionId, new ControllerContext());
+ encodeSolution(solution, theHttpRequest);
response = JsonResponse.<MLPSolution> buildResponse()
.withMessage("solution details")
.withContent(solution)
!context.getPeer().getPeerInfo().isLocal()) {
// re-encode the artifact uri
for (MLPArtifact artifact : solutionRevisionArtifacts) {
- // sooo cumbersome
- URI requestUri = new URI(theHttpRequest.getRequestURL().toString());
- URI artifactUri = API.ARTIFACT_DOWNLOAD
- .buildUri(
- new URI(requestUri.getScheme(), null, requestUri.getHost(),
- requestUri.getPort(), null, null, null).toString(),
- artifact.getArtifactId());
- log.debug(EELFLoggerDelegate.debugLogger, "getSolutionRevisionArtifacts: content uri " + artifactUri);
- artifact.setUri(artifactUri.toString());
+ encodeArtifact(artifact, theHttpRequest);
}
}
response = JsonResponse.<List<MLPArtifact>> buildResponse()
return inputStreamResource;
}
+ /** */
+ private void encodeSolution(MLPSolution theSolution, HttpServletRequest theRequest) throws URISyntaxException {
+ //encode the 'origin'
+ if (null == theSolution.getOrigin()) {
+ URI requestUri = new URI(theRequest.getRequestURL().toString());
+ URI solutionUri = API.SOLUTION_DETAIL
+ .buildUri(
+ new URI(requestUri.getScheme(), null, requestUri.getHost(),
+ requestUri.getPort(), null, null, null).toString(),
+ theSolution.getSolutionId());
+ theSolution.setOrigin(solutionUri.toString());
+ }
+ }
+
+ /** */
+ private void encodeArtifact(MLPArtifact theArtifact, HttpServletRequest theRequest) throws URISyntaxException {
+ URI requestUri = new URI(theRequest.getRequestURL().toString());
+ URI artifactUri = API.ARTIFACT_DOWNLOAD
+ .buildUri(
+ new URI(requestUri.getScheme(), null, requestUri.getHost(),
+ requestUri.getPort(), null, null, null).toString(),
+ theArtifact.getArtifactId());
+ log.debug(EELFLoggerDelegate.debugLogger, "getSolutionRevisionArtifacts: content uri " + artifactUri);
+ theArtifact.setUri(artifactUri.toString());
+ }
}
package org.acumos.federation.gateway.task;
+import java.util.Date;
import java.util.List;
import org.acumos.cds.domain.MLPPeer;
@Autowired
private ApplicationEventPublisher eventPublisher;
- private MLPPeer mlpPeer;
- private MLPPeerSubscription mlpSubscription;
+ private MLPPeer peer;
+ private MLPPeerSubscription subscription;
@Autowired
private Clients clients;
}
public PeerSubscriptionTask handle(MLPPeer peer, MLPPeerSubscription subscription) {
- this.mlpPeer = peer;
- this.mlpSubscription = subscription;
+ this.peer = peer;
+ this.subscription = subscription;
return this;
}
public MLPPeer getPeer() {
- return this.mlpPeer;
+ return this.peer;
}
public MLPPeerSubscription getSubscription() {
- return this.mlpSubscription;
+ return this.subscription;
}
@Override
public void run() {
- if (this.mlpPeer == null || this.mlpSubscription == null) {
+ if (this.peer == null || this.subscription == null) {
log.info(EELFLoggerDelegate.debugLogger, "Peer task has no peer subscription info");
return;
}
try {
- log.info(EELFLoggerDelegate.debugLogger, "Peer task for " + mlpPeer.getName() + ", " + mlpPeer.getApiUrl() + ", " + mlpSubscription.getSelector());
- FederationClient fedClient = clients.getFederationClient(this.mlpPeer.getApiUrl());
+ log.info(EELFLoggerDelegate.debugLogger, "Peer task for peer {}, subscription {}", this.peer.getName(), this.subscription.getSubId());
+ FederationClient fedClient = clients.getFederationClient(this.peer.getApiUrl());
JsonResponse<List<MLPSolution>> response =
- fedClient.getSolutions(Utils.jsonStringToMap(mlpSubscription.getSelector()));
- log.debug(EELFLoggerDelegate.debugLogger,
- "Peer task got response " + response + " for " + mlpPeer.getName() + ", " + mlpPeer.getApiUrl() + ", " + mlpSubscription.getSelector());
+ fedClient.getSolutions(Utils.jsonStringToMap(this.subscription.getSelector()));
+ log.info(EELFLoggerDelegate.debugLogger, "Peer task for peer {}, subscription {} got response {}", this.peer.getName(), this.subscription.getSubId(), response);
if (response != null && response.getContent() != null) {
- List<MLPSolution> mlpSolutions = response.getContent();
- if (mlpSolutions.size() > 0) {
+ List<MLPSolution> solutions = response.getContent();
+ if (solutions.size() > 0) {
this.eventPublisher.publishEvent(
- new PeerSubscriptionEvent(this, this.mlpPeer, this.mlpSubscription, mlpSolutions));
+ new PeerSubscriptionEvent(this, this.peer, this.subscription, solutions));
}
}
+
+ this.subscription.setProcessed(new Date());
+ this.clients.getCDSClient().updatePeerSubscription(this.subscription);
}
catch (Exception x) {
- log.error(EELFLoggerDelegate.errorLogger, "Peer task failed for " + mlpPeer.getName() + ", " + mlpPeer.getApiUrl() + ", " + mlpSubscription.getSelector(), x);
+ log.error(EELFLoggerDelegate.errorLogger, "Peer task failed for " + peer.getName() + ", " + peer.getApiUrl() + ", " + subscription.getSelector(), x);
}
}
}
new PeerTaskHandler().runTask(thePeer, theSub);
}
+ /** */
+ private boolean shouldRun(MLPPeerSubscription theSub) {
+ if (theSub.getRefreshInterval() == null) {
+ //on demand only subscription
+ return false;
+ }
+
+ if (theSub.getRefreshInterval().longValue() == 0 &&
+ theSub.getProcessed() != null) {
+ //one timer that has already been processed
+ return false;
+ }
+
+ return true;
+ }
+
@Scheduled(initialDelay = 5000, fixedRateString = "${peer.jobchecker.interval:400}000")
public void checkPeerJobs() {
log.debug(EELFLoggerDelegate.debugLogger, "checkPeerSubscriptionJobs");
// Get the List of MLP Peers
- List<MLPPeer> mlpPeers = peerService.getPeers();
- if (Utils.isEmptyList(mlpPeers)) {
+ List<MLPPeer> peers = peerService.getPeers();
+ if (Utils.isEmptyList(peers)) {
log.info(EELFLoggerDelegate.debugLogger, "no peers from " + peerService);
return;
}
- for (MLPPeer mlpPeer : mlpPeers) {
- log.info(EELFLoggerDelegate.debugLogger, "checkPeer : " + mlpPeer);
+ for (MLPPeer peer : peers) {
+ log.info(EELFLoggerDelegate.debugLogger, "check peer {}", peer);
- if (mlpPeer.isSelf())
+ if (peer.isSelf())
continue;
// cancel peer tasks for inactive peers
- if (PeerStatus.Active != PeerStatus.forCode(mlpPeer.getStatusCode())) {
+ if (PeerStatus.Active != PeerStatus.forCode(peer.getStatusCode())) {
// cancel all peer sub tasks for this peer
log.debug(EELFLoggerDelegate.debugLogger,
- "checkPeer : peer " + mlpPeer + " no longer active, removing active tasks");
- Map<Long, PeerTaskHandler> subsTask = this.peersSubsTask.row(mlpPeer.getPeerId());
+ "peer {} no longer active, removing active tasks", peer);
+ Map<Long, PeerTaskHandler> subsTask = this.peersSubsTask.row(peer.getPeerId());
if (subsTask != null) {
for (Map.Entry<Long, PeerTaskHandler> subTaskEntry : subsTask.entrySet()) {
subTaskEntry.getValue().stopTask();
- this.peersSubsTask.remove(mlpPeer.getPeerId(), subTaskEntry.getKey());
+ this.peersSubsTask.remove(peer.getPeerId(), subTaskEntry.getKey());
}
}
continue;
}
- List<MLPPeerSubscription> mlpSubs = peerSubscriptionService.getPeerSubscriptions(mlpPeer.getPeerId());
- if (Utils.isEmptyList(mlpSubs)) {
+ List<MLPPeerSubscription> subs = peerSubscriptionService.getPeerSubscriptions(peer.getPeerId());
+ if (Utils.isEmptyList(subs)) {
// the peer is still there but has no subscriptions: cancel any ongoing tasks
continue;
}
- for (MLPPeerSubscription mlpSub : mlpSubs) {
- log.info(EELFLoggerDelegate.debugLogger, "checkSub " + mlpSub);
- PeerTaskHandler peerSubTask = peersSubsTask.get(mlpPeer.getPeerId(), mlpSub.getSubId());
+ for (MLPPeerSubscription sub : subs) {
+ log.info(EELFLoggerDelegate.debugLogger, "checkSub " + sub);
+ PeerTaskHandler peerSubTask = peersSubsTask.get(peer.getPeerId(), sub.getSubId());
if (peerSubTask != null) {
// was the subscription updated? if yes, cancel current task.
- MLPPeerSubscription mlpCurrentSub = peerSubTask.getSubscription();
- if (!same(mlpSub, mlpCurrentSub)) {
+ MLPPeerSubscription currentSub = peerSubTask.getSubscription();
+ if (!same(sub, currentSub)) {
log.debug(EELFLoggerDelegate.debugLogger,
- "checkSub: subscription was updated, stopping current task");
+ "subscription {} was updated, stopping current task", sub.getSubId());
peerSubTask.stopTask();
peerSubTask = null; // in order to trigger its reset below: no need to remove the entry as we
// are about
}
}
- if (peerSubTask == null) {
- log.info(EELFLoggerDelegate.debugLogger, "Scheduled peer sub task for " + mlpPeer.getApiUrl());
- this.peersSubsTask.put(mlpPeer.getPeerId(), mlpSub.getSubId(),
- new PeerTaskHandler().startTask(mlpPeer, mlpSub));
+ if (peerSubTask == null && shouldRun(sub)) {
+ log.info(EELFLoggerDelegate.debugLogger, "Scheduling peer sub task for peer {}, subscription {}", peer.getName(), sub.getSubId());
+ PeerTaskHandler hnd = new PeerTaskHandler().startTask(peer, sub);
+ if (hnd != null) {
+ this.peersSubsTask.put(peer.getPeerId(), sub.getSubId(), hnd);
+ }
}
}
}
public synchronized PeerTaskHandler startTask(MLPPeer thePeer, MLPPeerSubscription theSub) {
Long refreshInterval = theSub.getRefreshInterval();
- if (refreshInterval == null)
- return null;
-
+
this.task = (PeerSubscriptionTask) PeerSubscriptionTaskScheduler.this.appCtx.getBean("peerSubscriptionTask");
- if (refreshInterval.longValue() == 0)
+ if (refreshInterval.longValue() == 0) {
this.future = PeerSubscriptionTaskScheduler.this.threadPoolTaskScheduler
.schedule(this.task.handle(thePeer, theSub), new Date(System.currentTimeMillis() + 5000));
- else
+ }
+ else {
this.future = PeerSubscriptionTaskScheduler.this.threadPoolTaskScheduler
.scheduleAtFixedRate(this.task.handle(thePeer, theSub), 1000 * refreshInterval.longValue() );
+ }
return this;
}
MLPSolution peerImport = peerImports.get(solution.getSolutionId());
if (peerImport == null) {
- log.debug(EELFLoggerDelegate.debugLogger, "New solution");
+ log.info(EELFLoggerDelegate.debugLogger, "New solution {}", solution.getSolutionId());
peerImports.put(solution.getSolutionId(), solution);
}
else {
- log.debug(EELFLoggerDelegate.debugLogger, "Existing solution");
+ log.info(EELFLoggerDelegate.debugLogger, "Existing solution {}", solution.getSolutionId());
if (peerImport.getModified().equals(solution.getModified())) {
- log.debug(EELFLoggerDelegate.debugLogger, "No updates");
+ log.info(EELFLoggerDelegate.debugLogger, "No updates to solution {}", solution.getSolutionId());
+ continue;
}
else {
- log.debug(EELFLoggerDelegate.debugLogger, "Has updates");
+ log.info(EELFLoggerDelegate.debugLogger, "Solution {} has updates", solution.getSolutionId());
}
}
log.error(EELFLoggerDelegate.errorLogger, "Failed to retrieve revisions", x);
continue;
}
- log.debug(EELFLoggerDelegate.debugLogger,
- "Received " + revisions.size() + " revisions " + revisions);
+ log.info(EELFLoggerDelegate.debugLogger,
+ "Received {} revisions {}", revisions.size(), revisions);
List<MLPArtifact> artifacts = null;
try {
continue;
}
log.info(EELFLoggerDelegate.debugLogger,
- "Received " + artifacts.size() + " artifacts " + artifacts);
+ "Received {} artifacts {}", artifacts.size(), artifacts);
for (MLPArtifact artifact : artifacts) {
Resource artifactContent = null;