Fix missing information in federated solutions 26/2826/2
authorSerban Jora <sj2381@att.com>
Thu, 13 Sep 2018 15:38:06 +0000 (11:38 -0400)
committerSerban Jora <sj2381@att.com>
Thu, 13 Sep 2018 15:52:35 +0000 (11:52 -0400)
Change-Id: I834eb3dfe03176cabe7084a6c5de99c1b5de1e63
Issue-ID: ACUMOS-1690
Signed-off-by: Serban Jora <sj2381@att.com>
docs/release-notes.rst
gateway/pom.xml
gateway/src/main/java/org/acumos/federation/gateway/adapter/PeerGateway.java
gateway/src/main/java/org/acumos/federation/gateway/service/impl/CatalogServiceImpl.java
gateway/src/main/java/org/acumos/federation/gateway/task/PeerSubscriptionTaskScheduler.java
gateway/src/main/java/org/acumos/federation/gateway/task/TaskConfiguration.java
gateway/src/test/java/org/acumos/federation/gateway/test/PeerGatewayTest.java
gateway/src/test/resources/mockPeerSolutionResponse.json

index f031989..92e1088 100644 (file)
@@ -22,6 +22,13 @@ Federated Gateway Release Notes
 
 The Federated Gateway server is available as a Docker image in a Docker registry.
 
+Version 1.18.2, 2018-09-13
+-------------------------
+
+* Rely on solution detail API for mapping (ACUMS-1690)
+* Allow configuration of underlying executor and scheduler
+* Do not overwrite user during mapping for local solutions
+
 Version 1.18.1, 2018-09-05
 -------------------------
 
index 8f9654d..b75df19 100644 (file)
@@ -24,7 +24,7 @@ limitations under the License.
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.acumos.federation</groupId>
        <artifactId>gateway</artifactId>
-       <version>1.18.1-SNAPSHOT</version>
+       <version>1.18.2-SNAPSHOT</version>
        <name>Federation Gateway</name>
        <description>Federated Acumos Interface for inter-acumos and ONAP communication</description>
 
index 5592bc7..7f80c44 100644 (file)
@@ -74,6 +74,7 @@ import org.springframework.web.client.HttpStatusCodeException;
 public class PeerGateway {
 
        private final EELFLoggerDelegate log = EELFLoggerDelegate.getLogger(PeerGateway.class);
+       @Autowired
        private TaskExecutor taskExecutor;
        @Autowired
        private Environment env;
@@ -109,19 +110,13 @@ public class PeerGateway {
                        }
                }
 
-               this.taskExecutor = new ThreadPoolTaskExecutor();
-               ((ThreadPoolTaskExecutor) this.taskExecutor).setCorePoolSize(1);
-               ((ThreadPoolTaskExecutor) this.taskExecutor).setMaxPoolSize(1);
-               ((ThreadPoolTaskExecutor) this.taskExecutor).setQueueCapacity(25);
-               ((ThreadPoolTaskExecutor) this.taskExecutor).initialize();
-
                // Done
-               log.trace(EELFLoggerDelegate.debugLogger, "PeerGateway available");
+               log.debug(EELFLoggerDelegate.debugLogger, "PeerGateway available");
        }
 
        @PreDestroy
        public void cleanupGateway() {
-               log.trace(EELFLoggerDelegate.debugLogger, "PeerGateway destroyed");
+               log.debug(EELFLoggerDelegate.debugLogger, "PeerGateway destroyed");
        }
 
        protected String getUserId(MLPPeerSubscription theSubscription/*
@@ -161,25 +156,10 @@ public class PeerGateway {
                        ServiceContext ctx = catalog.selfService();
 
                        for (MLPSolution peerSolution : this.solutions) {
-                               // Check if the Model already exists in the Local Acumos
-                               MLPSolution localSolution = null;
                                log.info(EELFLoggerDelegate.debugLogger, "Processing peer solution {}", peerSolution);
 
                                try {
-                                       localSolution = catalog.putSolution(
-                                                                                                                               Solution.buildFrom(peerSolution)
-                                                                                                                                       .withUser(getUserId(this.sub))
-                                                                                                                                       .withSource(this.peer.getPeerId())
-                                                                                                                                       .build(), ctx);
-                               }
-                               catch (ServiceException sx) {
-                                       log.error(EELFLoggerDelegate.errorLogger,
-                                                       "Failed to put solution {} into local catalog", peerSolution, sx);
-                                       continue;
-                               }
-
-                               try {
-                                       mapSolution(localSolution, ctx);
+                                       mapSolution(peerSolution, ctx);
                                }
                                catch (Throwable t) {
                                        log.error(EELFLoggerDelegate.errorLogger,
@@ -272,17 +252,18 @@ public class PeerGateway {
 
                        FederationClient fedClient = clients.getFederationClient(this.peer.getApiUrl());
 
-                       // get revisions
-                       List<MLPSolutionRevision> peerRevisions = null;
-                       try {
-                               peerRevisions = (List<MLPSolutionRevision>) fedClient.getSolutionRevisions(theSolution.getSolutionId())
-                                               .getContent();
-                       }
-                       catch (Exception x) {
-                               log.warn(EELFLoggerDelegate.errorLogger, "Failed to retrieve acumos revisions for solution "
-                                               + theSolution.getSolutionId() + " from peer " + this.peer, x);
-                               throw x;
-                       }
+                       Solution localSolution,
+                                                        peerSolution;
+
+                       //retrieve the full representation from the peer
+                       peerSolution = (Solution)fedClient.getSolution(theSolution.getSolutionId()).getContent();
+                       localSolution = catalog.putSolution(
+                                                                                                                                       Solution.buildFrom(peerSolution)
+                                                                                                                                               .withUser(getUserId(this.sub))
+                                                                                                                                               .withSource(this.peer.getPeerId())
+                                                                                                                                               .build(), theContext);
+
+                       List<MLPSolutionRevision> peerRevisions = (List)peerSolution.getRevisions();
 
                        // this should not happen as any solution should have at least one
                        // revision (but that's an assumption on how on-boarding works)
@@ -294,7 +275,7 @@ public class PeerGateway {
                        // check if we have locally the latest revision available on the peer
                        List<MLPSolutionRevision> catalogRevisions = Collections.EMPTY_LIST;
                        try {
-                               catalogRevisions = catalog.getSolutionRevisions(theSolution.getSolutionId(), theContext);
+                               catalogRevisions = catalog.getSolutionRevisions(localSolution.getSolutionId(), theContext);
                        }
                        catch (ServiceException sx) {
                                log.error(EELFLoggerDelegate.errorLogger,
@@ -321,7 +302,7 @@ public class PeerGateway {
                                //revision related information (artifacts/documents/description/..) is now embedded in the revision details
                                //federation api call so one call is all is needed      
                                try {
-                                       peerRevision = fedClient.getSolutionRevision(theSolution.getSolutionId(), peerRevision.getRevisionId())
+                                       peerRevision = fedClient.getSolutionRevision(peerSolution.getSolutionId(), peerRevision.getRevisionId())
                                                                                                                                                .getContent();
                                }
                                catch (Exception x) {
@@ -363,7 +344,7 @@ public class PeerGateway {
                                        boolean doUpdate = false;
 
                                        if (localArtifact == null) {
-                                               localArtifact = createMLPArtifact(theSolution.getSolutionId(), localRevision.getRevisionId(),
+                                               localArtifact = createMLPArtifact(localSolution.getSolutionId(), localRevision.getRevisionId(),
                                                                peerArtifact, theContext);
                                        }
                                        else {
@@ -384,7 +365,7 @@ public class PeerGateway {
                                                Resource artifactContent = null;
                                                try {
                                                        artifactContent = fedClient.getArtifactContent(
-                                                               theSolution.getSolutionId(), localRevision.getRevisionId(), peerArtifact.getArtifactId());
+                                                               peerSolution.getSolutionId(), peerRevision.getRevisionId(), peerArtifact.getArtifactId());
                                                        log.info(EELFLoggerDelegate.debugLogger, "Received {} bytes of artifact content", artifactContent.contentLength()); 
                                                }
                                                catch (Exception x) {
@@ -394,7 +375,7 @@ public class PeerGateway {
                                                if (artifactContent != null) {
                                                        try {
                                                                content.putArtifactContent(
-                                                                       theSolution.getSolutionId(), localRevision.getRevisionId(), localArtifact, artifactContent);
+                                                                       localSolution.getSolutionId(), localRevision.getRevisionId(), localArtifact, artifactContent);
                                                                doUpdate = true;
                                                        }
                                                        catch (ServiceException sx) {
@@ -432,12 +413,13 @@ public class PeerGateway {
                                        boolean doUpdate = false;
 
                                        if (localDocument == null) {
-                                               localDocument = createMLPDocument(theSolution.getSolutionId(), localRevision.getRevisionId(),
+                                               localDocument = createMLPDocument(localSolution.getSolutionId(), localRevision.getRevisionId(),
                                                                peerDocument, theContext);
                                        }
                                        else {
-                                               //what if the local document has been modified past the last fetch ??
-                                               if (!peerDocument.getVersion().equals(localDocument.getVersion())) {
+                                               //version strings are not standard so comparing them is not necessarly safe
+                                               if (peerDocument.getVersion() != null && localDocument.getVersion() != null &&
+                                                               !peerDocument.getVersion().equals(localDocument.getVersion())) {
                                                        // update local doc
                                                        localDocument = copyMLPDocument(peerDocument, localDocument);
                                                        doUpdate = true;
@@ -454,7 +436,7 @@ public class PeerGateway {
                                                Resource documentContent = null;
                                                try {
                                                        documentContent = fedClient.getDocumentContent(
-                                                               theSolution.getSolutionId(), localRevision.getRevisionId(), peerDocument.getDocumentId());
+                                                               peerSolution.getSolutionId(), localRevision.getRevisionId(), peerDocument.getDocumentId());
                                                        log.info(EELFLoggerDelegate.debugLogger, "Received {} bytes of document content", documentContent.contentLength()); 
                                                }
                                                catch (Exception x) {
@@ -464,7 +446,7 @@ public class PeerGateway {
                                                if (documentContent != null) {
                                                        try {
                                                                content.putDocumentContent(
-                                                                       theSolution.getSolutionId(), localRevision.getRevisionId(), localDocument, documentContent);
+                                                                       localSolution.getSolutionId(), localRevision.getRevisionId(), localDocument, documentContent);
                                                                doUpdate = true;
                                                        }
                                                        catch (ServiceException sx) {
index 52dc62e..5fac593 100644 (file)
@@ -200,6 +200,8 @@ public class CatalogServiceImpl extends AbstractServiceImpl
                                if (catalogSolution.getSourceId() == null) {
                                        //this is a local solution that made its way back
                                        log.info(EELFLoggerDelegate.debugLogger, "Solution {} was originally provisioned locally", catalogSolution.getSolutionId());
+                                       //make sure not to update the user if the solution is local
+                                       theSolution.setUserId(catalogSolution.getUserId());
                                }
                                else {
                                        if (!theSolution.getSourceId().equals(catalogSolution.getSourceId())) {
index fda8863..565a9f2 100644 (file)
@@ -24,6 +24,8 @@ import java.lang.invoke.MethodHandles;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.ScheduledFuture;
 
 import javax.annotation.PostConstruct;
@@ -121,14 +123,22 @@ public class PeerSubscriptionTaskScheduler {
        private void terminatePeerSubsTask(String thePeerId) {
 
                Map<Long, PeerTaskHandler> subsTask = this.peersSubsTask.row(thePeerId);
+               Set<Long> subsToTerminate = null;
+
                if (subsTask != null) {
+                       subsToTerminate = new HashSet<Long>();
+
                        for (Map.Entry<Long, PeerTaskHandler> subTaskEntry : subsTask.entrySet()) {
                                subTaskEntry.getValue().stopTask();
-                               this.peersSubsTask.remove(thePeerId, subTaskEntry.getKey());
+                               subsToTerminate.add(subTaskEntry.getKey());
                                log.debug(EELFLoggerDelegate.debugLogger,       "Terminated task for peer {} subscription {}",
                                                                        thePeerId, subTaskEntry.getKey());
                        }
                }
+
+               for (Long subId: subsToTerminate) {     
+                       this.peersSubsTask.remove(thePeerId, subId);
+               }
        }
 
        @Scheduled(initialDelay = 5000, fixedRateString = "${peer.jobchecker.interval:400}000")
@@ -142,6 +152,21 @@ public class PeerSubscriptionTaskScheduler {
                        return;
                }
 
+               //terminate peer tasks for deleted peers
+               Set<String> activePeerIds = this.peersSubsTask.rowKeySet();
+               Set<String> peersToTerminate = new HashSet<String>();
+               for (String activePeerId: activePeerIds) {
+                       MLPPeer activePeer = peers.stream().filter(peer -> peer.getPeerId().equals(activePeerId)).findFirst().orElse(null);
+                       if (activePeer == null) {
+                               peersToTerminate.add(activePeerId);
+                       }
+               }
+
+               for (String peerId: peersToTerminate) {
+                       terminatePeerSubsTask(peerId);
+               }
+
+               //for each existing peer
                for (MLPPeer peer : peers) {
                        log.info(EELFLoggerDelegate.debugLogger, "check peer {}", peer);
 
@@ -149,11 +174,11 @@ public class PeerSubscriptionTaskScheduler {
                                continue;
 
                        log.debug(EELFLoggerDelegate.debugLogger,       "processing peer {}", peer.getPeerId());
-                       // cancel peer tasks for non-active peers
+                       // terminate peer tasks for non-active peers
                        if (PeerStatus.Active != PeerStatus.forCode(peer.getStatusCode())) {
                                // cancel all peer sub tasks for this peer
                                log.debug(EELFLoggerDelegate.debugLogger,
-                                               "peer {} no longer active, stopping active tasks", peer);
+                                               "peer {} no longer active, terminating active tasks", peer);
                                terminatePeerSubsTask(peer.getPeerId());
                                continue;
                        }
@@ -163,18 +188,23 @@ public class PeerSubscriptionTaskScheduler {
                        //currently active peer subs
                        Map<Long, PeerTaskHandler> peerSubsTask = this.peersSubsTask.row(peer.getPeerId());
 
-                       //stop all active peer sub tasks that have no provisioned equivalent
+                       //terminate all active peer sub tasks that have no provisioned equivalent
                        if (peerSubsTask != null) {
+                               Set<Long> subsToTerminate = new HashSet<Long>();
                                for (Map.Entry<Long, PeerTaskHandler> peerSubTaskEntry : peerSubsTask.entrySet()) {
                                        //fugly
                                        if (!peerSubs.stream()
                                                                .filter(peerSub -> peerSub.getSubId().equals(peerSubTaskEntry.getKey())).findAny().isPresent()) {
                                                peerSubTaskEntry.getValue().stopTask();
-                                               this.peersSubsTask.remove(peer.getPeerId(), peerSubTaskEntry.getKey());
+                                               subsToTerminate.add(peerSubTaskEntry.getKey());
                                                log.debug(EELFLoggerDelegate.debugLogger,       "Terminated task for peer {} subscription {}",
                                                                                        peer.getPeerId(), peerSubTaskEntry.getKey());
                                        }
                                }
+
+                               for (Long subId: subsToTerminate) {
+                                       this.peersSubsTask.remove(peer.getPeerId(), subId);
+                               }
                        }
 
                        //start/update tasks for all current subscriptions
@@ -189,8 +219,9 @@ public class PeerSubscriptionTaskScheduler {
                                                          (peerSub.getModified() != null && taskSub.getModified() != null &&
                                                                 peerSub.getModified().equals(taskSub.getModified())))) {
                                                log.debug(EELFLoggerDelegate.debugLogger,
-                                                               "peer {} subscription {} was updated, stopping current task", peer.getPeerId(), peerSub.getSubId());
+                                                               "peer {} subscription {} was updated, terminating current task", peer.getPeerId(), peerSub.getSubId());
                                                peerSubTask.stopTask();
+                                               //this remove can be inlines as we are iterating over peersSubsTasks at this time
                                                this.peersSubsTask.remove(peer.getPeerId(), peerSub.getSubId());
                                        }
                                }
index 75f1c20..2921d69 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.acumos.federation.gateway.task;
 
+import java.util.concurrent.Executor;
+
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -27,46 +29,77 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Scope;
 import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
-
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
 
 /**
  * Provides the beans used to setup the peer subscription tasks.
  */
 @Configuration
+@EnableAsync
 @EnableScheduling
 @EnableAutoConfiguration
 @ConfigurationProperties(prefix = "task")
-public class TaskConfiguration {
+public class TaskConfiguration implements AsyncConfigurer {
 
-       private int poolSize = 20;
+       private int schedulerPoolSize = 100;
+       private int executorCorePoolSize = 20;
+       private int executorMaxPoolSize = 100;
+       private int executorQueueCapacity = 50;
 
        public TaskConfiguration() {
        }
 
-       public void setPoolSize(int thePoolSize) {
-               this.poolSize = thePoolSize;
+       public void setSchedulerPoolSize(int theSize) {
+               this.schedulerPoolSize = theSize;
+       }
+
+       public void setExecutorCorePoolSize(int theSize) {
+               this.executorCorePoolSize = theSize;
+       }
+
+       public void setExecutorMaxPoolSize(int theSize) {
+               this.executorMaxPoolSize = theSize;
+       }
+
+       public void setExecutorQueueCapacity(int theCapacity) {
+               this.executorQueueCapacity = theCapacity;
+       }
+
+       @Override
+       public Executor getAsyncExecutor() {
+               ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+               executor.setCorePoolSize(this.executorCorePoolSize);
+               executor.setMaxPoolSize(this.executorMaxPoolSize);
+               executor.setQueueCapacity(this.executorQueueCapacity);
+               executor.setThreadNamePrefix("GatewayExecutor-");
+               executor.initialize();
+               return executor;
+       }
+
+       @Override
+       public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
+               return null;
        }
 
        @Bean
-       public TaskScheduler taskScheduler() {
-               ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
-               taskScheduler.setPoolSize(this.poolSize);
-               taskScheduler.setBeanName("gatewayPeerTaskScheduler");
-               taskScheduler.initialize();
-               return taskScheduler;
+       public TaskScheduler getTaskScheduler() {
+               ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+               scheduler.setPoolSize(this.schedulerPoolSize);
+               scheduler.setThreadNamePrefix("GatewayScheduler-");
+               scheduler.initialize();
+               return scheduler;
        }
 
-       /**
-        */
        @Bean
        public PeerSubscriptionTaskScheduler peerSubscriptionTaskScheduler() {
                return new PeerSubscriptionTaskScheduler();
        }
 
-       /**
-        */
        @Bean
        @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public PeerSubscriptionTask peerSubscriptionTask() {
index 8148913..5f45313 100644 (file)
@@ -152,6 +152,21 @@ public class PeerGatewayTest {
                        mockSolutionsResponse
                                .addHeader("Content-Length", String.valueOf(mockSolutions.contentLength()));
 
+                       BasicHttpResponse mockSolutionResponse = 
+                               new BasicHttpResponse(
+                                       new BasicStatusLine(
+                                               new ProtocolVersion("HTTP",1,1), 200, "Success"));
+
+                       ClassPathResource mockSolution =
+                               new ClassPathResource("mockPeerSolutionResponse.json");
+
+                       mockSolutionResponse.setEntity(
+                               new InputStreamEntity(mockSolution.getInputStream()));
+                       mockSolutionResponse
+                               .addHeader("Content-Type", ContentType.APPLICATION_JSON.toString());
+                       mockSolutionResponse
+                               .addHeader("Content-Length", String.valueOf(mockSolution.contentLength()));
+
                        BasicHttpResponse mockSolutionRevisionsResponse = 
                                new BasicHttpResponse(
                                        new BasicStatusLine(
@@ -485,6 +500,8 @@ public class PeerGatewayTest {
                                                String path = req.getURI().getPath();
                                                if (path.equals("/solutions"))
                                                        return mockSolutionsResponse;
+                                               if (path.startsWith("/solutions/") && !path.contains("/revisions")) //solution details
+                                                       return mockSolutionResponse;
                                                if (path.endsWith("/revisions"))
                                                        return mockSolutionRevisionsResponse;
                                                if (path.endsWith("/artifacts"))
index c2e615a..afff8fe 100644 (file)
@@ -1,4 +1,5 @@
 {
+ "error": false,
  "message": "Success",
  "content": {
   "solutionId":"6793411f-c7a1-4e93-85bc-f91d267541d8",
@@ -8,9 +9,19 @@
   "active":"true",
   "modelTypeCode":"CL",
   "toolkitTypeCode":"",
-  "validationStatusCode":"",
   "metadata":"acumosa",
   "created":"2017-08-10",
-  "modified":"2017-08-11"
+  "modified":"2017-08-11",
+       "revisions":[{
+         "revisionId":"2c7e4481-6e6f-47d9-b7a4-c4e674d2b341",
+       "solutionId":"6793411f-c7a1-4e93-85bc-f91d267541d8",
+         "description":"First attempt",
+         "version":"1.0",
+         "metadata":"acumosa",
+         "userId":"admin",
+         "accessTypeCode": "PB",
+               "validationStatusCode": "PS",
+               "created":"2017-08-10",
+               "modified":"2017-08-11"}]
  }
 }