Align with CDS 1.18 27/2727/2
authorSerban Jora <sj2381@att.com>
Wed, 5 Sep 2018 13:23:27 +0000 (09:23 -0400)
committerChris Lott <clott@research.att.com>
Wed, 5 Sep 2018 13:41:13 +0000 (13:41 +0000)
Fix subscription update processing.

Change-Id: I90070fdc417d228df28fb1c4296389e047593142
Issue-ID: ACUMOS-1693
Signed-off-by: Serban Jora <sj2381@att.com>
docs/release-notes.rst
gateway/pom.xml
gateway/src/main/java/org/acumos/federation/gateway/task/PeerSubscriptionTaskScheduler.java

index f8278f1..d2ac3ee 100644 (file)
@@ -22,6 +22,12 @@ Federated Gateway Release Notes
 
 The Federated Gateway server is available as a Docker image in a Docker registry.
 
+Version 1.18.0, 2018-09-05
+-------------------------
+
+* Align with data model changes from CDS 1.18.x
+* Fix subscription update processing (ACUMS-1693)
+
 Version 1.17.1, 2018-09-04
 -------------------------
 
index 410852e..4c7ae05 100644 (file)
@@ -24,7 +24,7 @@ limitations under the License.
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.acumos.federation</groupId>
        <artifactId>gateway</artifactId>
-       <version>1.17.1-SNAPSHOT</version>
+       <version>1.18.0-SNAPSHOT</version>
        <name>Federation Gateway</name>
        <description>Federated Acumos Interface for inter-acumos and ONAP communication</description>
 
@@ -74,7 +74,7 @@ limitations under the License.
                <dependency>
                        <groupId>org.acumos.common-dataservice</groupId>
                        <artifactId>cmn-data-svc-client</artifactId>
-                       <version>1.17.1</version>
+                       <version>1.18.0</version>
                </dependency>
                <dependency>
                        <groupId>org.json</groupId>
index b918029..1721abd 100644 (file)
@@ -136,6 +136,19 @@ public class PeerSubscriptionTaskScheduler {
                return true;
        }
 
+       private void terminatePeerSubsTask(String thePeerId) {
+
+               Map<Long, PeerTaskHandler> subsTask = this.peersSubsTask.row(thePeerId);
+               if (subsTask != null) {
+                       for (Map.Entry<Long, PeerTaskHandler> subTaskEntry : subsTask.entrySet()) {
+                               subTaskEntry.getValue().stopTask();
+                               this.peersSubsTask.remove(thePeerId, subTaskEntry.getKey());
+                               log.debug(EELFLoggerDelegate.debugLogger,       "Terminated task for peer {} subscription {}",
+                                                                       thePeerId, subTaskEntry.getKey());
+                       }
+               }
+       }
+
        @Scheduled(initialDelay = 5000, fixedRateString = "${peer.jobchecker.interval:400}000")
        public void checkPeerJobs() {
 
@@ -153,52 +166,58 @@ public class PeerSubscriptionTaskScheduler {
                        if (peer.isSelf())
                                continue;
 
-                       log.debug(EELFLoggerDelegate.debugLogger,
-                                               "processing peer {}", peer.getPeerId());
+                       log.debug(EELFLoggerDelegate.debugLogger,       "processing peer {}", peer.getPeerId());
                        // cancel peer tasks for non-active peers
                        if (PeerStatus.Active != PeerStatus.forCode(peer.getStatusCode())) {
                                // cancel all peer sub tasks for this peer
                                log.debug(EELFLoggerDelegate.debugLogger,
-                                               "peer {} no longer active, removing active tasks", peer);
-                               Map<Long, PeerTaskHandler> subsTask = this.peersSubsTask.row(peer.getPeerId());
-                               if (subsTask != null) {
-                                       for (Map.Entry<Long, PeerTaskHandler> subTaskEntry : subsTask.entrySet()) {
-                                               subTaskEntry.getValue().stopTask();
-                                               this.peersSubsTask.remove(peer.getPeerId(), subTaskEntry.getKey());
-                                       }
-                               }
-
+                                               "peer {} no longer active, stopping active tasks", peer);
+                               terminatePeerSubsTask(peer.getPeerId());
                                continue;
                        }
 
-                       List<MLPPeerSubscription> subs = peerSubscriptionService.getPeerSubscriptions(peer.getPeerId());
-                       if (Utils.isEmptyList(subs)) {
-                               // the peer is still there but has no subscriptions: cancel any ongoing tasks
-
-                               continue;
+                       //currently provisioned peer subs
+                       List<MLPPeerSubscription> peerSubs = peerSubscriptionService.getPeerSubscriptions(peer.getPeerId());
+                       //currently active peer subs
+                       Map<Long, PeerTaskHandler> peerSubsTask = this.peersSubsTask.row(peer.getPeerId());
+
+                       //stop all active peer sub tasks that have no provisioned equivalent
+                       if (peerSubsTask != null) {
+                               for (Map.Entry<Long, PeerTaskHandler> peerSubTaskEntry : peerSubsTask.entrySet()) {
+                                       //fugly
+                                       if (!peerSubs.stream()
+                                                               .filter(peerSub -> peerSub.getSubId().equals(peerSubTaskEntry.getKey())).findAny().isPresent()) {
+                                               peerSubTaskEntry.getValue().stopTask();
+                                               this.peersSubsTask.remove(peer.getPeerId(), peerSubTaskEntry.getKey());
+                                               log.debug(EELFLoggerDelegate.debugLogger,       "Terminated task for peer {} subscription {}",
+                                                                                       peer.getPeerId(), peerSubTaskEntry.getKey());
+                                       }
+                               }
                        }
 
-                       for (MLPPeerSubscription sub : subs) {
-                               log.info(EELFLoggerDelegate.debugLogger, "checkSub " + sub);
-                               PeerTaskHandler peerSubTask = peersSubsTask.get(peer.getPeerId(), sub.getSubId());
+                       //start/update tasks for all current subscriptions
+                       for (MLPPeerSubscription peerSub : peerSubs) {
+                               log.info(EELFLoggerDelegate.debugLogger, "checkSub " + peerSub);
+                               PeerTaskHandler peerSubTask = this.peersSubsTask.get(peer.getPeerId(), peerSub.getSubId());
                                if (peerSubTask != null) {
+                                       MLPPeerSubscription taskSub = peerSubTask.getSubscription();
                                        // was the subscription updated? if yes, cancel current task.
-                                       MLPPeerSubscription currentSub = peerSubTask.getSubscription();
-                                       if (!same(sub, currentSub)) {
+                                       //TODO: this does not correctly identify one time executions that were completed
+                                       if (!((peerSub.getModified() == null && taskSub.getModified() == null) ||
+                                                         (peerSub.getModified() != null && taskSub.getModified() != null &&
+                                                                peerSub.getModified().equals(taskSub.getModified())))) {
                                                log.debug(EELFLoggerDelegate.debugLogger,
-                                                               "subscription {} was updated, stopping current task", sub.getSubId());
+                                                               "peer {} subscription {} was updated, stopping current task", peer.getPeerId(), peerSub.getSubId());
                                                peerSubTask.stopTask();
-                                               peerSubTask = null; // in order to trigger its reset below: no need to remove the entry as we
-                                                                                       // are about
-                                                                                       // to replace it with a new one.
+                                               this.peersSubsTask.remove(peer.getPeerId(), peerSub.getSubId());
                                        }
                                }
 
-                               if (peerSubTask == null && shouldRun(sub)) {
-                                       log.info(EELFLoggerDelegate.debugLogger, "Scheduling peer sub task for peer {}, subscription {}", peer.getName(), sub.getSubId());
-                                       PeerTaskHandler hnd = new PeerTaskHandler().startTask(peer, sub);
+                               if (peerSubTask == null && shouldRun(peerSub)) {
+                                       log.info(EELFLoggerDelegate.debugLogger, "Scheduling peer sub task for peer {}, subscription {}", peer.getName(), peerSub.getSubId());
+                                       PeerTaskHandler hnd = new PeerTaskHandler().startTask(peer, peerSub);
                                        if (hnd != null) {
-                                               this.peersSubsTask.put(peer.getPeerId(), sub.getSubId(), hnd);
+                                               this.peersSubsTask.put(peer.getPeerId(), peerSub.getSubId(), hnd);
                                        }
                                }
                        }