Prevent oversize user notifications
[federation.git] / gateway / src / main / java / org / acumos / federation / gateway / SubscriptionPoller.java
1 /*-
2  * ===============LICENSE_START=======================================================
3  * Acumos
4  * ===================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6  * ===================================================================================
7  * This Acumos software file is distributed by AT&T and Tech Mahindra
8  * under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * This file is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ===============LICENSE_END=========================================================
19  */
20 package org.acumos.federation.gateway;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.lang.invoke.MethodHandles;
25 import java.time.Instant;
26 import java.util.Arrays;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.function.Function;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import com.fasterxml.jackson.core.type.TypeReference;
39 import com.fasterxml.jackson.databind.ObjectMapper;
40
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.scheduling.annotation.Scheduled;
43 import org.springframework.scheduling.TaskScheduler;
44 import org.springframework.web.client.ResourceAccessException;
45
46 import org.acumos.cds.client.ICommonDataServiceRestClient;
47 import org.acumos.cds.domain.MLPArtifact;
48 import org.acumos.cds.domain.MLPCatalog;
49 import org.acumos.cds.domain.MLPDocument;
50 import org.acumos.cds.domain.MLPNotification;
51 import org.acumos.cds.domain.MLPPeer;
52 import org.acumos.cds.domain.MLPPeerSubscription;
53 import org.acumos.cds.domain.MLPRevCatDescription;
54 import org.acumos.cds.domain.MLPSolution;
55 import org.acumos.cds.domain.MLPSolutionRevision;
56
57 import org.acumos.licensemanager.client.model.RegisterAssetRequest;
58 import org.acumos.licensemanager.client.model.RegisterAssetResponse;
59
60 import org.acumos.federation.client.FederationClient;
61 import org.acumos.federation.client.data.Solution;
62 import org.acumos.federation.client.data.SolutionRevision;
63
64 /**
65  * Service bean for periodic polling of subscriptions to other peer's catalogs.
66  */
67 public class SubscriptionPoller {
68         private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); // logger named after the enclosing class
           // Scheduler on which the per-subscription pollers are run.
69         @Autowired
70         private TaskScheduler scheduler;
71
           // Source of peer records and of the peer subscriptions being polled.
72         @Autowired
73         private PeerService peerService;
74
           // Access to the local catalogs, solutions, revisions, artifacts and documents.
75         @Autowired
76         private CatalogService catalogService;
77
           // Assigns artifact/document URIs and stores their copied content.
78         @Autowired
79         private ContentService contentService;
80
           // NOTE(review): not referenced in the visible part of this file; presumably gateway configuration - confirm.
81         @Autowired
82         private FederationConfig federation;
83
           // Factory for the federation, CDS, security-verification and license-manager clients.
84         @Autowired
85         private Clients clients;
86
           // Shared JSON mapper, used to parse subscription selectors.
87         private static final ObjectMapper mapper = new ObjectMapper();
           // Type token for deserializing a JSON object into a Map<String, Object>.
88         private static final TypeReference<Map<String, Object>> trMapStoO = new TypeReference<Map<String, Object>>(){};
89
90         private static <T> HashMap<String, T> index(List<T> list, Function<T, String> getId) {
91                 HashMap<String, T> ret = new HashMap<>();
92                 for (T item: list) {
93                         ret.put(getId.apply(item), item);
94                 }
95                 return ret;
96         }
97
98         private enum Action {
99                 PROCESS("Processed", "processing"),
100                 FETCH("Fetched", "fetching"),
101                 PARSE("Parsed", "parsing"),
102                 CREATE("Created", "creating"),
103                 UPDATE("Updated", "updating"),
104                 ADD("Added", "adding"),
105                 DELETE("Deleted", "deleting"),
106                 COPY("Copied", "copying");
107
108                 private String done;
109                 private String during;
110
111                 public String getDone() {
112                         return(done);
113                 }
114
115                 public String getDuring() {
116                         return(during);
117                 }
118
119                 Action(String done, String during) {
120                         this.done = done;
121                         this.during = during;
122                 }
123         }
124
125         private static class PendingAction      {
126                 private PendingAction parent;
127                 private Action action;
128                 private String item;
129                 private boolean force;
130                 private Instant start = Instant.now();
131
132                 public PendingAction(PendingAction parent, Action action, String item, boolean force) {
133                         this.parent = parent;
134                         this.action = action;
135                         this.item = " " + item;
136                         this.force = force;
137                 }
138
139                 public void setForce() {
140                         this.force = true;
141                 }
142
143                 public boolean getForce() {
144                         return(this.force);
145                 }
146
147                 public PendingAction pop() {
148                         PendingAction ret = parent;
149                         parent = null;
150                         return(ret);
151                 }
152
153                 public String getDone() {
154                         return(action.getDone() + item);
155                 }
156
157                 public String getDuring() {
158                         return(action.getDuring() + item);
159                 }
160
161                 public String getItem() {
162                         return(item);
163                 }
164                 
165                 public Instant getStart() {
166                         return(start);
167                 }
168         }
169
170         private static class Notifier   {
171                 private PendingAction actions;
172                 private PendingAction leaf;
173                 private ICommonDataServiceRestClient cds;
174                 private String userId;
175
176                 public Notifier(ICommonDataServiceRestClient cds, String userId) {
177                         this.cds = cds;
178                         this.userId = userId;
179                 }
180
181                 public PendingAction begin(String item, Object... args) {
182                         end();
183                         actions = new PendingAction(actions, Action.PROCESS, String.format(item, args), false);
184                         return actions;
185                 }
186
187                 public void noteEnd(PendingAction handle) {
188                         handle.setForce();
189                         end(handle);
190                 }
191
192                 public void end(PendingAction handle) {
193                         end();
194                         do {
195                                 leaf = actions;
196                                 actions = leaf.pop();
197                         } while (end() != handle);
198                 }
199
200                 public void check(Action action, String item, Object... args) {
201                         end();
202                         leaf = new PendingAction(null, action, String.format(item, args), false);
203                 }
204
205                 public void action(Action action, String item, Object... args) {
206                         end();
207                         leaf = new PendingAction(null, action, String.format(item, args), true);
208                 }
209
210                 /*
211                  * MLPNotification.java limits the lengths of the title
212                  * and message fields.
213                  * However, it doesn't define constants with these values, and
214                  * the setters are perfectly happy accepting oversize values.
215                  * We limit ourselves to use significantly less than those
216                  * limits so we don't run into any off-by-one type errors and
217                  * will use at most 73 characters in titles and at most 2003
218                  * characters in messages (the 3 allows for an elipsis ...)
219                  */
220                 private static final int NOTIF_TITLE_MAX_USED = 73;
221                 private static final int NOTIF_MESSAGE_MAX_USED = 2003;
222                 private static final String ELIPSIS = "...";
223                 private static final int ELIPSIS_LENGTH = ELIPSIS.length();
224
225                 private void note(PendingAction cur, String sev, String msg) {
226                         String item = cur.getItem().strip();
227                         if (item.length() > NOTIF_TITLE_MAX_USED) {
228                                 msg = item + " - " + msg;
229                                 item = item.substring(0, NOTIF_TITLE_MAX_USED - ELIPSIS_LENGTH) + ELIPSIS;
230                         }
231                         if (msg.length() > NOTIF_MESSAGE_MAX_USED) {
232                                 msg = msg.substring(0, NOTIF_MESSAGE_MAX_USED - ELIPSIS_LENGTH) + ELIPSIS;
233                         }
234                         MLPNotification note = new MLPNotification(item, sev, cur.getStart(), Instant.now());
235                         note.setMessage(msg);
236                         try {
237                                 cds.addUserToNotification(cds.createNotification(note).getNotificationId(), userId);
238                         } catch (Exception e) {
239                                 log.error("Error notifying user of federation actions", e);
240                         }
241                 }
242
243                 public PendingAction end() {
244                         if (leaf != null) {
245                                 PendingAction cur = leaf;
246                                 boolean logit = leaf.getForce();
247                                 leaf = null;
248                                 if (logit) {
249                                         note(cur, "LO", cur.getDone());
250                                 }
251                                 return(cur);
252                         }
253                         return(null);
254                 }
255
256                 public void fail(PendingAction handle, String msg) {
257                         String format = "Error occurred while %s: %s";
258                         String sev = "HI";
259                         PendingAction cur = null;
260                         do {
261                                 cur = leaf;
262                                 leaf = null;
263                                 if (cur == null) {
264                                         cur = actions;
265                                         actions = cur.pop();
266                                 }
267                                 String during = cur.getDuring();
268                                 note(cur, sev, String.format(format, during, msg));
269                                 msg = during;
270                                 format = "While %s, an error occurred: %s";
271                                 sev = "ME";
272                         } while (cur != handle && actions != null);
273                 }
274         }
275
276         private class PeerSubscriptionPoller implements Runnable {
277                 private long subId;
278                 private String userId;
279                 private String peerId;
280                 private Long interval;
281                 private ScheduledFuture future;
282                 private Notifier events;
283
284                 public PeerSubscriptionPoller(long subId, String userId, String peerId, Long interval) {
285                         this.subId = subId;
286                         this.userId = userId;
287                         this.peerId = peerId;
288                         this.interval = interval;
289                 }
290
291                 public Long getInterval() {
292                         return interval;
293                 }
294
295                 public void setInterval(Long interval) {
296                         this.interval = interval;
297                 }
298
299                 public synchronized void cancel() {
300                         if (future != null) {
301                                 future.cancel(false);
302                                 future = null;
303                         }
304                 }
305
306                 public synchronized void schedule() {
307                         if (interval == null || interval.longValue() <= 0L) {
308                                 scheduler.schedule(this, Instant.now());
309                         } else {
310                                 future = scheduler.scheduleAtFixedRate(this, 1000L * interval.longValue());
311                         }
312                 }
313
314                 private boolean checkRevision(String revisionId, String solutionId, String catalogId, FederationClient peer) {
315                         log.info("Checking revision {} from peer {}", revisionId, peerId);
316                         PendingAction act = events.begin("revision %s", revisionId);
317                         events.check(Action.FETCH, "remote revision");
318                         SolutionRevision pRev = (SolutionRevision)peer.getSolutionRevision(solutionId, revisionId, catalogId);
319                         events.check(Action.FETCH, "local revision");
320                         SolutionRevision lRev = (SolutionRevision)catalogService.getRevision(revisionId, catalogId);
321                         boolean changed = false;
322                         boolean isnew = lRev == null;
323                         pRev.setUserId(userId);
324                         if (isnew) {
325                                 log.info("Revision {} doesn't exist locally.  Creating it", revisionId);
326                                 pRev.setSourceId(peerId);
327                                 events.action(Action.CREATE, "revision %s", revisionId);
328                                 lRev = (SolutionRevision)catalogService.createRevision(pRev);
329                         }
330                         MLPRevCatDescription pDesc = pRev.getRevCatDescription();
331                         MLPRevCatDescription lDesc = lRev.getRevCatDescription();
332                         if (pDesc != null) {
333                                 if (lDesc == null) {
334                                         log.info("Description for revision {} in catalog {} doesn't exist locally.  Creating it", revisionId, catalogId);
335                                         events.action(Action.CREATE, "revision description");
336                                         catalogService.createDescription(pDesc);
337                                 } else if (!Objects.equals(pDesc.getDescription(), lDesc.getDescription())) {
338                                         log.info("Updating description for revision {} in catalog {}", revisionId, catalogId);
339                                         events.action(Action.UPDATE, "revision description");
340                                         catalogService.updateDescription(pDesc);
341                                 }
342                         } else if (lDesc != null) {
343                                 log.info("Deleting old description for revision {} in catalog {}", revisionId, catalogId);
344                                 events.action(Action.DELETE, "revision description");
345                                 catalogService.deleteDescription(revisionId, catalogId);
346                         }
347                         List<MLPArtifact> pArts = pRev.getArtifacts();
348                         HashMap<String, MLPArtifact> lArts = index(lRev.getArtifacts(), MLPArtifact::getArtifactId);
349                         for (MLPArtifact pArt: pArts) {
350                                 String artifactId = pArt.getArtifactId();
351                                 log.debug("Checking artifact {} from peer {}", artifactId, peerId);
352                                 String pTag = pArt.getDescription();
353                                 pArt.setUserId(userId);
354                                 contentService.setArtifactUri(solutionId, pArt);
355                                 MLPArtifact lArt = lArts.get(artifactId);
356                                 if (lArt == null) {
357                                         events.check(Action.FETCH, "local artifact %s metadata", artifactId);
358                                         lArt = catalogService.getArtifact(artifactId);
359                                 }
360                                 if (lArt == null) {
361                                         log.info("Artifact {} doesn't exist locally.  Creating it", artifactId);
362                                         events.action(Action.CREATE, "artifact %s metadata", artifactId);
363                                         lArt = catalogService.createArtifact(pArt);
364                                 } else if (!Objects.equals(pArt.getSize(), lArt.getSize()) || !Objects.equals(pArt.getVersion(), lArt.getVersion())) {
365                                         log.info("Updating artifact {}", artifactId);
366                                         events.action(Action.UPDATE, "artifact %s metadata", artifactId);
367                                         catalogService.updateArtifact(pArt);
368                                 } else {
369                                         continue;
370                                 }
371                                 changed = true;
372                                 events.check(Action.FETCH, "artifact %s content", artifactId);
373                                 try (InputStream is = peer.getArtifactContent(artifactId)) {
374                                         events.action(Action.COPY, "artifact %s content", artifactId);
375                                         contentService.putArtifactContent(pArt, pTag, is);
376                                 } catch (IOException ioe) {
377                                         throw new ResourceAccessException("Failure copying artifact " + artifactId + " from peer " + peerId, ioe);
378                                 }
379                         }
380                         for (MLPArtifact pArt: pArts) {
381                                 if (lArts.get(pArt.getArtifactId()) == null) {
382                                         log.info("Adding artifact {} to revision {}", pArt.getArtifactId(), revisionId);
383                                         events.action(Action.ADD, "artifact %s to revision %s", pArt.getArtifactId(), revisionId);
384                                         catalogService.addArtifact(solutionId, revisionId, pArt.getArtifactId());
385                                 }
386                         }
387                         List<MLPDocument> pDocs = pRev.getDocuments();
388                         HashMap<String, MLPDocument> lDocs = index(lRev.getDocuments(), MLPDocument::getDocumentId);
389                         for (MLPDocument pDoc: pDocs) {
390                                 String documentId = pDoc.getDocumentId();
391                                 log.debug("Checking document {} from peer {}", documentId, peerId);
392                                 pDoc.setUserId(userId);
393                                 contentService.setDocumentUri(solutionId, pDoc);
394                                 MLPDocument lDoc = lDocs.get(documentId);
395                                 if (lDoc == null) {
396                                         events.check(Action.FETCH, "local document %s metadata", documentId);
397                                         lDoc = catalogService.getDocument(documentId);
398                                 }
399                                 if (lDoc == null) {
400                                         log.info("Document {} doesn't exist locally.  Creating it", documentId);
401                                         events.action(Action.CREATE, "document %s metadata", documentId);
402                                         catalogService.createDocument(pDoc);
403                                 } else if (!Objects.equals(pDoc.getSize(), lDoc.getSize()) || !Objects.equals(pDoc.getVersion(), lDoc.getVersion())) {
404                                         log.info("Updating document {}", documentId);
405                                         events.action(Action.UPDATE, "document %s metadata", documentId);
406                                         catalogService.updateDocument(pDoc);
407                                 } else {
408                                         continue;
409                                 }
410                                 events.check(Action.FETCH, "document %s content", documentId);
411                                 try (InputStream is = peer.getDocumentContent(documentId)) {
412                                         events.action(Action.COPY, "document %s content", documentId);
413                                         contentService.putDocumentContent(pDoc, is);
414                                 } catch (IOException ioe) {
415                                         throw new ResourceAccessException("Failure copying document " + documentId + " from peer " + peerId, ioe);
416                                 }
417                         }
418                         for (MLPDocument pDoc: pDocs) {
419                                 if (lDocs.get(pDoc.getDocumentId()) == null) {
420                                         log.info("Adding document {} to revision {} in catalog {}", pDoc.getDocumentId(), revisionId, catalogId);
421                                         events.action(Action.ADD, "document %s to revision %s in catalog %s", pDoc.getDocumentId(), revisionId, catalogId);
422                                         catalogService.addDocument(revisionId, catalogId, pDoc.getDocumentId());
423                                 }
424                         }
425                         changed |= isnew;
426                         if (changed && !isnew) {
427                                 events.action(Action.UPDATE, "revision %s", revisionId);
428                                 catalogService.updateRevision(pRev);
429                         }
430                         if (changed) {
431                                 new Thread(() -> {
432                                         try {
433                                                 clients.getSVClient().securityVerificationScan(solutionId, revisionId, "created", userId);
434                                         } catch (Exception e) {
435                                                 log.error("SV scan failure on revision " + revisionId, e);
436                                         }
437                                 }).start();
438                                 new Thread(() -> {
439                                         try {
440                                                 RegisterAssetRequest rar = new RegisterAssetRequest();
441                                                 rar.setSolutionId(solutionId);
442                                                 rar.setRevisionId(revisionId);
443                                                 rar.setLoggedIdUser(userId);
444                                                 RegisterAssetResponse rax = clients.getLMClient().register(rar).get();
445                                                 if (!rax.isSuccess()) {
446                                                         log.error("License asset registration failure on revision " + revisionId + ": " + rax.getMessage());
447                                                 }
448                                         } catch (Exception e) {
449                                                 log.error("License asset registration failure on revision " + revisionId, e);
450                                         }
451                                 }).start();
452                         }
453                         events.end(act);
454                         return(changed);
455                 }
456
457                 private void checkSolution(String solutionId, String catalogId, boolean inLocalCatalog, FederationClient peer) {
458                         log.info("Checking solution {} from peer {}", solutionId, peerId);
459                         PendingAction act = events.begin("solution %s", solutionId);
460                         events.check(Action.FETCH, "remote solution");
461                         Solution pSol = (Solution)peer.getSolution(solutionId);
462                         events.check(Action.FETCH, "local solution");
463                         Solution lSol = (Solution)catalogService.getSolution(solutionId);
464                         boolean changed = false;
465                         boolean isnew = lSol == null;
466                         if (isnew) {
467                                 log.info("Solution {} doesn't exist locally.  Creating it", solutionId);
468                                 pSol.setDownloadCount(0L);
469                                 pSol.setFeatured(false);
470                                 pSol.setRatingAverageTenths(0L);
471                                 pSol.setRatingCount(0L);
472                                 pSol.setSourceId(peerId);
473                                 pSol.setUserId(userId);
474                                 pSol.setViewCount(0L);
475                                 events.action(Action.CREATE, "solution %s", solutionId);
476                                 lSol = (Solution)catalogService.createSolution(pSol);
477                         } else {
478                                 pSol.setActive(lSol.isActive());
479                                 pSol.setDownloadCount(lSol.getDownloadCount());
480                                 pSol.setFeatured(lSol.isFeatured());
481                                 pSol.setMetadata(lSol.getMetadata());
482                                 pSol.setName(lSol.getName());
483                                 pSol.setOrigin(lSol.getOrigin());
484                                 pSol.setRatingAverageTenths(lSol.getRatingAverageTenths());
485                                 pSol.setRatingCount(lSol.getRatingCount());
486                                 pSol.setToolkitTypeCode(lSol.getToolkitTypeCode());
487                                 pSol.setViewCount(lSol.getViewCount());
488                                 if (lSol.getSourceId() == null) {
489                                         pSol.setSourceId(null);
490                                         pSol.setUserId(lSol.getUserId());
491                                 } else {
492                                         pSol.setSourceId(peerId);
493                                         pSol.setUserId(userId);
494                                 }
495                                 changed |= !pSol.getTags().equals(lSol.getTags());
496                                 changed |= !Objects.equals(lSol.getSourceId(), pSol.getSourceId());
497                                 changed |= !Objects.equals(lSol.getUserId(), pSol.getUserId());
498                         }
499                         if (!Arrays.equals(lSol.getPicture(), pSol.getPicture())) {
500                                 log.info("Updating picture for solution {}", solutionId);
501                                 events.action(Action.UPDATE, "picture for solution %s", solutionId);
502                                 catalogService.savePicture(solutionId, pSol.getPicture());
503                         }
504                         if (!inLocalCatalog) {
505                                 log.info("Adding solution {} to catalog {}", solutionId, catalogId);
506                                 events.action(Action.ADD, "solution %s to catalog %s", solutionId, catalogId);
507                                 catalogService.addSolution(solutionId, catalogId);
508                         }
509                         for (MLPSolutionRevision rev: pSol.getRevisions()) {
510                                 changed |= checkRevision(rev.getRevisionId(), solutionId, catalogId, peer);
511                         }
512                         if (changed && !isnew) {
513                                 events.action(Action.UPDATE, "solution %s", solutionId);
514                                 catalogService.updateSolution(pSol);
515                                 log.info("Updated solution {} from peer {}", solutionId, peerId);
516                         }
517                         events.end(act);
518                 }
519
520                 private void checkCatalog(String catalogId) {
521                         log.info("Checking catalog {} from peer {}", catalogId, peerId);
522                         PendingAction act = events.begin("catalog %s from peer %s", catalogId, peerId);
523                         FederationClient peer = clients.getFederationClient(peerService.getPeer(peerId).getApiUrl());
524                         events.check(Action.FETCH, "list of solutions in remote catalog");
525                         List<MLPSolution> peerSolutions = peer.getSolutions(catalogId);
526                         events.check(Action.FETCH, "list of solutions in local catalog");
527                         HashMap<String, MLPSolution> localSolutions = index(catalogService.getSolutions(catalogId), MLPSolution::getSolutionId);
528                         if (localSolutions.isEmpty() && !peerSolutions.isEmpty() && index(catalogService.getAllCatalogs(), MLPCatalog::getCatalogId).get(catalogId) == null) {
529                                 log.info("Catalog {} doesn't exist locally.  Creating it", catalogId);
530                                 events.action(Action.CREATE, "catalog %s", catalogId);
531                                 catalogService.createCatalog(index(peer.getCatalogs(), MLPCatalog::getCatalogId).get(catalogId));
532                                 events.end();
533                         }
534                         for (MLPSolution solution: peerSolutions) {
535                                 checkSolution(solution.getSolutionId(), catalogId, localSolutions.get(solution.getSolutionId()) != null, peer);
536                         }
537                         log.info("Checked catalog {} from peer {}", catalogId, peerId);
538                         events.noteEnd(act);
539                 }
540
541                 private void checkSubscription() {
542                         log.info("Processing subscription {} for peer {}", subId, peerId);
543                         PendingAction act = events.begin("subscription %s for peer %s", subId, peerId);
544                         events.check(Action.FETCH, "subscription %s", subId);
545                         MLPPeerSubscription subscription = peerService.getSubscription(subId);
546                         events.check(Action.PARSE, "subscription's selector");
547                         Object xcatalogs;
548                         try {
549                                 xcatalogs = ((Map<String, Object>)mapper.readValue(subscription.getSelector(), trMapStoO)).get("catalogId");
550                                 if (xcatalogs instanceof String) {
551                                         xcatalogs = new String[] { (String)xcatalogs };
552                                 } else {
553                                         xcatalogs = ((List)xcatalogs).toArray(new String[((List)xcatalogs).size()]);
554                                 }
555                         } catch (IOException | NullPointerException | ArrayStoreException | ClassCastException ioe) {
556                                 log.error(String.format("Malformed selector %s on subscription %s to peer %s", subscription.getSelector(), subId, peerId));
557                                 events.fail(act, "Subscription selector was malformed");
558                                 return;
559                         }
560                         events.end();
561                         String[] catalogs = (String[])xcatalogs;
562                         Instant startTime = Instant.now();
563                         for (String catalogId: catalogs) {
564                                 checkCatalog(catalogId);
565                         }
566                         subscription.setProcessed(startTime);
567                         peerService.updateSubscription(subscription);
568                         log.info("Subscription {} processed for peer {}", subId, peerId);
569                         events.end(act);
570                 }
571
572                 public void run() {
573                         events = new Notifier(clients.getCDSClient(), userId);
574                         try {
575                                 checkSubscription();
576                         } catch (Exception ex) {
577                                 log.error(String.format("Unexpected error processing subscription %s for peer %s", subId, peerId), ex);
578                                 events.fail(null, ex.toString());
579                         }
580                 }
581         }
582
583
584         private HashMap<Long, PeerSubscriptionPoller> subscriptions = new HashMap<>(); // Tracked pollers keyed by subscription ID. NOTE(review): plain HashMap mutated from both triggerSubscription() and the scheduled checkPeerJobs() - confirm these never run concurrently.
585
586         /**
587          * Schedule an immediate poll of the specified subscription.
588          * @param subscription The subscription to poll.
589          */
590         public void
591         triggerSubscription(MLPPeerSubscription subscription) {
592                 require(subscription, true);
593         }
594
595         /**
596          * Check for changes to the list of peers and subscriptions.
597          */
598         @Scheduled(initialDelay=5000,fixedRateString="${peer.jobchecker.interval:400}000")
599         public void checkPeerJobs() {
600                 HashSet<Long> valid = new HashSet<>();
601                 for (MLPPeer peer: peerService.getPeers()) {
602                         if (peer.isSelf()) {
603                                 continue;
604                         }
605                         for (MLPPeerSubscription subscription: peerService.getSubscriptions(peer.getPeerId())) {
606                                 require(subscription, false);
607                                 valid.add(subscription.getSubId());
608                         }
609                 }
610                 HashSet<Long> invalid = new HashSet<>();
611                 for (Long subid: subscriptions.keySet()) {
612                         if (!valid.contains(subid)) {
613                                 invalid.add(subid);
614                         }
615                 }
616                 for (Long subid: invalid) {
617                         subscriptions.remove(subid).cancel();
618                 }
619         }
620
621
622         private void require(MLPPeerSubscription subscription, boolean force) {
623                 Long subId = subscription.getSubId();
624                 Long interval = subscription.getRefreshInterval();
625                 PeerSubscriptionPoller poller = subscriptions.get(subId);
626                 if (poller == null) {
627                         poller = new PeerSubscriptionPoller(subId, subscription.getUserId(), subscription.getPeerId(), interval);
628                         subscriptions.put(subId, poller);
629                         if (force || (interval != null && (interval.longValue() > 0L || subscription.getProcessed() == null))) {
630                                 poller.schedule();
631                         }
632                 } else if (force || (interval != null && !interval.equals(poller.getInterval()))) {
633                         poller.cancel();
634                         poller.setInterval(interval);
635                         poller.schedule();
636                 } else if (interval == null && poller.getInterval() != null) {
637                         poller.cancel();
638                         poller.setInterval(interval);
639                 }
640         }
641 }