1d46ef36ca9f967097c6a0a356ab6210e03f3f1c
[federation.git] / gateway / src / main / java / org / acumos / federation / gateway / SubscriptionPoller.java
1 /*-
2  * ===============LICENSE_START=======================================================
3  * Acumos
4  * ===================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6  * ===================================================================================
7  * This Acumos software file is distributed by AT&T and Tech Mahindra
8  * under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * This file is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ===============LICENSE_END=========================================================
19  */
20 package org.acumos.federation.gateway;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.lang.invoke.MethodHandles;
25 import java.time.Instant;
26 import java.util.Arrays;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.function.Function;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import com.fasterxml.jackson.core.type.TypeReference;
39 import com.fasterxml.jackson.databind.ObjectMapper;
40
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.scheduling.annotation.Scheduled;
43 import org.springframework.scheduling.TaskScheduler;
44 import org.springframework.web.client.ResourceAccessException;
45
46 import org.acumos.cds.client.ICommonDataServiceRestClient;
47 import org.acumos.cds.domain.MLPArtifact;
48 import org.acumos.cds.domain.MLPCatalog;
49 import org.acumos.cds.domain.MLPDocument;
50 import org.acumos.cds.domain.MLPNotification;
51 import org.acumos.cds.domain.MLPPeer;
52 import org.acumos.cds.domain.MLPPeerSubscription;
53 import org.acumos.cds.domain.MLPRevCatDescription;
54 import org.acumos.cds.domain.MLPSolution;
55 import org.acumos.cds.domain.MLPSolutionRevision;
56
57 import org.acumos.licensemanager.client.model.RegisterAssetRequest;
58 import org.acumos.licensemanager.client.model.RegisterAssetResponse;
59
60 import org.acumos.federation.client.FederationClient;
61 import org.acumos.federation.client.data.Solution;
62 import org.acumos.federation.client.data.SolutionRevision;
63
64 /**
65  * Service bean for periodic polling of subscriptions to other peers' catalogs.
66  */
67 public class SubscriptionPoller {
68         private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
69         @Autowired
70         private TaskScheduler scheduler;
71
72         @Autowired
73         private PeerService peerService;
74
75         @Autowired
76         private CatalogService catalogService;
77
78         @Autowired
79         private ContentService contentService;
80
81         @Autowired
82         private FederationConfig federation;
83
84         @Autowired
85         private Clients clients;
86
87         private static final ObjectMapper mapper = new ObjectMapper();
88         private static final TypeReference<Map<String, Object>> trMapStoO = new TypeReference<Map<String, Object>>(){};
89
90         private static <T> HashMap<String, T> index(List<T> list, Function<T, String> getId) {
91                 HashMap<String, T> ret = new HashMap<>();
92                 for (T item: list) {
93                         ret.put(getId.apply(item), item);
94                 }
95                 return ret;
96         }
97
98         private enum Action {
99                 PROCESS("Processed", "processing"),
100                 FETCH("Fetched", "fetching"),
101                 PARSE("Parsed", "parsing"),
102                 CREATE("Created", "creating"),
103                 UPDATE("Updated", "updating"),
104                 ADD("Added", "adding"),
105                 DELETE("Deleted", "deleting"),
106                 COPY("Copied", "copying");
107
108                 private String done;
109                 private String during;
110
111                 public String getDone() {
112                         return(done);
113                 }
114
115                 public String getDuring() {
116                         return(during);
117                 }
118
119                 Action(String done, String during) {
120                         this.done = done;
121                         this.during = during;
122                 }
123         }
124
125         private static class PendingAction      {
126                 private PendingAction parent;
127                 private Action action;
128                 private String item;
129                 private boolean force;
130                 private Instant start = Instant.now();
131
132                 public PendingAction(PendingAction parent, Action action, String item, boolean force) {
133                         this.parent = parent;
134                         this.action = action;
135                         this.item = " " + item;
136                         this.force = force;
137                 }
138
139                 public void setForce() {
140                         this.force = true;
141                 }
142
143                 public boolean getForce() {
144                         return(this.force);
145                 }
146
147                 public PendingAction pop() {
148                         PendingAction ret = parent;
149                         parent = null;
150                         return(ret);
151                 }
152
153                 public String getDone() {
154                         return(action.getDone() + item);
155                 }
156
157                 public String getDuring() {
158                         return(action.getDuring() + item);
159                 }
160
161                 public String getItem() {
162                         return(item);
163                 }
164                 
165                 public Instant getStart() {
166                         return(start);
167                 }
168         }
169
170         private static class Notifier   {
171                 private PendingAction actions;
172                 private PendingAction leaf;
173                 private ICommonDataServiceRestClient cds;
174                 private String userId;
175
176                 public Notifier(ICommonDataServiceRestClient cds, String userId) {
177                         this.cds = cds;
178                         this.userId = userId;
179                 }
180
181                 public PendingAction begin(String item, Object... args) {
182                         end();
183                         actions = new PendingAction(actions, Action.PROCESS, String.format(item, args), false);
184                         return actions;
185                 }
186
187                 public void noteEnd(PendingAction handle) {
188                         handle.setForce();
189                         end(handle);
190                 }
191
192                 public void end(PendingAction handle) {
193                         end();
194                         do {
195                                 leaf = actions;
196                                 actions = leaf.pop();
197                         } while (end() != handle);
198                 }
199
200                 public void check(Action action, String item, Object... args) {
201                         end();
202                         leaf = new PendingAction(null, action, String.format(item, args), false);
203                 }
204
205                 public void action(Action action, String item, Object... args) {
206                         end();
207                         leaf = new PendingAction(null, action, String.format(item, args), true);
208                 }
209
210                 private void note(PendingAction cur, String sev, String msg) {
211                         MLPNotification note = new MLPNotification(cur.getItem(), sev, cur.getStart(), Instant.now());
212                         note.setMessage(msg);
213                         cds.addUserToNotification(cds.createNotification(note).getNotificationId(), userId);
214                 }
215
216                 public PendingAction end() {
217                         if (leaf != null) {
218                                 PendingAction cur = leaf;
219                                 boolean logit = leaf.getForce();
220                                 leaf = null;
221                                 if (logit) {
222                                         note(cur, "LO", cur.getDone());
223                                 }
224                                 return(cur);
225                         }
226                         return(null);
227                 }
228
229                 public void fail(PendingAction handle, String msg) {
230                         String format = "Error occurred while %s: %s";
231                         String sev = "HI";
232                         PendingAction cur = null;
233                         do {
234                                 cur = leaf;
235                                 leaf = null;
236                                 if (cur == null) {
237                                         cur = actions;
238                                         actions = cur.pop();
239                                 }
240                                 String during = cur.getDuring();
241                                 note(cur, sev, String.format(format, during, msg));
242                                 msg = during;
243                                 format = "While %s, an error occurred: %s";
244                                 sev = "ME";
245                         } while (cur != handle && actions != null);
246                 }
247         }
248
249         private class PeerSubscriptionPoller implements Runnable {
250                 private long subId;
251                 private String userId;
252                 private String peerId;
253                 private Long interval;
254                 private ScheduledFuture future;
255                 private Notifier events;
256
257                 public PeerSubscriptionPoller(long subId, String userId, String peerId, Long interval) {
258                         this.subId = subId;
259                         this.userId = userId;
260                         this.peerId = peerId;
261                         this.interval = interval;
262                 }
263
264                 public Long getInterval() {
265                         return interval;
266                 }
267
268                 public void setInterval(Long interval) {
269                         this.interval = interval;
270                 }
271
272                 public synchronized void cancel() {
273                         if (future != null) {
274                                 future.cancel(false);
275                                 future = null;
276                         }
277                 }
278
279                 public synchronized void schedule() {
280                         if (interval == null || interval.longValue() <= 0L) {
281                                 scheduler.schedule(this, Instant.now());
282                         } else {
283                                 future = scheduler.scheduleAtFixedRate(this, 1000L * interval.longValue());
284                         }
285                 }
286
287                 private boolean checkRevision(String revisionId, String solutionId, String catalogId, FederationClient peer) {
288                         log.info("Checking revision {} from peer {}", revisionId, peerId);
289                         PendingAction act = events.begin("revision %s", revisionId);
290                         events.check(Action.FETCH, "remote revision");
291                         SolutionRevision pRev = (SolutionRevision)peer.getSolutionRevision(solutionId, revisionId, catalogId);
292                         events.check(Action.FETCH, "local revision");
293                         SolutionRevision lRev = (SolutionRevision)catalogService.getRevision(revisionId, catalogId);
294                         boolean changed = false;
295                         boolean isnew = lRev == null;
296                         pRev.setUserId(userId);
297                         if (isnew) {
298                                 log.info("Revision {} doesn't exist locally.  Creating it", revisionId);
299                                 pRev.setSourceId(peerId);
300                                 events.action(Action.CREATE, "revision %s", revisionId);
301                                 lRev = (SolutionRevision)catalogService.createRevision(pRev);
302                         }
303                         MLPRevCatDescription pDesc = pRev.getRevCatDescription();
304                         MLPRevCatDescription lDesc = lRev.getRevCatDescription();
305                         if (pDesc != null) {
306                                 if (lDesc == null) {
307                                         log.info("Description for revision {} in catalog {} doesn't exist locally.  Creating it", revisionId, catalogId);
308                                         events.action(Action.CREATE, "revision description");
309                                         catalogService.createDescription(pDesc);
310                                 } else if (!Objects.equals(pDesc.getDescription(), lDesc.getDescription())) {
311                                         log.info("Updating description for revision {} in catalog {}", revisionId, catalogId);
312                                         events.action(Action.UPDATE, "revision description");
313                                         catalogService.updateDescription(pDesc);
314                                 }
315                         } else if (lDesc != null) {
316                                 log.info("Deleting old description for revision {} in catalog {}", revisionId, catalogId);
317                                 events.action(Action.DELETE, "revision description");
318                                 catalogService.deleteDescription(revisionId, catalogId);
319                         }
320                         List<MLPArtifact> pArts = pRev.getArtifacts();
321                         HashMap<String, MLPArtifact> lArts = index(lRev.getArtifacts(), MLPArtifact::getArtifactId);
322                         for (MLPArtifact pArt: pArts) {
323                                 String artifactId = pArt.getArtifactId();
324                                 log.debug("Checking artifact {} from peer {}", artifactId, peerId);
325                                 String pTag = pArt.getDescription();
326                                 pArt.setUserId(userId);
327                                 contentService.setArtifactUri(solutionId, pArt);
328                                 MLPArtifact lArt = lArts.get(artifactId);
329                                 if (lArt == null) {
330                                         events.check(Action.FETCH, "local artifact %s metadata", artifactId);
331                                         lArt = catalogService.getArtifact(artifactId);
332                                 }
333                                 if (lArt == null) {
334                                         log.info("Artifact {} doesn't exist locally.  Creating it", artifactId);
335                                         events.action(Action.CREATE, "artifact %s metadata", artifactId);
336                                         lArt = catalogService.createArtifact(pArt);
337                                 } else if (!Objects.equals(pArt.getSize(), lArt.getSize()) || !Objects.equals(pArt.getVersion(), lArt.getVersion())) {
338                                         log.info("Updating artifact {}", artifactId);
339                                         events.action(Action.UPDATE, "artifact %s metadata", artifactId);
340                                         catalogService.updateArtifact(pArt);
341                                 } else {
342                                         continue;
343                                 }
344                                 changed = true;
345                                 events.check(Action.FETCH, "artifact %s content", artifactId);
346                                 try (InputStream is = peer.getArtifactContent(artifactId)) {
347                                         events.action(Action.COPY, "artifact %s content", artifactId);
348                                         contentService.putArtifactContent(pArt, pTag, is);
349                                 } catch (IOException ioe) {
350                                         throw new ResourceAccessException("Failure copying artifact " + artifactId + " from peer " + peerId, ioe);
351                                 }
352                         }
353                         for (MLPArtifact pArt: pArts) {
354                                 if (lArts.get(pArt.getArtifactId()) == null) {
355                                         log.info("Adding artifact {} to revision {}", pArt.getArtifactId(), revisionId);
356                                         events.action(Action.ADD, "artifact %s to revision %s", pArt.getArtifactId(), revisionId);
357                                         catalogService.addArtifact(solutionId, revisionId, pArt.getArtifactId());
358                                 }
359                         }
360                         List<MLPDocument> pDocs = pRev.getDocuments();
361                         HashMap<String, MLPDocument> lDocs = index(lRev.getDocuments(), MLPDocument::getDocumentId);
362                         for (MLPDocument pDoc: pDocs) {
363                                 String documentId = pDoc.getDocumentId();
364                                 log.debug("Checking document {} from peer {}", documentId, peerId);
365                                 pDoc.setUserId(userId);
366                                 contentService.setDocumentUri(solutionId, pDoc);
367                                 MLPDocument lDoc = lDocs.get(documentId);
368                                 if (lDoc == null) {
369                                         events.check(Action.FETCH, "local document %s metadata", documentId);
370                                         lDoc = catalogService.getDocument(documentId);
371                                 }
372                                 if (lDoc == null) {
373                                         log.info("Document {} doesn't exist locally.  Creating it", documentId);
374                                         events.action(Action.CREATE, "document %s metadata", documentId);
375                                         catalogService.createDocument(pDoc);
376                                 } else if (!Objects.equals(pDoc.getSize(), lDoc.getSize()) || !Objects.equals(pDoc.getVersion(), lDoc.getVersion())) {
377                                         log.info("Updating document {}", documentId);
378                                         events.action(Action.UPDATE, "document %s metadata", documentId);
379                                         catalogService.updateDocument(pDoc);
380                                 } else {
381                                         continue;
382                                 }
383                                 events.check(Action.FETCH, "document %s content", documentId);
384                                 try (InputStream is = peer.getDocumentContent(documentId)) {
385                                         events.action(Action.COPY, "document %s content", documentId);
386                                         contentService.putDocumentContent(pDoc, is);
387                                 } catch (IOException ioe) {
388                                         throw new ResourceAccessException("Failure copying document " + documentId + " from peer " + peerId, ioe);
389                                 }
390                         }
391                         for (MLPDocument pDoc: pDocs) {
392                                 if (lDocs.get(pDoc.getDocumentId()) == null) {
393                                         log.info("Adding document {} to revision {} in catalog {}", pDoc.getDocumentId(), revisionId, catalogId);
394                                         events.action(Action.ADD, "document %s to revision %s in catalog %s", pDoc.getDocumentId(), revisionId, catalogId);
395                                         catalogService.addDocument(revisionId, catalogId, pDoc.getDocumentId());
396                                 }
397                         }
398                         changed |= isnew;
399                         if (changed && !isnew) {
400                                 events.action(Action.UPDATE, "revision %s", revisionId);
401                                 catalogService.updateRevision(pRev);
402                         }
403                         if (changed) {
404                                 new Thread(() -> {
405                                         try {
406                                                 clients.getSVClient().securityVerificationScan(solutionId, revisionId, "created", userId);
407                                         } catch (Exception e) {
408                                                 log.error("SV scan failure on revision " + revisionId, e);
409                                         }
410                                 }).start();
411                                 new Thread(() -> {
412                                         try {
413                                                 RegisterAssetRequest rar = new RegisterAssetRequest();
414                                                 rar.setSolutionId(solutionId);
415                                                 rar.setRevisionId(revisionId);
416                                                 rar.setLoggedIdUser(userId);
417                                                 RegisterAssetResponse rax = clients.getLMClient().register(rar).get();
418                                                 if (!rax.isSuccess()) {
419                                                         log.error("License asset registration failure on revision " + revisionId + ": " + rax.getMessage());
420                                                 }
421                                         } catch (Exception e) {
422                                                 log.error("License asset registration failure on revision " + revisionId, e);
423                                         }
424                                 }).start();
425                         }
426                         events.end(act);
427                         return(changed);
428                 }
429
430                 private void checkSolution(String solutionId, String catalogId, boolean inLocalCatalog, FederationClient peer) {
431                         log.info("Checking solution {} from peer {}", solutionId, peerId);
432                         PendingAction act = events.begin("solution %s", solutionId);
433                         events.check(Action.FETCH, "remote solution");
434                         Solution pSol = (Solution)peer.getSolution(solutionId);
435                         events.check(Action.FETCH, "local solution");
436                         Solution lSol = (Solution)catalogService.getSolution(solutionId);
437                         boolean changed = false;
438                         boolean isnew = lSol == null;
439                         if (isnew) {
440                                 log.info("Solution {} doesn't exist locally.  Creating it", solutionId);
441                                 pSol.setDownloadCount(0L);
442                                 pSol.setFeatured(false);
443                                 pSol.setRatingAverageTenths(0L);
444                                 pSol.setRatingCount(0L);
445                                 pSol.setSourceId(peerId);
446                                 pSol.setUserId(userId);
447                                 pSol.setViewCount(0L);
448                                 events.action(Action.CREATE, "solution %s", solutionId);
449                                 lSol = (Solution)catalogService.createSolution(pSol);
450                         } else {
451                                 pSol.setActive(lSol.isActive());
452                                 pSol.setDownloadCount(lSol.getDownloadCount());
453                                 pSol.setFeatured(lSol.isFeatured());
454                                 pSol.setMetadata(lSol.getMetadata());
455                                 pSol.setName(lSol.getName());
456                                 pSol.setOrigin(lSol.getOrigin());
457                                 pSol.setRatingAverageTenths(lSol.getRatingAverageTenths());
458                                 pSol.setRatingCount(lSol.getRatingCount());
459                                 pSol.setToolkitTypeCode(lSol.getToolkitTypeCode());
460                                 pSol.setViewCount(lSol.getViewCount());
461                                 if (lSol.getSourceId() == null) {
462                                         pSol.setSourceId(null);
463                                         pSol.setUserId(lSol.getUserId());
464                                 } else {
465                                         pSol.setSourceId(peerId);
466                                         pSol.setUserId(userId);
467                                 }
468                                 changed |= !pSol.getTags().equals(lSol.getTags());
469                                 changed |= !Objects.equals(lSol.getSourceId(), pSol.getSourceId());
470                                 changed |= !Objects.equals(lSol.getUserId(), pSol.getUserId());
471                         }
472                         if (!Arrays.equals(lSol.getPicture(), pSol.getPicture())) {
473                                 log.info("Updating picture for solution {}", solutionId);
474                                 events.action(Action.UPDATE, "picture for solution %s", solutionId);
475                                 catalogService.savePicture(solutionId, pSol.getPicture());
476                         }
477                         if (!inLocalCatalog) {
478                                 log.info("Adding solution {} to catalog {}", solutionId, catalogId);
479                                 events.action(Action.ADD, "solution %s to catalog %s", solutionId, catalogId);
480                                 catalogService.addSolution(solutionId, catalogId);
481                         }
482                         for (MLPSolutionRevision rev: pSol.getRevisions()) {
483                                 changed |= checkRevision(rev.getRevisionId(), solutionId, catalogId, peer);
484                         }
485                         if (changed && !isnew) {
486                                 events.action(Action.UPDATE, "solution %s", solutionId);
487                                 catalogService.updateSolution(pSol);
488                                 log.info("Updated solution {} from peer {}", solutionId, peerId);
489                         }
490                         events.end(act);
491                 }
492
493                 private void checkCatalog(String catalogId) {
494                         log.info("Checking catalog {} from peer {}", catalogId, peerId);
495                         PendingAction act = events.begin("catalog %s from peer %s", catalogId, peerId);
496                         FederationClient peer = clients.getFederationClient(peerService.getPeer(peerId).getApiUrl());
497                         events.check(Action.FETCH, "list of solutions in remote catalog");
498                         List<MLPSolution> peerSolutions = peer.getSolutions(catalogId);
499                         events.check(Action.FETCH, "list of solutions in local catalog");
500                         HashMap<String, MLPSolution> localSolutions = index(catalogService.getSolutions(catalogId), MLPSolution::getSolutionId);
501                         if (localSolutions.isEmpty() && !peerSolutions.isEmpty() && index(catalogService.getAllCatalogs(), MLPCatalog::getCatalogId).get(catalogId) == null) {
502                                 log.info("Catalog {} doesn't exist locally.  Creating it", catalogId);
503                                 events.action(Action.CREATE, "catalog %s", catalogId);
504                                 catalogService.createCatalog(index(peer.getCatalogs(), MLPCatalog::getCatalogId).get(catalogId));
505                                 events.end();
506                         }
507                         for (MLPSolution solution: peerSolutions) {
508                                 checkSolution(solution.getSolutionId(), catalogId, localSolutions.get(solution.getSolutionId()) != null, peer);
509                         }
510                         log.info("Checked catalog {} from peer {}", catalogId, peerId);
511                         events.noteEnd(act);
512                 }
513
514                 private void checkSubscription() {
515                         log.info("Processing subscription {} for peer {}", subId, peerId);
516                         PendingAction act = events.begin("subscription %s for peer %s", subId, peerId);
517                         events.check(Action.FETCH, "subscription %s", subId);
518                         MLPPeerSubscription subscription = peerService.getSubscription(subId);
519                         events.check(Action.PARSE, "subscription's selector");
520                         Object xcatalogs;
521                         try {
522                                 xcatalogs = ((Map<String, Object>)mapper.readValue(subscription.getSelector(), trMapStoO)).get("catalogId");
523                                 if (xcatalogs instanceof String) {
524                                         xcatalogs = new String[] { (String)xcatalogs };
525                                 } else {
526                                         xcatalogs = ((List)xcatalogs).toArray(new String[((List)xcatalogs).size()]);
527                                 }
528                         } catch (IOException | NullPointerException | ArrayStoreException | ClassCastException ioe) {
529                                 log.error(String.format("Malformed selector %s on subscription %s to peer %s", subscription.getSelector(), subId, peerId));
530                                 events.fail(act, "Subscription selector was malformed");
531                                 return;
532                         }
533                         events.end();
534                         String[] catalogs = (String[])xcatalogs;
535                         Instant startTime = Instant.now();
536                         for (String catalogId: catalogs) {
537                                 checkCatalog(catalogId);
538                         }
539                         subscription.setProcessed(startTime);
540                         peerService.updateSubscription(subscription);
541                         log.info("Subscription {} processed for peer {}", subId, peerId);
542                         events.end(act);
543                 }
544
545                 public void run() {
546                         events = new Notifier(clients.getCDSClient(), userId);
547                         try {
548                                 checkSubscription();
549                         } catch (Exception ex) {
550                                 log.error(String.format("Unexpected error processing subscription %s for peer %s", subId, peerId), ex);
551                                 events.fail(null, ex.toString());
552                         }
553                 }
554         }
555
556
557         private HashMap<Long, PeerSubscriptionPoller> subscriptions = new HashMap<>();
558
559         /**
560          * Schedule an immediate poll of the specified subscription.
561          * @param subscription The subscription to poll.
562          */
563         public void
564         triggerSubscription(MLPPeerSubscription subscription) {
565                 require(subscription, true);
566         }
567
568         /**
569          * Check for changes to the list of peers and subscriptions.
570          */
571         @Scheduled(initialDelay=5000,fixedRateString="${peer.jobchecker.interval:400}000")
572         public void checkPeerJobs() {
573                 HashSet<Long> valid = new HashSet<>();
574                 for (MLPPeer peer: peerService.getPeers()) {
575                         if (peer.isSelf()) {
576                                 continue;
577                         }
578                         for (MLPPeerSubscription subscription: peerService.getSubscriptions(peer.getPeerId())) {
579                                 require(subscription, false);
580                                 valid.add(subscription.getSubId());
581                         }
582                 }
583                 HashSet<Long> invalid = new HashSet<>();
584                 for (Long subid: subscriptions.keySet()) {
585                         if (!valid.contains(subid)) {
586                                 invalid.add(subid);
587                         }
588                 }
589                 for (Long subid: invalid) {
590                         subscriptions.remove(subid).cancel();
591                 }
592         }
593
594
595         private void require(MLPPeerSubscription subscription, boolean force) {
596                 Long subId = subscription.getSubId();
597                 Long interval = subscription.getRefreshInterval();
598                 PeerSubscriptionPoller poller = subscriptions.get(subId);
599                 if (poller == null) {
600                         poller = new PeerSubscriptionPoller(subId, subscription.getUserId(), subscription.getPeerId(), interval);
601                         subscriptions.put(subId, poller);
602                         if (force || (interval != null && (interval.longValue() > 0L || subscription.getProcessed() == null))) {
603                                 poller.schedule();
604                         }
605                 } else if (force || (interval != null && !interval.equals(poller.getInterval()))) {
606                         poller.cancel();
607                         poller.setInterval(interval);
608                         poller.schedule();
609                 } else if (interval == null && poller.getInterval() != null) {
610                         poller.cancel();
611                         poller.setInterval(interval);
612                 }
613         }
614 }