2 * ===============LICENSE_START=======================================================
4 * ===================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6 * ===================================================================================
7 * This Acumos software file is distributed by AT&T and Tech Mahindra
8 * under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * This file is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ===============LICENSE_END=========================================================
21 package org.acumos.federation.gateway.adapter;
23 import java.io.BufferedReader;
24 import java.io.InputStreamReader;
26 import java.util.List;
27 import java.util.stream.Collectors;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
32 import org.acumos.federation.gateway.common.GatewayCondition;
33 import org.acumos.federation.gateway.config.EELFLoggerDelegate;
34 import org.acumos.federation.gateway.event.PeerSubscriptionEvent;
35 import org.acumos.federation.gateway.service.impl.Clients;
36 import org.acumos.federation.gateway.service.impl.FederationClient;
37 import org.acumos.federation.gateway.util.Errors;
38 import org.acumos.federation.gateway.util.Utils;
40 import org.acumos.nexus.client.data.UploadArtifactInfo;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.beans.factory.BeanInitializationException;
44 //import org.springframework.scheduling.annotation.Scheduled;
45 import org.springframework.boot.context.properties.ConfigurationProperties;
46 import org.springframework.context.annotation.Conditional;
47 import org.springframework.context.event.EventListener;
48 import org.springframework.core.env.Environment;
49 import org.springframework.core.task.TaskExecutor;
50 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
51 import org.springframework.stereotype.Component;
52 import org.springframework.web.client.HttpStatusCodeException;
53 import org.springframework.web.client.HttpClientErrorException;
54 import org.springframework.core.io.Resource;
55 import org.springframework.http.HttpStatus;
57 import org.acumos.cds.AccessTypeCode;
58 import org.acumos.cds.ValidationStatusCode;
59 import org.acumos.cds.client.CommonDataServiceRestClientImpl;
60 import org.acumos.cds.client.ICommonDataServiceRestClient;
61 import org.acumos.cds.domain.MLPArtifact;
62 import org.acumos.cds.domain.MLPPeer;
63 import org.acumos.cds.domain.MLPPeerSubscription;
64 import org.acumos.cds.domain.MLPSolution;
65 import org.acumos.cds.domain.MLPSolutionRevision;
/**
 * Spring component (bean name "peergateway") bound to the "federation"
 * configuration prefix and only registered when GatewayCondition matches.
 * It reacts to peer subscription events by queueing a task that writes the
 * peer's solutions, revisions and artifacts to the local catalog through the
 * common data service (CDS) client.
 */
68 @Component("peergateway")
70 @ConfigurationProperties(prefix="federation")
71 @Conditional(GatewayCondition.class)
72 public class PeerGateway {
// Class-wide logger; the visible calls pass either debugLogger or errorLogger as the target.
74 private final EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(PeerGateway.class);
// NOTE(review): not read anywhere in the visible code (the operator is looked up through env instead);
// presumably bound from "federation.operator" — confirm a setter exists.
75 private String operator;
// Assigned in initGateway() to a ThreadPoolTaskExecutor; used to run PeerGatewayUpdateTask instances.
76 private TaskExecutor taskExecutor;
// NOTE(review): no injection annotation is visible for env or clients in this excerpt — presumably @Autowired; confirm.
78 private Environment env;
// Source of the CDS, federation and Nexus clients used by the code below.
80 private Clients clients;
/**
 * Validates the gateway configuration and creates the executor used to
 * process peer subscription updates.
 * <p>
 * The 'federation.operator' property is mandatory. The configured operator is
 * also looked up as a user through the CDS client; a missing user or a failed
 * lookup is only reported as a warning by the visible handlers.
 * NOTE(review): the lifecycle annotation is not visible in this excerpt —
 * presumably @PostConstruct; confirm.
 *
 * @throws BeanInitializationException if 'federation.operator' is not set
 */
84 public void initGateway() {
85 logger.debug(EELFLoggerDelegate.debugLogger, "initPeerGateway");
87 /* make sure an operator was specified and that it is a declared user */
88 if (null == this.env.getProperty("federation.operator")) {
89 throw new BeanInitializationException("Missing 'federation.operator' configuration");
// Best-effort check that the operator exists in CDS: an unknown user does not abort initialization here.
93 if (null == this.clients.getClient().getUser(this.env.getProperty("federation.operator"))) {
94 logger.warn(EELFLoggerDelegate.errorLogger, "'federation.operator' does not point to an existing user");
// Any failure of the lookup itself (not only HTTP status errors) is caught and logged.
97 catch (/*HttpStatusCode*/Exception dx) {
98 logger.warn(EELFLoggerDelegate.errorLogger, "failed to verify 'federation.operator' value", dx);
// The executor is built by hand rather than injected, so it is cast back to the concrete type to configure it.
102 this.taskExecutor = new ThreadPoolTaskExecutor();
// core = max = 1: a single worker thread, so queued update tasks run one at a time.
103 ((ThreadPoolTaskExecutor)this.taskExecutor).setCorePoolSize(1);
104 ((ThreadPoolTaskExecutor)this.taskExecutor).setMaxPoolSize(1);
// At most 25 tasks may wait in the queue. NOTE(review): with the default rejection policy further submissions
// are presumably rejected — confirm this is acceptable for bursts of events.
105 ((ThreadPoolTaskExecutor)this.taskExecutor).setQueueCapacity(25);
106 ((ThreadPoolTaskExecutor)this.taskExecutor).initialize();
109 logger.debug(EELFLoggerDelegate.debugLogger, "PeerGateway available");
/**
 * Logs that the gateway is being destroyed.
 * NOTE(review): the lifecycle annotation is not visible in this excerpt —
 * presumably @PreDestroy; confirm. The visible lines do not shut down the
 * executor created in initGateway(); confirm whether its worker thread needs
 * an explicit shutdown here.
 */
113 public void cleanupGateway() {
114 logger.debug(EELFLoggerDelegate.debugLogger, "PeerGateway destroyed");
/**
 * Returns the user id recorded as owner of every locally created or updated
 * solution, revision and artifact.
 *
 * @param theSubscription the peer subscription being processed; not used by
 *        the visible implementation
 * @return the value of the 'federation.operator' property
 */
117 protected String getOwnerId(MLPPeerSubscription theSubscription/*,
118 MLPSolution theSolution*/) {
119 // The owner should eventually be read from the c_user table and must be an admin user;
// for now the configured operator is used for every subscription.
120 return this.env.getProperty("federation.operator");
/**
 * Queues the processing of a peer subscription update: the event's peer,
 * subscription and solutions are wrapped in a PeerGatewayUpdateTask and
 * handed to the task executor, so the work does not run on the caller's
 * thread.
 * NOTE(review): the listener annotation is not visible in this excerpt —
 * presumably @EventListener; confirm.
 *
 * @param theEvent the subscription update carrying the peer, its
 *        subscription and the solutions to process
 */
124 public void handlePeerSubscriptionUpdate(PeerSubscriptionEvent theEvent) {
// Logged at info level, but routed to the debug logger target.
125 logger.info(EELFLoggerDelegate.debugLogger, "received peer subscription update event " + theEvent);
126 taskExecutor.execute(
127 new PeerGatewayUpdateTask(theEvent.getPeer(),
128 theEvent.getSubscription(),
129 theEvent.getSolutions()));
/**
 * Unit of work submitted to the gateway's task executor for one peer
 * subscription update. It is a non-static inner class: its methods reach the
 * enclosing gateway's clients and env through PeerGateway.this.
 */
133 public class PeerGatewayUpdateTask implements Runnable {
// Peer the solutions were received from; its API URL is used to obtain the federation client.
135 private MLPPeer peer;
// Subscription being processed; passed to getOwnerId() when local records are built.
136 private MLPPeerSubscription sub;
// Solutions reported by the peer for this subscription.
137 private List<MLPSolution> solutions;
/**
 * @param thePeer the peer that provided the solutions
 * @param theSub the subscription the solutions belong to
 * @param theSolutions the solutions to mirror locally
 */
140 public PeerGatewayUpdateTask(MLPPeer thePeer,
141 MLPPeerSubscription theSub,
142 List<MLPSolution> theSolutions) {
// NOTE(review): the assignments of peer and sub are not visible in this excerpt — confirm both fields are set here.
145 this.solutions = theSolutions;
// Body of the task's work method. NOTE(review): the method header is not visible in this excerpt —
// presumably Runnable.run(); confirm.
// For every solution received from the peer: look it up in the local CDS, update or create the local
// copy, then bring its revisions and artifacts up to date.
150 //list with category and subcategory currently used for onap
151 //more dynamic mapping to come: based on solution information it will provide sdc assettype, category and subcategory
152 ICommonDataServiceRestClient cdsClient = PeerGateway.this.clients.getClient();
154 logger.info(EELFLoggerDelegate.debugLogger, "Received peer " + this.peer + " solutions: " + this.solutions);
156 for (MLPSolution acumosSolution: this.solutions) {
158 //Check if the Model already exists in the Local Acumos
159 MLPSolution mlpSolution = null;
161 mlpSolution = cdsClient.getSolution(acumosSolution.getSolutionId());
// Any exception from the lookup (not only "not found") is treated as "solution absent locally".
162 } catch (Exception e) {
163 logger.info(EELFLoggerDelegate.debugLogger, "Solution Id : " + acumosSolution.getSolutionId() + " does not exists locally, Adding it to local catalog ");
166 //Verify if MLPSolution is not same
// A local copy that isSameMLPSolution() accepts is overwritten with the peer's values.
167 if(mlpSolution != null &&
168 isSameMLPSolution(acumosSolution, mlpSolution)) {
169 //if already exists locally then loop through next
170 mlpSolution = updateMLPSolution(acumosSolution, mlpSolution, cdsClient);
// NOTE(review): presumably the else branch (the line separating the two branches is not visible here). If so,
// a local solution that exists but fails isSameMLPSolution() is also sent to createMLPSolution with the same id — confirm.
174 mlpSolution = createMLPSolution(acumosSolution, cdsClient);
// Only continue with revisions and artifacts when a local solution was obtained from the step above.
177 if (mlpSolution != null) {
178 updateMLPSolution(mlpSolution, cdsClient);
// A failure is logged per solution.
181 catch (Exception x) {
182 logger.warn(EELFLoggerDelegate.debugLogger, "Mapping of acumos solution failed for: " + acumosSolution, x);
/**
 * Builds a local copy of a peer solution and stores it through the CDS
 * client.
 * <p>
 * Id, name, description, metadata, model type, toolkit type, active flag and
 * created/modified timestamps are copied from the peer. Access type,
 * provider and validation status are set to fixed values and the owner is
 * the gateway operator.
 * NOTE(review): the return statements are not visible in this excerpt —
 * presumably the new solution on success and null after a logged failure;
 * confirm.
 *
 * @param peerMLPSolution the solution as received from the peer
 * @param cdsClient client used to store the new solution
 */
187 private MLPSolution createMLPSolution(
188 MLPSolution peerMLPSolution,
189 ICommonDataServiceRestClient cdsClient) {
190 logger.info(EELFLoggerDelegate.debugLogger, "Creating Local MLP Solution for peer solution " + peerMLPSolution);
191 MLPSolution mlpSolution = new MLPSolution();
// The peer's solution id is reused, so the same solution is addressed by the same id on both sides.
192 mlpSolution.setSolutionId(peerMLPSolution.getSolutionId());
193 mlpSolution.setName(peerMLPSolution.getName());
194 mlpSolution.setDescription(peerMLPSolution.getDescription());
// Fixed to AccessTypeCode.PB regardless of the access type reported by the peer.
195 mlpSolution.setAccessTypeCode(AccessTypeCode.PB.toString());
196 mlpSolution.setMetadata(peerMLPSolution.getMetadata());
197 mlpSolution.setModelTypeCode(peerMLPSolution.getModelTypeCode());
// Hard-coded provider; the peer's provider value is not used at creation time.
198 mlpSolution.setProvider("ATTAcumosInc");
199 mlpSolution.setActive(peerMLPSolution.isActive());
200 mlpSolution.setToolkitTypeCode(peerMLPSolution.getToolkitTypeCode());
// Fixed to ValidationStatusCode.PS regardless of the peer's validation status.
201 mlpSolution.setValidationStatusCode(ValidationStatusCode.PS.toString());
202 mlpSolution.setCreated(peerMLPSolution.getCreated());
203 mlpSolution.setModified(peerMLPSolution.getModified());
204 mlpSolution.setOwnerId(getOwnerId(this.sub));
206 cdsClient.createSolution(mlpSolution);
// HTTP status failures are logged together with the response body returned by CDS.
209 catch (HttpStatusCodeException restx) {
210 logger.error(EELFLoggerDelegate.debugLogger, "createSolution CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
// Anything else is logged as an unexpected failure.
213 catch (Exception x) {
214 logger.error(EELFLoggerDelegate.debugLogger, "createMLPSolution unexpected failure", x);
/**
 * Builds a local copy of a peer solution revision and stores it through the
 * CDS client.
 * <p>
 * Solution id, revision id, version, description, metadata and
 * created/modified timestamps are copied from the peer revision; the owner
 * is the gateway operator.
 *
 * @param mlpSolutionRevision the revision as received from the peer
 * @param cdsClient client used to store the new revision
 * @return the locally built revision once the CDS call has succeeded.
 *         NOTE(review): the failure-path returns are not visible in this
 *         excerpt — presumably null; confirm.
 */
219 private MLPSolutionRevision createMLPSolutionRevision(
220 MLPSolutionRevision mlpSolutionRevision,
221 ICommonDataServiceRestClient cdsClient) {
222 MLPSolutionRevision solutionRevision = new MLPSolutionRevision();
223 solutionRevision.setSolutionId(mlpSolutionRevision.getSolutionId());
224 solutionRevision.setRevisionId(mlpSolutionRevision.getRevisionId());
225 solutionRevision.setVersion(mlpSolutionRevision.getVersion());
226 solutionRevision.setDescription(mlpSolutionRevision.getDescription());
227 solutionRevision.setOwnerId(getOwnerId(this.sub));
228 solutionRevision.setMetadata(mlpSolutionRevision.getMetadata());
229 solutionRevision.setCreated(mlpSolutionRevision.getCreated());
230 solutionRevision.setModified(mlpSolutionRevision.getModified());
// The locally built object is returned, not whatever the CDS call itself returns.
232 cdsClient.createSolutionRevision(solutionRevision);
233 return solutionRevision;
// HTTP status failures are logged together with the response body returned by CDS.
235 catch (HttpStatusCodeException restx) {
236 logger.error(EELFLoggerDelegate.debugLogger, "createSolutionRevision CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
// Anything else is logged as an unexpected failure.
239 catch (Exception x) {
240 logger.error(EELFLoggerDelegate.debugLogger, "createSolutionRevision unexpected failure", x);
/**
 * Builds a local copy of a peer artifact, stores it through the CDS client
 * and attaches it to the given solution revision.
 * <p>
 * All descriptive fields, including the URI, are copied from the peer
 * artifact; the owner is the gateway operator.
 * NOTE(review): the return statements are not visible in this excerpt —
 * presumably the new artifact on success and null after a logged failure;
 * confirm.
 *
 * @param theSolutionId id of the solution the artifact belongs to
 * @param theRevisionId id of the revision the artifact is attached to
 * @param mlpArtifact the artifact as received from the peer
 * @param cdsClient client used to store and attach the artifact
 */
245 private MLPArtifact createMLPArtifact(
246 String theSolutionId,
247 String theRevisionId,
248 MLPArtifact mlpArtifact,
249 ICommonDataServiceRestClient cdsClient) {
250 MLPArtifact artifact = new MLPArtifact();
251 artifact.setArtifactId(mlpArtifact.getArtifactId());
252 artifact.setArtifactTypeCode(mlpArtifact.getArtifactTypeCode());
253 artifact.setCreated(mlpArtifact.getCreated());
254 artifact.setDescription(mlpArtifact.getDescription());
255 artifact.setMetadata(mlpArtifact.getMetadata());
256 artifact.setModified(mlpArtifact.getModified());
257 artifact.setName(mlpArtifact.getName());
258 artifact.setOwnerId(getOwnerId(this.sub));
// The second ';' below is an empty statement and has no effect.
259 artifact.setSize(mlpArtifact.getSize());;
// The peer's URI is stored as-is at this point.
260 artifact.setUri(mlpArtifact.getUri());
261 artifact.setVersion(mlpArtifact.getVersion());
// Two separate CDS calls: create the artifact record, then link it to the revision.
263 cdsClient.createArtifact(artifact);
264 cdsClient.addSolutionRevisionArtifact(theSolutionId, theRevisionId, mlpArtifact.getArtifactId());
// HTTP status failures are logged together with the response body returned by CDS.
267 catch (HttpStatusCodeException restx) {
268 logger.error(EELFLoggerDelegate.debugLogger, "createArtifact CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
// Anything else is logged as an unexpected failure.
271 catch (Exception x) {
272 logger.error(EELFLoggerDelegate.debugLogger, "createArtifact unexpected failure", x);
/**
 * Overwrites the fields of an existing local artifact with the values of the
 * peer artifact and stores the result through the CDS client.
 * <p>
 * The passed-in local artifact is modified in place; the owner is reset to
 * the gateway operator and the URI to the peer's URI.
 *
 * @param peerMLPArtifact the artifact as received from the peer
 * @param localMLPArtifact the artifact currently stored locally; mutated
 * @param cdsClient client used to store the update
 * @return the updated local artifact once the CDS call has succeeded.
 *         NOTE(review): the failure-path returns are not visible in this
 *         excerpt — presumably null; confirm.
 */
277 private MLPArtifact updateMLPArtifact(MLPArtifact peerMLPArtifact, MLPArtifact localMLPArtifact, ICommonDataServiceRestClient cdsClient) {
278 logger.info(EELFLoggerDelegate.debugLogger, "Updating Local MLP Artifact for peer artifact " + peerMLPArtifact);
280 localMLPArtifact.setArtifactId(peerMLPArtifact.getArtifactId());
281 localMLPArtifact.setArtifactTypeCode(peerMLPArtifact.getArtifactTypeCode());
282 localMLPArtifact.setCreated(peerMLPArtifact.getCreated());
283 localMLPArtifact.setDescription(peerMLPArtifact.getDescription());
284 localMLPArtifact.setMetadata(peerMLPArtifact.getMetadata());
285 localMLPArtifact.setModified(peerMLPArtifact.getModified());
286 localMLPArtifact.setName(peerMLPArtifact.getName());
287 localMLPArtifact.setOwnerId(getOwnerId(this.sub));
// The second ';' below is an empty statement and has no effect.
288 localMLPArtifact.setSize(peerMLPArtifact.getSize());;
// This replaces whatever URI the local record held with the peer's URI.
289 localMLPArtifact.setUri(peerMLPArtifact.getUri());
290 localMLPArtifact.setVersion(peerMLPArtifact.getVersion());
292 cdsClient.updateArtifact(localMLPArtifact);
293 return localMLPArtifact;
// HTTP status failures are logged together with the response body returned by CDS.
295 catch (HttpStatusCodeException restx) {
296 logger.error(EELFLoggerDelegate.debugLogger, "updateArtifact CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
// Anything else is logged as an unexpected failure.
299 catch (Exception x) {
300 logger.error(EELFLoggerDelegate.debugLogger, "updateArtifact unexpected failure", x);
/**
 * Overwrites the descriptive fields of an existing local solution with the
 * values of the peer solution and stores the result through the CDS client.
 * <p>
 * Unlike createMLPSolution, access type and provider are taken from the
 * peer. The local validation status is kept, and created/modified timestamps
 * are not touched by the visible lines. The passed-in local solution is
 * modified in place.
 *
 * @param peerMLPSolution the solution as received from the peer
 * @param localMLPSolution the solution currently stored locally; mutated
 * @param cdsClient client used to store the update
 * @return the updated local solution once the CDS call has succeeded.
 *         NOTE(review): the failure-path returns are not visible in this
 *         excerpt — presumably null; confirm.
 */
305 private MLPSolution updateMLPSolution(MLPSolution peerMLPSolution, MLPSolution localMLPSolution, ICommonDataServiceRestClient cdsClient) {
306 logger.info(EELFLoggerDelegate.debugLogger, "Updating Local MLP Solution for peer solution " + peerMLPSolution);
307 localMLPSolution.setSolutionId(peerMLPSolution.getSolutionId());
308 localMLPSolution.setName(peerMLPSolution.getName());
309 localMLPSolution.setDescription(peerMLPSolution.getDescription());
310 localMLPSolution.setAccessTypeCode(peerMLPSolution.getAccessTypeCode());
311 localMLPSolution.setMetadata(peerMLPSolution.getMetadata());
312 localMLPSolution.setModelTypeCode(peerMLPSolution.getModelTypeCode());
313 localMLPSolution.setProvider(peerMLPSolution.getProvider());
314 localMLPSolution.setActive(peerMLPSolution.isActive());
315 localMLPSolution.setToolkitTypeCode(peerMLPSolution.getToolkitTypeCode());
// Assigns the local value back to itself, i.e. the local validation status is left unchanged.
// NOTE(review): confirm this is intentional and not meant to read from peerMLPSolution.
316 localMLPSolution.setValidationStatusCode(localMLPSolution.getValidationStatusCode());
317 localMLPSolution.setOwnerId(getOwnerId(this.sub));
320 cdsClient.updateSolution(localMLPSolution);
321 return localMLPSolution;
// HTTP status failures are logged together with the response body returned by CDS.
323 catch (HttpStatusCodeException restx) {
324 logger.error(EELFLoggerDelegate.debugLogger, "updateSolution CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
// Anything else is logged as an unexpected failure.
327 catch (Exception x) {
328 logger.error(EELFLoggerDelegate.debugLogger, "updateSolution unexpected failure", x);
/**
 * Compares a peer solution with a local solution field by field.
 * <p>
 * The condition below is a chain of OR-ed clauses. The first group is
 * satisfied when a field is non-empty on both sides and equal ignoring case
 * (name, description, access type, metadata, model type, provider, toolkit
 * type). The second group is satisfied when a field is empty or null on both
 * sides (metadata, description, access type, model type, toolkit type; name
 * and provider have no such clause).
 * NOTE(review): because the clauses are OR-ed, a single matching or jointly
 * empty field satisfies the whole condition; confirm whether AND was
 * intended. The statements that set and return isSame are not visible in
 * this excerpt — presumably true when the condition holds, false otherwise
 * (including when either argument is null).
 *
 * @param peerMLPSolution the solution as received from the peer
 * @param localMLPSolution the solution currently stored locally
 */
333 private boolean isSameMLPSolution(MLPSolution peerMLPSolution, MLPSolution localMLPSolution) {
334 boolean isSame = false;
// The field comparison is only evaluated when both solutions are present.
335 if(peerMLPSolution != null && localMLPSolution != null) {
337 if((!Utils.isEmptyOrNullString(peerMLPSolution.getName()) && !Utils.isEmptyOrNullString(localMLPSolution.getName()) && localMLPSolution.getName().equalsIgnoreCase(peerMLPSolution.getName()))
338 || (!Utils.isEmptyOrNullString(peerMLPSolution.getDescription()) && !Utils.isEmptyOrNullString(localMLPSolution.getDescription()) && localMLPSolution.getDescription().equalsIgnoreCase(peerMLPSolution.getDescription()))
339 || (!Utils.isEmptyOrNullString(peerMLPSolution.getAccessTypeCode()) && !Utils.isEmptyOrNullString(localMLPSolution.getAccessTypeCode()) && localMLPSolution.getAccessTypeCode().equalsIgnoreCase(peerMLPSolution.getAccessTypeCode()))
340 || (!Utils.isEmptyOrNullString(peerMLPSolution.getMetadata()) && !Utils.isEmptyOrNullString(localMLPSolution.getMetadata()) && localMLPSolution.getMetadata().equalsIgnoreCase(peerMLPSolution.getMetadata()))
341 || (!Utils.isEmptyOrNullString(peerMLPSolution.getModelTypeCode()) && !Utils.isEmptyOrNullString(localMLPSolution.getModelTypeCode()) && localMLPSolution.getModelTypeCode().equalsIgnoreCase(peerMLPSolution.getModelTypeCode()))
342 || (!Utils.isEmptyOrNullString(peerMLPSolution.getProvider()) && !Utils.isEmptyOrNullString(localMLPSolution.getProvider()) && localMLPSolution.getProvider().equalsIgnoreCase(peerMLPSolution.getProvider()))
343 || (!Utils.isEmptyOrNullString(peerMLPSolution.getToolkitTypeCode()) && !Utils.isEmptyOrNullString(localMLPSolution.getToolkitTypeCode()) && localMLPSolution.getToolkitTypeCode().equalsIgnoreCase(peerMLPSolution.getToolkitTypeCode()))
344 || (Utils.isEmptyOrNullString(peerMLPSolution.getMetadata()) && Utils.isEmptyOrNullString(localMLPSolution.getMetadata()))
345 || (Utils.isEmptyOrNullString(peerMLPSolution.getDescription()) && Utils.isEmptyOrNullString(localMLPSolution.getDescription()))
346 || (Utils.isEmptyOrNullString(peerMLPSolution.getAccessTypeCode()) && Utils.isEmptyOrNullString(localMLPSolution.getAccessTypeCode()))
347 || (Utils.isEmptyOrNullString(peerMLPSolution.getModelTypeCode()) && Utils.isEmptyOrNullString(localMLPSolution.getModelTypeCode()))
348 || (Utils.isEmptyOrNullString(peerMLPSolution.getToolkitTypeCode()) && Utils.isEmptyOrNullString(localMLPSolution.getToolkitTypeCode()))) {
/**
 * Brings the local revisions and artifacts of a solution in line with the
 * peer.
 * <p>
 * Steps, in order: fetch the peer's revisions for the solution; make sure
 * the last revision of that list exists locally, creating it when none is
 * found; fetch the peer's artifacts for that revision; create or update the
 * local record of each artifact; download each artifact's content from the
 * peer and upload it to the local Nexus repository; when the upload yields
 * upload info, point the local artifact's URI at the uploaded Maven path.
 * NOTE(review): several lines of this method are not visible in this excerpt
 * (assignment targets, try blocks, possible rethrows), so which failures
 * propagate to the caller cannot be told from here.
 *
 * @param theSolution the local solution whose revisions and artifacts are
 *        synchronized
 * @param cdsClient client used for all local catalog reads and writes
 * @throws Exception declared by the signature; see the note above
 */
355 public void updateMLPSolution(MLPSolution theSolution, ICommonDataServiceRestClient cdsClient) throws Exception {
// The federation client is obtained for the API URL of the peer that sent the solutions.
356 FederationClient fedClient =
357 clients.getFederationClient(this.peer.getApiUrl());
360 List<MLPSolutionRevision> peerRevisions = null;
// Unchecked cast: the response body is trusted to be a list of revisions.
362 peerRevisions = (List<MLPSolutionRevision>)
363 fedClient.getSolutionRevisions(theSolution.getSolutionId()).getResponseBody();
365 catch (Exception x) {
366 logger.warn(EELFLoggerDelegate.debugLogger, "Failed to retrieve acumos revisions", x);
370 //check if we have locally the latest revision available on the peer
371 //TODO: this is just one possible policy regarding the handling of
// such a mismatch between the peer's and the local revisions.
373 MLPSolutionRevision localRevision = null;
// The last element of the peer's list is taken as the latest revision.
// NOTE(review): assumes the list is non-empty and ordered oldest-first — confirm.
// NOTE(review): the assignment target of this call is not visible in this excerpt — presumably localRevision.
376 cdsClient.getSolutionRevision(
377 theSolution.getSolutionId(),
378 peerRevisions.get(peerRevisions.size()-1).getRevisionId());
// A CDS "not found" answer is expected when the revision is new locally; only other HTTP failures are logged.
380 catch (HttpStatusCodeException restx) {
381 if (!Errors.isCDSNotFound(restx)) {
382 logger.error(EELFLoggerDelegate.debugLogger, "getSolutionRevision CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
387 if(localRevision == null) {
388 localRevision = createMLPSolutionRevision(
389 peerRevisions.get(peerRevisions.size()-1), cdsClient);
392 //update the revision information
// (no update of an existing local revision is performed by the visible code)
395 //continue to verify that we have the latest version of the artifacts
397 List<MLPArtifact> peerArtifacts = null;
// Unchecked cast again; the artifacts are requested for the same last peer revision as above.
399 peerArtifacts = (List<MLPArtifact>)
400 fedClient.getArtifacts(
401 theSolution.getSolutionId(),
402 peerRevisions.get(peerRevisions.size()-1).getRevisionId())
405 catch (Exception x) {
406 logger.warn(EELFLoggerDelegate.debugLogger, "Failed to retrieve peer acumos artifacts", x);
// Artifacts are only processed when a local revision exists (found, or created just above).
410 if(localRevision != null) {
411 for(MLPArtifact peerArtifact : peerArtifacts) {
412 MLPArtifact localArtifact = null;
// NOTE(review): the assignment target of this call is not visible in this excerpt — presumably localArtifact.
415 cdsClient.getArtifact(peerArtifact.getArtifactId());
// As for revisions, "not found" is the expected answer for a new artifact; only other HTTP failures are logged.
417 catch (HttpStatusCodeException restx) {
418 if (!Errors.isCDSNotFound(restx)) {
419 logger.error(EELFLoggerDelegate.debugLogger, "getArtifact CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
424 if(localArtifact == null) {
425 localArtifact = createMLPArtifact(
426 theSolution.getSolutionId(),
427 localRevision.getRevisionId(),
432 //an update might not actually be necessary but we cannot compare
433 //timestamps as they are locally generated
434 localArtifact = updateMLPArtifact(peerArtifact, localArtifact, cdsClient);
437 //TODO: add the delete of those who are not available anymore
439 //if (localArtifact == null) {
440 //not transactional .. hard to recover from, we'll re-attempt
441 //next time we process the enclosing solution/revision (should be
442 //marked accordingly)
443 //if anything happened an exception
446 //artifacts file download and push it to nexus: we continue here
447 //as we persisted the peer URI
448 Resource artifactContent = null;
450 artifactContent = fedClient.downloadArtifact(peerArtifact.getArtifactId());
// A failed download is logged and leaves artifactContent null, which skips the upload below.
452 catch (Exception x) {
453 logger.warn(EELFLoggerDelegate.debugLogger, "Failed to retrieve acumos artifact content", x);
456 UploadArtifactInfo uploadInfo = null;
457 if (artifactContent != null) {
// NOTE(review): localArtifact is dereferenced below; createMLPArtifact/updateMLPArtifact may presumably
// yield null after a logged failure — confirm a null cannot reach this point.
// NOTE(review): the invoked Nexus client method and the assignment target are not visible in this excerpt —
// presumably uploadInfo receives the result of an upload call.
460 PeerGateway.this.clients.getNexusClient()
462 PeerGateway.this.env.getProperty("nexus.groupId"),
463 localArtifact.getName(), /* probably wrong */
464 localArtifact.getVersion(),
465 "", /* should receive this from peer */
466 artifactContent.contentLength(),
467 artifactContent.getInputStream());
// A failed upload is logged; the artifact then keeps the URI persisted earlier.
469 catch (Exception x) {
470 logger.warn(EELFLoggerDelegate.debugLogger, "Failed to push artifact content to local Nexus repo", x);
474 if (uploadInfo != null) {
475 //update artifact with local repo reference
476 localArtifact.setUri(uploadInfo.getArtifactMvnPath());
478 cdsClient.updateArtifact(localArtifact);
// HTTP status failures are logged together with the response body returned by CDS.
480 catch (HttpStatusCodeException restx) {
481 logger.error(EELFLoggerDelegate.debugLogger, "updateArtifact CDS call failed. CDS message is " + restx.getResponseBodyAsString(), restx);
// Anything else is logged as an unexpected failure.
483 catch (Exception x) {
484 logger.error(EELFLoggerDelegate.debugLogger, "updateArtifact unexpected failure", x);