Notify subscription owners of federation actions 00/5200/5
authorAndrew Gauld <agauld@att.com>
Fri, 6 Sep 2019 15:23:19 +0000 (15:23 +0000)
committerAndrew Gauld <agauld@att.com>
Fri, 6 Sep 2019 18:46:52 +0000 (18:46 +0000)
Also simplify some method annotations in the controllers

Change-Id: Ibaa9ae706f669ab1b3ba11727988f0886f33eee2
Issue-ID: ACUMOS-1778
Signed-off-by: Andrew Gauld <agauld@att.com>
docs/release-notes.rst
gateway/src/main/java/org/acumos/federation/gateway/FederationController.java
gateway/src/main/java/org/acumos/federation/gateway/GatewayController.java
gateway/src/main/java/org/acumos/federation/gateway/SubscriptionPoller.java
gateway/src/test/java/org/acumos/federation/gateway/GatewayControllerTest.java
gateway/src/test/java/org/acumos/federation/gateway/PollingTest.java

index f0028ec..c2b90be 100644 (file)
@@ -23,8 +23,10 @@ Federation Gateway Release Notes
 This server is available as a Docker image in a Docker registry at the Linux Foundation.
 The image name is "federation-gateway" and the tag is a version string as shown below.
 
-Version 2.3.0, 2019-08-09
+Version 2.3.0, 2019-09-06
 -------------------------
+* Portal to show details of federation actions (`ACUMOS-1778 <https://jira.acumos.org/browse/ACUMOS-1778>`_)
+
 * Run SV license scan when a model has been federated (`ACUMOS-3396 <https://jira.acumos.org/browse/ACUMOS-3396>`_)
   * This adds a new required configuration value, "verification.url" for the
     security verification service.
index e0bfa1a..0fe703c 100644 (file)
@@ -41,6 +41,7 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.util.UriTemplateHandler;
@@ -67,6 +68,7 @@ import org.acumos.federation.client.data.SolutionRevision;
  */
 @Controller
 @CrossOrigin
+@RequestMapping(produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
 public class FederationController {
        private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -142,7 +144,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_PEER)
        @ApiOperation(value = "Invoked by Peer Acumos to get status and self information.", response = MLPPeer.class)
-       @GetMapping(value = FederationClient.PING_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.PING_URI)
        @ResponseBody
        public JsonResponse<MLPPeer> ping() {
                log.debug("/ping");
@@ -151,7 +153,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_PARTNER)
        @ApiOperation(value = "Invoked by Peer Acumos to get a list of peers from local Acumos Instance .", response = MLPPeer.class, responseContainer = "List")
-       @GetMapping(value = FederationClient.PEERS_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.PEERS_URI)
        @ResponseBody
        public JsonResponse<List<MLPPeer>> getPeers() {
                log.debug("/peers");
@@ -160,7 +162,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_REGISTER)
        @ApiOperation(value = "Invoked by another Acumos Instance to request federation.", response = MLPPeer.class)
-       @PostMapping(value = FederationClient.REGISTER_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @PostMapping(FederationClient.REGISTER_URI)
        @ResponseBody
        public JsonResponse<MLPPeer> register() {
                log.debug("/peer/register");
@@ -173,7 +175,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_UNREGISTER)
        @ApiOperation(value = "Invoked by another Acumos Instance to request federation termination.", response = MLPPeer.class)
-       @PostMapping(value = FederationClient.UNREGISTER_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @PostMapping(FederationClient.UNREGISTER_URI)
        @ResponseBody
        public JsonResponse<MLPPeer> unregister() {
                log.debug("/peer/unregister");
@@ -186,7 +188,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_PEER)
        @ApiOperation(value = "Invoked by Peer Acumos to get a list of visible Catalogs from the local Acumos Instance .", response = MLPCatalog.class, responseContainer = "List")
-       @GetMapping(value = FederationClient.CATALOGS_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.CATALOGS_URI)
        @ResponseBody
        public JsonResponse<List<MLPCatalog>> getCatalogs() {
                log.debug("/catalogs");
@@ -195,7 +197,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_PEER)
        @ApiOperation(value = "Invoked by Peer Acumos to get a list of Published Solutions from the Catalog of the local Acumos Instance .", response = MLPSolution.class, responseContainer = "List")
-       @GetMapping(value = FederationClient.SOLUTIONS_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.SOLUTIONS_URI)
        @ResponseBody
        public JsonResponse<List<MLPSolution>> getSolutions(@RequestParam(value="catalogId", required = true) String catalogId) {
                log.debug("/solutions?catalogId={}", catalogId);
@@ -211,7 +213,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_PEER)
        @ApiOperation(value = "Invoked by Peer Acumos to get a list detailed solution information from the Catalog of the local Acumos Instance .", response = MLPSolution.class)
-       @GetMapping(value = FederationClient.SOLUTION_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.SOLUTION_URI)
        @ResponseBody
        public JsonResponse<MLPSolution> getSolution(@PathVariable("solutionId") String solutionId) {
                log.debug("/solutions/{}", solutionId);
@@ -225,7 +227,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_PEER)
        @ApiOperation(value = "Invoked by Peer Acumos to get a list of Solution Revision from the Catalog of the local Acumos Instance .", response = MLPSolutionRevision.class, responseContainer = "List")
-       @GetMapping(value = FederationClient.REVISIONS_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.REVISIONS_URI)
        @ResponseBody
        public JsonResponse<List<MLPSolutionRevision>> getRevisions(@PathVariable("solutionId") String solutionId) {
                log.debug("/solutions/{}/revisions", solutionId);
@@ -238,7 +240,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_PEER)
        @ApiOperation(value = "Invoked by peer Acumos to get solution revision details from the local Acumos Instance .", response = MLPSolutionRevision.class)
-       @GetMapping(value = FederationClient.REVISION_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.REVISION_URI)
        @ResponseBody
        public JsonResponse<MLPSolutionRevision> getRevision(
            @PathVariable("solutionId") String solutionId,
@@ -263,7 +265,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_PEER)
        @ApiOperation(value = "Invoked by Peer Acumos to get a list of solution revision artifacts from the local Acumos Instance .", response = MLPArtifact.class, responseContainer = "List")
-       @GetMapping(value = FederationClient.ARTIFACTS_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.ARTIFACTS_URI)
        @ResponseBody
        public JsonResponse<List<MLPArtifact>> getArtifacts(
            @PathVariable("solutionId") String solutionId,
@@ -281,7 +283,7 @@ public class FederationController {
 
        @Secured(Security.ROLE_PEER)
        @ApiOperation(value = "Invoked by Peer Acumos to get a list of solution revision public documents from the local Acumos Instance .", response = MLPArtifact.class, responseContainer = "List")
-       @GetMapping(value = FederationClient.DOCUMENTS_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.DOCUMENTS_URI)
        @ResponseBody
        public JsonResponse<List<MLPDocument>> getDocuments(
            @PathVariable("revisionId") String revisionId,
index 8a500e8..a952acf 100644 (file)
@@ -58,7 +58,7 @@ import org.acumos.federation.client.data.JsonResponse;
 @Controller
 @CrossOrigin
 @Secured(Security.ROLE_INTERNAL)
-@RequestMapping(GatewayClient.PEER_PFX)
+@RequestMapping(value = GatewayClient.PEER_PFX, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
 public class GatewayController {
        private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -72,7 +72,7 @@ public class GatewayController {
        private SubscriptionPoller poller;
 
        @ApiOperation(value = "Invoked by local Acumos to get a list of catalogs available from a peer Acumos instance .", response = MLPCatalog.class, responseContainer = "List")
-       @GetMapping(value = FederationClient.CATALOGS_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.CATALOGS_URI)
        @ResponseBody
        public JsonResponse<List<MLPCatalog>> getCatalogs(
            HttpServletResponse response,
@@ -82,7 +82,7 @@ public class GatewayController {
        }
 
        @ApiOperation(value = "Invoked by local Acumos to get a list of solutions available from a peer Acumos instance .", response = MLPSolution.class, responseContainer = "List")
-       @GetMapping(value = FederationClient.SOLUTIONS_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.SOLUTIONS_URI)
        @ResponseBody
        public JsonResponse<List<MLPSolution>> getSolutions(
            HttpServletResponse response,
@@ -93,7 +93,7 @@ public class GatewayController {
        }
 
        @ApiOperation(value = "Invoked by local Acumos to get detailed solution information from the catalog of a peer acumos Instance.", response = MLPSolution.class)
-       @GetMapping(value = FederationClient.SOLUTION_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.SOLUTION_URI)
        @ResponseBody
        public JsonResponse<MLPSolution> getSolution(
            HttpServletResponse response,
@@ -104,7 +104,7 @@ public class GatewayController {
        }
 
        @ApiOperation(value = "Invoked by local Acumos to get peers information from remote Acumos peer.", response = MLPPeer.class, responseContainer = "List")
-       @GetMapping(value = FederationClient.PEERS_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.PEERS_URI)
        @ResponseBody
        public JsonResponse<List<MLPPeer>> getPeers(
            HttpServletResponse response,
@@ -114,7 +114,7 @@ public class GatewayController {
        }
 
        @ApiOperation(value = "Invoked by local Acumos to get peer Acumos status and information.", response = MLPPeer.class)
-       @GetMapping(value = FederationClient.PING_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @GetMapping(FederationClient.PING_URI)
        @ResponseBody
        public JsonResponse<MLPPeer> ping(
            HttpServletResponse response,
@@ -124,7 +124,7 @@ public class GatewayController {
        }
 
        @ApiOperation(value = "Invoked by local Acumos to register with a remote Acumos peer.", response = MLPPeer.class)
-       @PostMapping(value = FederationClient.REGISTER_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @PostMapping(FederationClient.REGISTER_URI)
        @ResponseBody
        public JsonResponse<MLPPeer> register(
            HttpServletResponse response,
@@ -133,8 +133,8 @@ public class GatewayController {
                return callPeer(response, peerId, FederationClient::register);
        }
 
-       @ApiOperation(value = "Invoked by other Acumos components in order to trigger subscription execution")
-       @PostMapping(value = GatewayClient.SUBSCRIPTION_URI, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+       @ApiOperation("Invoked by other Acumos components in order to trigger subscription execution")
+       @PostMapping(GatewayClient.SUBSCRIPTION_URI)
        @ResponseBody
        public JsonResponse<Void> triggerPeerSubscription(
            HttpServletResponse response,
index bd47cdd..e7ed638 100644 (file)
@@ -43,9 +43,11 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.web.client.ResourceAccessException;
 
+import org.acumos.cds.client.ICommonDataServiceRestClient;
 import org.acumos.cds.domain.MLPArtifact;
 import org.acumos.cds.domain.MLPCatalog;
 import org.acumos.cds.domain.MLPDocument;
+import org.acumos.cds.domain.MLPNotification;
 import org.acumos.cds.domain.MLPPeer;
 import org.acumos.cds.domain.MLPPeerSubscription;
 import org.acumos.cds.domain.MLPRevCatDescription;
@@ -90,15 +92,168 @@ public class SubscriptionPoller {
                return ret;
        }
 
+       private enum Action {
+               PROCESS("Processed", "processing"),
+               FETCH("Fetched", "fetching"),
+               PARSE("Parsed", "parsing"),
+               CREATE("Created", "creating"),
+               UPDATE("Updated", "updating"),
+               ADD("Added", "adding"),
+               DELETE("Deleted", "deleting"),
+               COPY("Copied", "copying");
+
+               private String done;
+               private String during;
+
+               public String getDone() {
+                       return(done);
+               }
+
+               public String getDuring() {
+                       return(during);
+               }
+
+               Action(String done, String during) {
+                       this.done = done;
+                       this.during = during;
+               }
+       }
+
+       private static class PendingAction      {
+               private PendingAction parent;
+               private Action action;
+               private String item;
+               private boolean force;
+               private Instant start = Instant.now();
+
+               public PendingAction(PendingAction parent, Action action, String item, boolean force) {
+                       this.parent = parent;
+                       this.action = action;
+                       this.item = " " + item;
+                       this.force = force;
+               }
+
+               public void setForce() {
+                       this.force = true;
+               }
+
+               public boolean getForce() {
+                       return(this.force);
+               }
+
+               public PendingAction pop() {
+                       PendingAction ret = parent;
+                       parent = null;
+                       return(ret);
+               }
+
+               public String getDone() {
+                       return(action.getDone() + item);
+               }
+
+               public String getDuring() {
+                       return(action.getDuring() + item);
+               }
+
+               public String getItem() {
+                       return(item);
+               }
+               
+               public Instant getStart() {
+                       return(start);
+               }
+       }
+
+       private static class Notifier   {
+               private PendingAction actions;
+               private PendingAction leaf;
+               private ICommonDataServiceRestClient cds;
+               private String userId;
+
+               public Notifier(ICommonDataServiceRestClient cds, String userId) {
+                       this.cds = cds;
+                       this.userId = userId;
+               }
+
+               public PendingAction begin(String item, Object... args) {
+                       end();
+                       actions = new PendingAction(actions, Action.PROCESS, String.format(item, args), false);
+                       return actions;
+               }
+
+               public void noteEnd(PendingAction handle) {
+                       handle.setForce();
+                       end(handle);
+               }
+
+               public void end(PendingAction handle) {
+                       end();
+                       do {
+                               leaf = actions;
+                               actions = leaf.pop();
+                       } while (end() != handle);
+               }
+
+               public void check(Action action, String item, Object... args) {
+                       end();
+                       leaf = new PendingAction(null, action, String.format(item, args), false);
+               }
+
+               public void action(Action action, String item, Object... args) {
+                       end();
+                       leaf = new PendingAction(null, action, String.format(item, args), true);
+               }
+
+               private void note(PendingAction cur, String sev, String msg) {
+                       MLPNotification note = new MLPNotification(cur.getItem(), sev, cur.getStart(), Instant.now());
+                       note.setMessage(msg);
+                       cds.addUserToNotification(cds.createNotification(note).getNotificationId(), userId);
+               }
+
+               public PendingAction end() {
+                       if (leaf != null) {
+                               PendingAction cur = leaf;
+                               boolean logit = leaf.getForce();
+                               leaf = null;
+                               if (logit) {
+                                       note(cur, "LO", cur.getDone());
+                               }
+                               return(cur);
+                       }
+                       return(null);
+               }
+
+               public void fail(PendingAction handle, String msg) {
+                       String format = "Error occurred while %s: %s";
+                       String sev = "HI";
+                       PendingAction cur = null;
+                       do {
+                               cur = leaf;
+                               leaf = null;
+                               if (cur == null) {
+                                       cur = actions;
+                                       actions = cur.pop();
+                               }
+                               String during = cur.getDuring();
+                               note(cur, sev, String.format(format, during, msg));
+                               msg = during;
+                               format = "While %s, an error occurred: %s";
+                               sev = "ME";
+                       } while (cur != handle && actions != null);
+               }
+       }
+
        private class PeerSubscriptionPoller implements Runnable {
                private long subId;
+               private String userId;
                private String peerId;
                private Long interval;
                private ScheduledFuture future;
-               private String userId;
+               private Notifier events;
 
-               public PeerSubscriptionPoller(long subId, String peerId, Long interval) {
+               public PeerSubscriptionPoller(long subId, String userId, String peerId, Long interval) {
                        this.subId = subId;
+                       this.userId = userId;
                        this.peerId = peerId;
                        this.interval = interval;
                }
@@ -128,7 +283,10 @@ public class SubscriptionPoller {
 
                private boolean checkRevision(String revisionId, String solutionId, String catalogId, FederationClient peer) {
                        log.info("Checking revision {} from peer {}", revisionId, peerId);
+                       PendingAction act = events.begin("revision %s", revisionId);
+                       events.check(Action.FETCH, "remote revision");
                        SolutionRevision pRev = (SolutionRevision)peer.getSolutionRevision(solutionId, revisionId, catalogId);
+                       events.check(Action.FETCH, "local revision");
                        SolutionRevision lRev = (SolutionRevision)catalogService.getRevision(revisionId, catalogId);
                        boolean changed = false;
                        boolean isnew = lRev == null;
@@ -136,6 +294,7 @@ public class SubscriptionPoller {
                        if (isnew) {
                                log.info("Revision {} doesn't exist locally.  Creating it", revisionId);
                                pRev.setSourceId(peerId);
+                               events.action(Action.CREATE, "revision %s", revisionId);
                                lRev = (SolutionRevision)catalogService.createRevision(pRev);
                        }
                        MLPRevCatDescription pDesc = pRev.getRevCatDescription();
@@ -143,13 +302,16 @@ public class SubscriptionPoller {
                        if (pDesc != null) {
                                if (lDesc == null) {
                                        log.info("Description for revision {} in catalog {} doesn't exist locally.  Creating it", revisionId, catalogId);
+                                       events.action(Action.CREATE, "revision description");
                                        catalogService.createDescription(pDesc);
                                } else if (!Objects.equals(pDesc.getDescription(), lDesc.getDescription())) {
                                        log.info("Updating description for revision {} in catalog {}", revisionId, catalogId);
+                                       events.action(Action.UPDATE, "revision description");
                                        catalogService.updateDescription(pDesc);
                                }
                        } else if (lDesc != null) {
                                log.info("Deleting old description for revision {} in catalog {}", revisionId, catalogId);
+                               events.action(Action.DELETE, "revision description");
                                catalogService.deleteDescription(revisionId, catalogId);
                        }
                        List<MLPArtifact> pArts = pRev.getArtifacts();
@@ -158,20 +320,28 @@ public class SubscriptionPoller {
                                String artifactId = pArt.getArtifactId();
                                log.debug("Checking artifact {} from peer {}", artifactId, peerId);
                                String pTag = pArt.getDescription();
-                               MLPArtifact lArt = catalogService.getArtifact(artifactId);
                                pArt.setUserId(userId);
                                contentService.setArtifactUri(solutionId, pArt);
+                               MLPArtifact lArt = lArts.get(artifactId);
+                               if (lArt == null) {
+                                       events.check(Action.FETCH, "local artifact %s metadata", artifactId);
+                                       lArt = catalogService.getArtifact(artifactId);
+                               }
                                if (lArt == null) {
                                        log.info("Artifact {} doesn't exist locally.  Creating it", artifactId);
+                                       events.action(Action.CREATE, "artifact %s metadata", artifactId);
                                        lArt = catalogService.createArtifact(pArt);
                                } else if (!Objects.equals(pArt.getSize(), lArt.getSize()) || !Objects.equals(pArt.getVersion(), lArt.getVersion())) {
                                        log.info("Updating artifact {}", artifactId);
+                                       events.action(Action.UPDATE, "artifact %s metadata", artifactId);
                                        catalogService.updateArtifact(pArt);
                                } else {
                                        continue;
                                }
                                changed = true;
+                               events.check(Action.FETCH, "artifact %s content", artifactId);
                                try (InputStream is = peer.getArtifactContent(artifactId)) {
+                                       events.action(Action.COPY, "artifact %s content", artifactId);
                                        contentService.putArtifactContent(pArt, pTag, is);
                                } catch (IOException ioe) {
                                        throw new ResourceAccessException("Failure copying artifact " + artifactId + " from peer " + peerId, ioe);
@@ -180,6 +350,7 @@ public class SubscriptionPoller {
                        for (MLPArtifact pArt: pArts) {
                                if (lArts.get(pArt.getArtifactId()) == null) {
                                        log.info("Adding artifact {} to revision {}", pArt.getArtifactId(), revisionId);
+                                       events.action(Action.ADD, "artifact %s to revision %s", pArt.getArtifactId(), revisionId);
                                        catalogService.addArtifact(solutionId, revisionId, pArt.getArtifactId());
                                }
                        }
@@ -188,19 +359,27 @@ public class SubscriptionPoller {
                        for (MLPDocument pDoc: pDocs) {
                                String documentId = pDoc.getDocumentId();
                                log.debug("Checking document {} from peer {}", documentId, peerId);
-                               MLPDocument lDoc = catalogService.getDocument(documentId);
                                pDoc.setUserId(userId);
                                contentService.setDocumentUri(solutionId, pDoc);
+                               MLPDocument lDoc = lDocs.get(documentId);
+                               if (lDoc == null) {
+                                       events.check(Action.FETCH, "local document %s metadata", documentId);
+                                       lDoc = catalogService.getDocument(documentId);
+                               }
                                if (lDoc == null) {
                                        log.info("Document {} doesn't exist locally.  Creating it", documentId);
+                                       events.action(Action.CREATE, "document %s metadata", documentId);
                                        catalogService.createDocument(pDoc);
                                } else if (!Objects.equals(pDoc.getSize(), lDoc.getSize()) || !Objects.equals(pDoc.getVersion(), lDoc.getVersion())) {
                                        log.info("Updating document {}", documentId);
+                                       events.action(Action.UPDATE, "document %s metadata", documentId);
                                        catalogService.updateDocument(pDoc);
                                } else {
                                        continue;
                                }
+                               events.check(Action.FETCH, "document %s content", documentId);
                                try (InputStream is = peer.getDocumentContent(documentId)) {
+                                       events.action(Action.COPY, "document %s content", documentId);
                                        contentService.putDocumentContent(pDoc, is);
                                } catch (IOException ioe) {
                                        throw new ResourceAccessException("Failure copying document " + documentId + " from peer " + peerId, ioe);
@@ -209,22 +388,28 @@ public class SubscriptionPoller {
                        for (MLPDocument pDoc: pDocs) {
                                if (lDocs.get(pDoc.getDocumentId()) == null) {
                                        log.info("Adding document {} to revision {} in catalog {}", pDoc.getDocumentId(), revisionId, catalogId);
+                                       events.action(Action.ADD, "document %s to revision %s in catalog %s", pDoc.getDocumentId(), revisionId, catalogId);
                                        catalogService.addDocument(revisionId, catalogId, pDoc.getDocumentId());
                                }
                        }
                        changed |= isnew;
                        if (changed && !isnew) {
+                               events.action(Action.UPDATE, "revision %s", revisionId);
                                catalogService.updateRevision(pRev);
                        }
                        if (changed) {
                                new Thread(() -> { try {clients.getSVClient().securityVerificationScan(solutionId, revisionId, "created", userId); } catch (Exception e) { log.error("SV scan failure on revision " + revisionId, e); }}).start();
                        }
-                       return changed;
+                       events.end(act);
+                       return(changed);
                }
 
                private void checkSolution(String solutionId, String catalogId, boolean inLocalCatalog, FederationClient peer) {
                        log.info("Checking solution {} from peer {}", solutionId, peerId);
+                       PendingAction act = events.begin("solution %s", solutionId);
+                       events.check(Action.FETCH, "remote solution");
                        Solution pSol = (Solution)peer.getSolution(solutionId);
+                       events.check(Action.FETCH, "local solution");
                        Solution lSol = (Solution)catalogService.getSolution(solutionId);
                        boolean changed = false;
                        boolean isnew = lSol == null;
@@ -237,6 +422,7 @@ public class SubscriptionPoller {
                                pSol.setSourceId(peerId);
                                pSol.setUserId(userId);
                                pSol.setViewCount(0L);
+                               events.action(Action.CREATE, "solution %s", solutionId);
                                lSol = (Solution)catalogService.createSolution(pSol);
                        } else {
                                pSol.setActive(lSol.isActive());
@@ -261,39 +447,52 @@ public class SubscriptionPoller {
                        }
                        if (!Arrays.equals(lSol.getPicture(), pSol.getPicture())) {
                                log.info("Updating picture for solution {}", solutionId);
+                               events.action(Action.UPDATE, "picture for solution %s", solutionId);
                                catalogService.savePicture(solutionId, pSol.getPicture());
                        }
                        if (!inLocalCatalog) {
                                log.info("Adding solution {} to catalog {}", solutionId, catalogId);
+                               events.action(Action.ADD, "solution %s to catalog %s", solutionId, catalogId);
                                catalogService.addSolution(solutionId, catalogId);
                        }
                        for (MLPSolutionRevision rev: pSol.getRevisions()) {
                                changed |= checkRevision(rev.getRevisionId(), solutionId, catalogId, peer);
                        }
                        if (changed && !isnew) {
+                               events.action(Action.UPDATE, "solution %s", solutionId);
                                catalogService.updateSolution(pSol);
                                log.info("Updated solution {} from peer {}", solutionId, peerId);
                        }
+                       events.end(act);
                }
 
                private void checkCatalog(String catalogId) {
                        log.info("Checking catalog {} from peer {}", catalogId, peerId);
+                       PendingAction act = events.begin("catalog %s from peer %s", catalogId, peerId);
                        FederationClient peer = clients.getFederationClient(peerService.getPeer(peerId).getApiUrl());
+                       events.check(Action.FETCH, "list of solutions in remote catalog");
                        List<MLPSolution> peerSolutions = peer.getSolutions(catalogId);
+                       events.check(Action.FETCH, "list of solutions in local catalog");
                        HashMap<String, MLPSolution> localSolutions = index(catalogService.getSolutions(catalogId), MLPSolution::getSolutionId);
                        if (localSolutions.isEmpty() && !peerSolutions.isEmpty() && index(catalogService.getAllCatalogs(), MLPCatalog::getCatalogId).get(catalogId) == null) {
                                log.info("Catalog {} doesn't exist locally.  Creating it", catalogId);
+                               events.action(Action.CREATE, "catalog %s", catalogId);
                                catalogService.createCatalog(index(peer.getCatalogs(), MLPCatalog::getCatalogId).get(catalogId));
+                               events.end();
                        }
-                       for (MLPSolution solution: peer.getSolutions(catalogId)) {
+                       for (MLPSolution solution: peerSolutions) {
                                checkSolution(solution.getSolutionId(), catalogId, localSolutions.get(solution.getSolutionId()) != null, peer);
                        }
                        log.info("Checked catalog {} from peer {}", catalogId, peerId);
+                       events.noteEnd(act);
                }
 
                private void checkSubscription() {
                        log.info("Processing subscription {} for peer {}", subId, peerId);
+                       PendingAction act = events.begin("subscription %s for peer %s", subId, peerId);
+                       events.check(Action.FETCH, "subscription %s", subId);
                        MLPPeerSubscription subscription = peerService.getSubscription(subId);
+                       events.check(Action.PARSE, "subscription's selector");
                        Object xcatalogs;
                        try {
                                xcatalogs = ((Map<String, Object>)mapper.readValue(subscription.getSelector(), trMapStoO)).get("catalogId");
@@ -304,24 +503,28 @@ public class SubscriptionPoller {
                                }
                        } catch (IOException | NullPointerException | ArrayStoreException | ClassCastException ioe) {
                                log.error(String.format("Malformed selector %s on subscription %s to peer %s", subscription.getSelector(), subId, peerId));
+                               events.fail(act, "Subscription selector was malformed");
                                return;
                        }
+                       events.end();
                        String[] catalogs = (String[])xcatalogs;
                        Instant startTime = Instant.now();
-                       userId = subscription.getUserId();
                        for (String catalogId: catalogs) {
                                checkCatalog(catalogId);
                        }
                        subscription.setProcessed(startTime);
                        peerService.updateSubscription(subscription);
                        log.info("Subscription {} processed for peer {}", subId, peerId);
+                       events.end(act);
                }
 
                public void run() {
+                       events = new Notifier(clients.getCDSClient(), userId);
                        try {
                                checkSubscription();
                        } catch (Exception ex) {
                                log.error(String.format("Unexpected error processing subscription %s for peer %s", subId, peerId), ex);
+                               events.fail(null, ex.toString());
                        }
                }
        }
@@ -370,7 +573,7 @@ public class SubscriptionPoller {
                Long interval = subscription.getRefreshInterval();
                PeerSubscriptionPoller poller = subscriptions.get(subId);
                if (poller == null) {
-                       poller = new PeerSubscriptionPoller(subId, subscription.getPeerId(), interval);
+                       poller = new PeerSubscriptionPoller(subId, subscription.getUserId(), subscription.getPeerId(), interval);
                        subscriptions.put(subId, poller);
                        if (force || (interval != null && (interval.longValue() > 0L || subscription.getProcessed() == null))) {
                                poller.schedule();
index 08788f2..d741649 100644 (file)
@@ -125,14 +125,14 @@ public class GatewayControllerTest {
                    .on("GET /peer/search?subjectName=gateway.acumosa.org&_j=a&page=0&size=100", xq("{ 'content': [ {'peerId': 'acumosa', 'subjectName': 'gateway.acumosa.org', 'statusCode': 'AC', 'self': true } ], 'last': true, 'number': 0, 'size': 100, 'numberOfElements': 1 }"))
                    .on("GET /peer/search?subjectName=gateway.acumosb.org&_j=a&page=0&size=100", xq("{ 'content': [ {'peerId': 'acumosb', 'subjectName': 'gateway.acumosb.org', 'statusCode': 'AC', 'self': false } ], 'last': true, 'number': 0, 'size': 100, 'numberOfElements': 1 }"))
                    .on("GET /peer/search?subjectName=gateway.acumosc.org&_j=a&page=0&size=100", xq("{ 'content': [ ], 'last': true, 'number': 0, 'size': 100, 'numberOfElements': 0 }"))
-                   .on("GET /peer/sub/999", xq("{ 'subId': 999, 'peerId': 'somepeer', 'selector': '{ \\'catalogId\\': \\'somecatalog\\' } ' }"))
+                   .on("GET /peer/sub/999", xq("{ 'subId': 999, 'peerId': 'somepeer', 'selector': '{ \\'catalogId\\': \\'somecatalog\\' } ', 'userId': 'someUser' }"))
                    .on("GET /peer/sub/998", "")
                    .on("GET /peer/sub/997", xq("{ 'subId': 997, 'peerId': 'someotherpeer' }"))
                    .on("GET /peer/sub/992", xq("{ 'subId': 992, 'peerId': 'somepeer', 'selector': '{ \\'catalogId\\': [ \\'firstcatalog\\', \\'secondcatalog\\' ] }', 'refreshInterval': 3600, 'userId': 'someUser' }"))
-                   .on("GET /peer/sub/993", xq("{ 'subId': 993, 'peerId': 'somepeer', 'selector': '{ \\'catalogId\\': true }', 'refreshInterval': 3600 }"))
-                   .on("GET /peer/sub/994", xq("{ 'subId': 994, 'peerId': 'somepeer', 'selector': '{ \\'catalogId\\': [ \\'x\\', true ] }', 'refreshInterval': 3600 }"))
-                   .on("GET /peer/sub/995", xq("{ 'subId': 995, 'peerId': 'somepeer', 'selector': '}', 'refreshInterval': 3600 }"))
-                   .on("GET /peer/sub/996", xq("{ 'subId': 996, 'peerId': 'somepeer', 'selector': '{}', 'refreshInterval': 3600 }"))
+                   .on("GET /peer/sub/993", xq("{ 'subId': 993, 'peerId': 'somepeer', 'selector': '{ \\'catalogId\\': true }', 'refreshInterval': 3600, 'userId': 'someUser' }"))
+                   .on("GET /peer/sub/994", xq("{ 'subId': 994, 'peerId': 'somepeer', 'selector': '{ \\'catalogId\\': [ \\'x\\', true ] }', 'refreshInterval': 3600, 'userId': 'someUser' }"))
+                   .on("GET /peer/sub/995", xq("{ 'subId': 995, 'peerId': 'somepeer', 'selector': '}', 'refreshInterval': 3600, 'userId': 'someUser' }"))
+                   .on("GET /peer/sub/996", xq("{ 'subId': 996, 'peerId': 'somepeer', 'selector': '{}', 'refreshInterval': 3600, 'userId': 'someUser' }"))
                    .on("PUT /peer/sub/999", "", count)
                    .on("GET /catalog/solution?ctlg=somecatalog&page=0&size=100", xq("{ 'content': [], 'last': true, 'number': 0, 'size': 100, 'numberOfElements': 0 }"))
                    .on("GET /catalog?page=0&size=100", xq("{ 'content': [], 'last': true, 'number': 0, 'size': 100, 'numberOfElements': 0 }"))
@@ -175,6 +175,8 @@ public class GatewayControllerTest {
                    .on("POST /solution/cat2soln/tag/tag1", "")
                    .on("PUT /solution/cat2soln", "")
                    .on("PUT /peer/sub/992", "", count)
+                   .on("POST /notif", xq("{ 'notificationId': 'noteid' }"))
+                   .on("POST /notif/noteid/user/someUser", "")
                    .applyTo(cdsClient);
                when(clients.getCDSClient()).thenReturn(cdsClient);
 
index df28f96..02c40a9 100644 (file)
@@ -91,27 +91,33 @@ public class PollingTest {
                steps = new CountDownLatch(6);
                (new ClientMocking())
                    .on("GET /peer?page=0&size=100", xq("{ 'content': [ { 'peerId': '1', 'self': true }, { 'peerId': '2' } ], 'last': true, 'number': 2, 'size': 100, 'numberOfElements': 2 }"), count)
-                   .on("GET /peer/2/sub", xq("[ { 'subId': 1, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }'}, { 'subId': 2, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600}, { 'subId': 3, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 0}, { 'subId': 4, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 0, 'processed': '2019-01-01T00:00:00Z' }]"), count)
-                   .on("GET /peer/sub/2", xq("{ 'subId': 2, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600}"), count)
-                   .on("GET /peer/sub/3", xq("{ 'subId': 3, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 0}"), count)
+                   .on("GET /peer/2/sub", xq("[ { 'subId': 1, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'userId': 'someuser'}, { 'subId': 2, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600, 'userId': 'someuser'}, { 'subId': 3, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 0, 'userId': 'someuser'}, { 'subId': 4, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 0, 'processed': '2019-01-01T00:00:00Z' , 'userId': 'someuser'}]"), count)
+                   .on("GET /peer/sub/2", xq("{ 'subId': 2, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600, 'userId': 'someuser'}"), count)
+                   .on("GET /peer/sub/3", xq("{ 'subId': 3, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 0, 'userId': 'someuser'}"), count)
                    .on("PUT /peer/sub/2", "", count)
                    .on("PUT /peer/sub/3", "", count)
+                   .on("POST /notif", xq("{ 'notificationId': 'noteid' }"))
+                   .on("POST /notif/noteid/user/someuser", "")
                    .applyTo(cdsClient);
                steps.await(6, TimeUnit.SECONDS);
                assertEquals("Incomplete steps remain", 0, steps.getCount());
                steps = new CountDownLatch(4);
                (new ClientMocking())
                    .on("GET /peer?page=0&size=100", xq("{ 'content': [ { 'peerId': '1', 'self': true }, { 'peerId': '2' } ], 'last': true, 'number': 2, 'size': 100, 'numberOfElements': 2 }"), count)
-                   .on("GET /peer/2/sub", xq("[ { 'subId': 1, 'peerId': '2', 'selector': '{ \\'catalogId\\': []}', 'refreshInterval': 1800 }, { 'subId': 2, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600}, { 'subId': 3, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }' } ]"), count)
-                   .on("GET /peer/sub/1", xq("{ 'subId': 1, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600}"), count)
+                   .on("GET /peer/2/sub", xq("[ { 'subId': 1, 'peerId': '2', 'selector': '{ \\'catalogId\\': []}', 'refreshInterval': 1800, 'userId': 'someuser' }, { 'subId': 2, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600, 'userId': 'someuser'}, { 'subId': 3, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'userId': 'someuser' } ]"), count)
+                   .on("GET /peer/sub/1", xq("{ 'subId': 1, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600, 'userId': 'someuser'}"), count)
                    .on("PUT /peer/sub/1", "", count)
+                   .on("POST /notif", xq("{ 'notificationId': 'noteid' }"))
+                   .on("POST /notif/noteid/user/someuser", "")
                    .applyTo(cdsClient);
                steps.await(3, TimeUnit.SECONDS);
                assertEquals("Incomplete steps remain", 0, steps.getCount());
                steps = new CountDownLatch(2);
                (new ClientMocking())
                    .on("GET /peer?page=0&size=100", xq("{ 'content': [ { 'peerId': '1', 'self': true }, { 'peerId': '2' } ], 'last': true, 'number': 2, 'size': 100, 'numberOfElements': 2 }"), count)
-                   .on("GET /peer/2/sub", xq("[ { 'subId': 2, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600}, { 'subId': 3, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }' } ]"), count)
+                   .on("GET /peer/2/sub", xq("[ { 'subId': 2, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'refreshInterval': 3600, 'userId': 'someuser'}, { 'subId': 3, 'peerId': '2', 'selector': '{ \\'catalogId\\': [] }', 'userId': 'someuser' } ]"), count)
+                   .on("POST /notif", xq("{ 'notificationId': 'noteid' }"))
+                   .on("POST /notif/noteid/user/someuser", "")
                    .applyTo(cdsClient);
                steps.await(3, TimeUnit.SECONDS);
                assertEquals("Incomplete steps remain", 0, steps.getCount());