Federation 3.2.0 - Model Data api 97/6397/16
authorjustin14 <justin.early@est.tech>
Thu, 30 Jan 2020 20:49:51 +0000 (12:49 -0800)
committerjustin14 <justin.early@est.tech>
Fri, 6 Mar 2020 19:41:47 +0000 (11:41 -0800)
- Added route GatewayController /peer/{peerId}/modeldata
- Added route FederationController /modeldata
- GatewayController /peer/{peerId}/modeldata - is used on the subscriber's instance to send model data to /modeldata on supplier instance
- For training and licensing projects to push model usage data and training params back to supplier
- This is critical piece in the continous model training flow
- Same api can be used for both model usage and model params

Issue-Id: ACUMOS-3920

Change-Id: I7929a9df76c0120308a781691598d0cbdae50cdb
Signed-off-by: justin14 <justin.early@est.tech>
22 files changed:
acumos-fgw-client/pom.xml
acumos-fgw-client/src/main/java/org/acumos/federation/client/ClientBase.java
acumos-fgw-client/src/main/java/org/acumos/federation/client/FederationClient.java
acumos-fgw-client/src/main/java/org/acumos/federation/client/GatewayClient.java
acumos-fgw-client/src/main/lombok/org/acumos/federation/client/data/ModelData.java [new file with mode: 0644]
acumos-fgw-client/src/main/lombok/org/acumos/federation/client/data/ModelInfo.java [new file with mode: 0644]
acumos-fgw-client/src/test/java/org/acumos/federation/client/ClientTest.java
docs/config.rst
docs/developer-guide.rst
docs/index.rst
docs/modeldata.rst [new file with mode: 0644]
docs/release-notes.rst
gateway/pom.xml
gateway/src/main/java/org/acumos/federation/gateway/Application.java
gateway/src/main/java/org/acumos/federation/gateway/Clients.java
gateway/src/main/java/org/acumos/federation/gateway/FederationController.java
gateway/src/main/java/org/acumos/federation/gateway/GatewayController.java
gateway/src/main/java/org/acumos/federation/gateway/LogstashClient.java [new file with mode: 0644]
gateway/src/main/java/org/acumos/federation/gateway/LogstashService.java [new file with mode: 0644]
gateway/src/main/java/org/acumos/federation/gateway/LogstashServiceImpl.java [new file with mode: 0644]
gateway/src/test/java/org/acumos/federation/gateway/FederationControllerTest.java
gateway/src/test/java/org/acumos/federation/gateway/GatewayControllerTest.java

index ee25b59..88496e3 100644 (file)
@@ -27,7 +27,7 @@ limitations under the License.
        </parent>
        <groupId>org.acumos.federation</groupId>
        <artifactId>acumos-fgw-client</artifactId>
-       <version>3.1.1-SNAPSHOT</version>
+       <version>3.2.0-SNAPSHOT</version>
        <name>Federation Gateway Client</name>
        <properties>
                <!-- dependency versions -->
index 8f47029..b806c93 100644 (file)
@@ -56,6 +56,7 @@ import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
 import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
 import org.springframework.http.converter.ByteArrayHttpMessageConverter;
 import org.springframework.http.converter.ResourceHttpMessageConverter;
+import org.springframework.http.converter.StringHttpMessageConverter;
 import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpMethod;
@@ -247,7 +248,7 @@ public class ClientBase {
                chrf.setBufferRequestBody(false);
                RestTemplateBuilder rtb = new RestTemplateBuilder()
                    .requestFactory(() -> chrf)
-                   .messageConverters(new ByteArrayHttpMessageConverter(), messageConverter, contentConverter)
+                   .messageConverters(new ByteArrayHttpMessageConverter(), new StringHttpMessageConverter(), messageConverter, contentConverter)
                    .uriTemplateHandler(new DefaultUriBuilderFactory())
                    .rootUri(target);
                if (creds != null && creds.getUsername() != null && creds.getPassword() != null) {
index 9d9dc05..11743d1 100644 (file)
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.springframework.core.io.ResourceLoader;
 import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.http.HttpMethod;
+import org.springframework.web.client.RestClientException;
 
 import org.acumos.cds.domain.MLPPeer;
 import org.acumos.cds.domain.MLPCatalog;
@@ -37,6 +38,8 @@ import org.acumos.cds.domain.MLPDocument;
 
 import org.acumos.federation.client.config.ClientConfig;
 import org.acumos.federation.client.data.JsonResponse;
+import org.acumos.federation.client.data.ModelData;
+
 
 /**
  * Client for the Federation Server's public (E5) API.  Note that servers
@@ -108,6 +111,11 @@ public class FederationClient extends ClientBase {
         */
        public static final String CATID_QUERY = "?catalogId={catalogId}";
 
+       /**
+        * The URI for sending model data from subscriber to supplier.
+        */
+       public static final String MODEL_DATA = "/modeldata";
+
        /**
         * Peer Status Code for Active
         */
@@ -188,6 +196,15 @@ public class FederationClient extends ClientBase {
                return handleResponse(UNREGISTER_URI, HttpMethod.POST, new ParameterizedTypeReference<JsonResponse<MLPPeer>>(){});
        }
 
+       /**
+        * @param modelData model (json) data payload
+        * @return json response
+        * @throws RestClientException if remote acumos is not available
+        */
+       public Void receiveModelData(ModelData modelData) throws RestClientException {
+               return restTemplate.postForObject(MODEL_DATA, modelData, Void.class);
+       }
+
        /**
         * Get a list of the server's catalogs.
         *
index 8f685a7..b6f5ef6 100644 (file)
 package org.acumos.federation.client;
 
 import java.util.List;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.springframework.core.io.ResourceLoader;
 import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.http.HttpMethod;
-
-
+import org.springframework.web.client.RestClientException;
 import org.acumos.cds.domain.MLPPeer;
 import org.acumos.cds.domain.MLPCatalog;
 import org.acumos.cds.domain.MLPSolution;
 
 import org.acumos.federation.client.config.ClientConfig;
 import org.acumos.federation.client.data.JsonResponse;
+import org.acumos.federation.client.data.ModelData;
 
 /**
  * Client for the Federation Gateway's private API.  Except as specified,
@@ -164,4 +163,17 @@ public class GatewayClient extends ClientBase {
        public void triggerPeerSubscription(String peerId, long subscriptionId) {
                handleResponse(PEER_PFX + SUBSCRIPTION_URI, HttpMethod.POST, new ParameterizedTypeReference<JsonResponse<Void>>(){}, peerId, subscriptionId);
        }
+
+       /**
+        * Ask the local gateway server to poll a subscription. Note that this operation is not proxied by
+        * the local gateway server but rather requests the local gateway server to immediately poll the
+        * specified subscription to the remote federation server, without waiting until its normally
+        * scheduled time.
+        * @param peerId id to send model data to
+        * @param modelData log data for sending to supplier of model
+        */
+       public void sendModelData(String peerId, ModelData modelData)
+                       throws RestClientException {
+               restTemplate.postForObject(PEER_PFX + FederationClient.MODEL_DATA, modelData, Void.class, peerId);
+       }
 }
diff --git a/acumos-fgw-client/src/main/lombok/org/acumos/federation/client/data/ModelData.java b/acumos-fgw-client/src/main/lombok/org/acumos/federation/client/data/ModelData.java
new file mode 100644 (file)
index 0000000..d408e6f
--- /dev/null
@@ -0,0 +1,79 @@
+/*-
+ * ===============LICENSE_START=======================================================
+ * Acumos
+ * ===================================================================================
+ * Copyright (C) 2020 Nordix Foundation
+ * ===================================================================================
+ * This Acumos software file is distributed by Nordix Foundation
+ * under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ===============LICENSE_END=========================================================
+ */
+package org.acumos.federation.client.data;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+import lombok.ToString;
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Model data is used to send data back to the supplier of the model
+ * This data can include parameter data or usage data.
+ */
+@Data
+@ToString(callSuper = true, includeFieldNames = true)
+public class ModelData {
+       /**
+        * Example value
+        * 
+        * <pre>
+        *  {@code
+                               "value": {
+                               "B": "121",
+                               "C": "270",
+                               "A": "601"
+                       }
+               * }
+        * </pre>
+        * 
+        *
+        * @param value Open ended json object with key value pairs
+        * @return Param values open ended
+        */
+       private JsonNode value;
+
+       /**
+        * Used for partitioning logs
+        * This could be used for knowing if model data is for model usage or model parameters.
+        * 
+        * @param tags array of tags to be used to separate different logs
+        * @return rray of tags to be used to separate different logs
+        */
+       private String[] tags;
+
+       /**
+        * When the data was initially recorded. Since data may take some time during transfer
+        * we want to keep the original timestamp when the params were collected or 
+        * when the model was used.
+        * @param timestamp time when log was recorded
+        * @return time when log entry was recorded
+        */
+       @JsonProperty("@timestamp")
+       private String timestamp;
+
+       /**
+        * Model identifying information.
+        * @param model model identification ie version, solution id, and subscriber host
+        * @return model identification ie version, solution id, and subscriber host
+        */
+       private ModelInfo model;
+
+}
diff --git a/acumos-fgw-client/src/main/lombok/org/acumos/federation/client/data/ModelInfo.java b/acumos-fgw-client/src/main/lombok/org/acumos/federation/client/data/ModelInfo.java
new file mode 100644 (file)
index 0000000..7681578
--- /dev/null
@@ -0,0 +1,56 @@
+/*-
+ * ===============LICENSE_START=======================================================
+ * Acumos
+ * ===================================================================================
+ * Copyright (C) 2020 Nordix Foundation
+ * ===================================================================================
+ * This Acumos software file is distributed by Nordix Foundation
+ * under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ===============LICENSE_END=========================================================
+ */
+package org.acumos.federation.client.data;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+import lombok.ToString;
+
+
+/**
+ * Document enhanced with file name.
+ */
+@Data
+@ToString(callSuper=true, includeFieldNames=true)
+public class ModelInfo {
+       /**
+        * CDS model id - persistent across all versions of the model
+        * 
+        * @param solutionId the primary identification in CDS for the model
+        * @return the primary identification in CDS for the model
+        */
+       private String solutionId;
+
+       /**
+        * CDS GUID for the revision of the model.
+        * 
+        * @return the unique CDS id for the version
+        * @param revisionId the unique CDS id for the version
+        */
+       private String revisionId;
+
+       /**
+        * Subject name of the peer that is sending parameter updates.
+        * @param subscriberName the name of the subscriber/peer host
+        * @return the name of the subscriber/peer host
+        */
+       private String subscriberName;
+
+}
index 271780f..d1cb2cd 100644 (file)
@@ -22,6 +22,7 @@ package org.acumos.federation.client;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.List;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.junit.Test;
 import org.junit.Before;
@@ -41,6 +42,7 @@ import org.springframework.web.util.UriTemplateHandler;
 import org.acumos.cds.domain.MLPSolution;
 
 import org.acumos.federation.client.config.ClientConfig;
+import org.acumos.federation.client.data.ModelData;
 import org.acumos.federation.client.data.Solution;
 import org.acumos.federation.client.data.SolutionRevision;
 
@@ -114,6 +116,24 @@ public class ClientTest {
                client.triggerPeerSubscription("somepeerid", 99);
        }
 
+       @Test
+       public void testGatewayModelData() throws Exception {
+               GatewayClient client = new GatewayClient("http://localhost:8888", getConfig("acumosa"));
+               (new ClientMocking()).errorOnNoAuth(401, "Unauthorized")
+               .errorOnBadAuth("acumosa", "acumosa", 403, "Forbidden")
+                   .on("POST /peer/somepeerid/modeldata", xq("{ 'content': 'successfully send model data to peer' }"))
+                   .applyTo(client);
+               ObjectMapper objectMapper = new ObjectMapper();
+               ModelData modelData =
+                               objectMapper.readValue("{\"model\": { \"solutionId\": \"UUID\"}}", ModelData.class);
+                               try {
+                                       client.sendModelData("somepeerid", modelData);
+                               } catch (Exception e) {
+                                       fail("failed to send model data");
+                               }
+       }
+
+
        @Test
        public void testFederation() throws Exception {
                FederationClient client = new FederationClient("http://localhost:9999", getConfig("acumosa"), null, null);
index ac1a863..e13f7ab 100644 (file)
@@ -63,7 +63,8 @@ Example (with syntactically valid but completely made up values)::
       "nexus.group-id": "myorg"
     },
     "license-manager.url": "http://licenseserver:8888",
-    "verification.url": "http://securityserver:9999"
+    "verification.url": "http://securityserver:9999",
+    "logstash.url": "http://logstash:2345"
   }'
 
 Note that::
@@ -301,6 +302,12 @@ verification.url
   URL for the Acumos security-verification server used to perform security
   verification scans on solution revisions.
 
+logstash.url
+  Required.
+
+  URL for the Acumos logstash server used to save model data in elastic search.
+  Required by the /modeldata api.
+
 =========================================
 Federation Gateway Certificate Generation
 =========================================
index 0d3b773..2f214a4 100644 (file)
@@ -126,3 +126,7 @@ The following endpoints are defined on the public "E5" interface:
 * /documents/{documentId}/content
 
   Retrieve the content of the specified document
+
+* /modeldata
+
+  Sends model data to supplier of the model.
index f4b5e8d..b705718 100644 (file)
@@ -31,3 +31,4 @@ Federation Gateway
        config.rst
        client.rst
        release-notes.rst
+       modeldata.rst
diff --git a/docs/modeldata.rst b/docs/modeldata.rst
new file mode 100644 (file)
index 0000000..c65d69e
--- /dev/null
@@ -0,0 +1,95 @@
+.. ===============LICENSE_START=======================================================
+.. Acumos CC-BY-4.0
+.. ===================================================================================
+.. Copyright (C) 2017 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
+.. ===================================================================================
+.. This Acumos documentation file is distributed by AT&T and Tech Mahindra
+.. under the Creative Commons Attribution 4.0 International License (the "License");
+.. you may not use this file except in compliance with the License.
+.. You may obtain a copy of the License at
+..
+.. http://creativecommons.org/licenses/by/4.0
+..
+.. This file is distributed on an "AS IS" BASIS,
+.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+.. See the License for the specific language governing permissions and
+.. limitations under the License.
+.. ===============LICENSE_END=========================================================
+
+======================
+Model Data Admin Guide
+======================
+
+Overview
+--------
+
+The Model data api allows model data such as parameters to flow from a running model in
+a subscriber's instance of Acumos to a supplier's instance of Acumos. In addition to 
+the federation gateway /peer/{peerId}/modeldata api we must connect logstash to send 
+the updated model parameters.
+
+Example Model Data Parameters
+-----------------------------
+
+.. code-block:: javascript
+
+    {
+        "@version": "1",
+        "@timestamp": "2020-02-17T21:21:09.338Z",
+        "tags": [
+            "acumos-model-param-logs",
+            "beats_input_raw_event"
+        ],
+        "model": {
+            "userId": "12345678-abcd-90ab-cdef-1234567890ab",
+            "revisionId": "1c0a4ea4-e822-4fb3-bef1-11f92958c315",
+            "solutionId": "149ea34c-44fc-4329-8189-52d3ae523a15"
+        },
+        "value": {
+            "B": "121",
+            "C": "270",
+            "A": "601"
+        }
+    }
+
+
+
+Example Log stash configuration
+-------------------------------
+
+Log stash has 2 important configuration changes
+
+1. http output plugin sending logs from model runner to gateway service
+
+.. code-block:: javascript
+
+  output
+     ...
+     if "acumos-model-param-logs" in [tags] {
+      elasticsearch {
+        hosts => ["elasticsearch:9200"]
+        index => "acumos-model-param-logs"
+      }
+
+      http {
+        keystore => "/app/certs/acumos-keystore.p12"
+        keystore_password => "[KEYSTORE_PASSWORD]"
+        keystore_type => "PKCS12"
+        truststore => "/app/certs/acumos-truststore.jks"
+        truststore_password => "[TRUSTSTORE_PASSWORD]"
+        retry_failed => false
+        http_method => "post"
+        url => "https://[GATEWAY_SERVICE]:[GATEWAY_PORT]/peer/USE_SOLUTION_SOURCE/modeldata"
+      }
+
+
+2. http input plugin for accepting log entries from federation service.
+
+.. code-block:: javascript
+
+      input
+      ...
+        http {
+            port => 5043
+        }
+
index b2b3cea..79ff3d9 100644 (file)
@@ -23,6 +23,11 @@ Federation Gateway Release Notes
 This server is available as a Docker image in a Docker registry at the Linux Foundation.
 The image name is "federation-gateway" and the tag is a version string as shown below.
 
+Version 3.2.0, 2020-02-17
+-------------------------
+* Adding support for model data sending over federation gateway (`ACUMOS-3920 <https://jira.acumos.org/browse/ACUMOS-3920>`_)
+* Fix solution sourceId !=null (`ACUMOS-4021 <https://jira.acumos.org/browse/ACUMOS-4021>`_)
+
 Version 3.1.1, 2020-01-27
 -------------------------
 * Update dependency version for the common data service client to 3.1.1 (`ACUMOS-3951 <https://jira.acumos.org/browse/ACUMOS-3951>`_)
index 5782570..cde6db7 100644 (file)
@@ -27,7 +27,7 @@
        </parent>
        <groupId>org.acumos.federation</groupId>
        <artifactId>gateway</artifactId>
-       <version>3.1.1-SNAPSHOT</version>
+       <version>3.2.0-SNAPSHOT</version>
        <name>Federation Gateway</name>
        <description>Federated Acumos Interface for inter-acumos communication</description>
        <properties>
index dc63c41..563eb37 100644 (file)
@@ -149,6 +149,12 @@ public class Application {
                return new NexusConfig();
        }
 
+       @Bean
+       @ConfigurationProperties(prefix="logstash")
+       ServiceConfig logstashConfig() {
+               return new ServiceConfig();
+       }
+
        @Bean
        @ConfigurationProperties(prefix="docker")
        DockerConfig dockerConfig() {
@@ -192,6 +198,12 @@ public class Application {
                return new CatalogServiceImpl();
        }
 
+       @Bean
+       LogstashService logstashService() {
+               return new LogstashServiceImpl();
+       }
+
+
        static Docket getApi() {
                String version = Application.class.getPackage().getImplementationVersion();
                return new Docket(DocumentationType.SWAGGER_2)
index a3a2b98..661649e 100644 (file)
@@ -76,10 +76,14 @@ public class Clients {
        @Autowired
        private ServiceConfig lmConfig;
 
+       @Autowired
+       private ServiceConfig logstashConfig;
+
        private ICommonDataServiceRestClient cdsClient;
        private NexusClient nexusClient;
        private ISecurityVerificationClientService svClient;
        private LicenseAsset lmClient;
+       private LogstashClient logstashClient;
 
        public FederationClient getFederationClient(String url) {
                /*
@@ -158,4 +162,13 @@ public class Clients {
                }
                return lmClient;
        }
+
+       public synchronized LogstashClient getLogstashClient(){
+               if(logstashClient == null){
+                       ClientConfig cc = new ClientConfig();
+                       cc.setCreds(logstashConfig);
+                       logstashClient = new LogstashClient(logstashConfig.getUrl(), cc);
+               }
+               return logstashClient;
+       }
 }
index 0fe703c..d58658e 100644 (file)
@@ -41,6 +41,7 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
@@ -61,6 +62,7 @@ import org.acumos.federation.client.FederationClient;
 import org.acumos.federation.client.data.Artifact;
 import org.acumos.federation.client.data.Document;
 import org.acumos.federation.client.data.JsonResponse;
+import org.acumos.federation.client.data.ModelData;
 import org.acumos.federation.client.data.SolutionRevision;
 
 /**
@@ -84,6 +86,9 @@ public class FederationController {
        @Autowired
        private CatalogService catalogService;
 
+       @Autowired
+       private LogstashService logstashService;
+
        @Autowired
        private ContentService contentService;
 
@@ -338,4 +343,27 @@ public class FederationController {
                response.setStatus(badRequest.getCode());
                return ret;
        }
+
+       /**
+        * Receives model data payload from
+        * {@link GatewayController#peerModelData(HttpServletResponse, ModelData, String)}
+        *
+        * @param payload         model data payload The payload must have a model.solutionId
+        * 
+        * @param theHttpResponse HttpServletResponse
+        * 
+        */
+       @CrossOrigin
+       @Secured(Security.ROLE_PEER)
+       @ApiOperation(value = "Invoked by Peer Acumos to post model data to elastic search service .",
+                       response = Void.class)
+       @PostMapping(FederationClient.MODEL_DATA)
+       @ResponseBody
+       public void receiveModelData(@RequestBody ModelData payload,
+                       HttpServletResponse theHttpResponse) {
+
+               log.debug(FederationClient.MODEL_DATA);
+               log.debug("Model parameters: {}", payload);
+               logstashService.sendModelData(payload);
+       }
 }
index a952acf..f4e0aa2 100644 (file)
@@ -37,11 +37,14 @@ import org.springframework.web.bind.annotation.CrossOrigin;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.client.RestClientResponseException;
 import org.springframework.security.access.annotation.Secured;
-
+import org.springframework.security.access.method.P;
+import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
 import org.acumos.cds.domain.MLPCatalog;
 import org.acumos.cds.domain.MLPSolution;
 import org.acumos.cds.domain.MLPPeer;
@@ -50,6 +53,9 @@ import org.acumos.cds.domain.MLPPeerSubscription;
 import org.acumos.federation.client.FederationClient;
 import org.acumos.federation.client.GatewayClient;
 import org.acumos.federation.client.data.JsonResponse;
+import org.acumos.federation.client.data.ModelData;
+import org.acumos.federation.client.data.ModelInfo;
+
 
 
 /**
@@ -71,6 +77,9 @@ public class GatewayController {
        @Autowired
        private SubscriptionPoller poller;
 
+       @Autowired
+       private WebSecurityConfigurerAdapter security;
+
        @ApiOperation(value = "Invoked by local Acumos to get a list of catalogs available from a peer Acumos instance .", response = MLPCatalog.class, responseContainer = "List")
        @GetMapping(FederationClient.CATALOGS_URI)
        @ResponseBody
@@ -152,6 +161,76 @@ public class GatewayController {
                }
                return ret;
        }
+       /**
+        * Receives incoming log message from logstash and Sends to {@link FederationController#receiveModelData(ModelData, HttpServletResponse)}
+        *
+        * @param payload model data payload The payload must have a model.solutionId
+        *
+        * @param theHttpResponse HttpServletResponse
+        * @param peerIdPathVar PeerID from url path param or USE_SOLUTION_SOURCE to lookup peer based on model.solutionId field
+        * @return success message in JSON format
+        *
+        */
+       @Secured(Security.ROLE_PEER)
+       @ApiOperation(
+                       value = "Invoked by local Acumos to post incoming model data to respective remote peer Acumos instance .",
+                       response = ModelData.class)
+       @PostMapping(FederationClient.MODEL_DATA)
+       @ResponseBody
+       public JsonResponse<Void> peerModelData(HttpServletResponse theHttpResponse,
+           @RequestBody ModelData payload, @PathVariable("peerId") String peerIdPathVar) {
+               log.debug("/peer/{}/modeldata  payload: {}", peerIdPathVar, payload);
+               ModelInfo modelInfo = payload.getModel();
+               String peerId = peerIdPathVar;
+               JsonResponse response = new JsonResponse();
+               // peer id lookup from solution if peerid from path variable is null
+               if(peerId.indexOf("USE_SOLUTION_SOURCE") != -1){
+                       String solutionId = modelInfo.getSolutionId();
+                       peerId = getPeerIdFromCds(solutionId);
+               }
+               
+               MLPPeer self = ((Security) security).getSelf();
+               modelInfo.setSubscriberName(self.getSubjectName());
+       
+               try {
+
+                       // check if thePeerId matches to the
+                       // Ignore request if for local peer i.e. peerId same as local peer
+                       //
+                       log.debug("Attempting to connect to peer id {}", peerId);
+                       if (peerId == null) {
+                               log.debug("ignore logging to self-peer {}", peerId);
+                               return this.getSuccessResponse(theHttpResponse,
+                                               "ignore logging to self-peer");
+                       }
+
+                       log.debug("calling peer with request {}", payload);
+                       callPeer(theHttpResponse, peerId, peer -> peer.receiveModelData(payload));
+               } catch (Exception ex) {
+                       log.error("failed posting to remote peerId:" + peerId + " exception {}", ex);
+                       throw new BadRequestException(HttpServletResponse.SC_BAD_GATEWAY, "failed posting to remote peerId:" + peerId);
+               }
+               return response;
+
+       }
+
+       private JsonResponse getSuccessResponse(
+               HttpServletResponse theHttpResponse,
+               String message) {
+               JsonResponse response = new JsonResponse();
+               response.setMessage("modelData - " + message);
+               return response;
+       }
+
+       private String getPeerIdFromCds(String solutionId) {
+               try {
+                       String peerId = clients.getCDSClient().getSolution(solutionId).getSourceId();
+                       return peerId;
+               } catch (RestClientResponseException ex) {
+                       log.error("getSolution failed, server reports: {}", ex);
+                       throw new BadRequestException(HttpServletResponse.SC_NOT_FOUND, "Not Found");
+               }
+       }
 
        private <T> JsonResponse<T> callPeer(HttpServletResponse response, String peerId, Function<FederationClient, T> fcn) {
                MLPPeer peer = peerService.getPeer(peerId);
diff --git a/gateway/src/main/java/org/acumos/federation/gateway/LogstashClient.java b/gateway/src/main/java/org/acumos/federation/gateway/LogstashClient.java
new file mode 100644 (file)
index 0000000..48746a3
--- /dev/null
@@ -0,0 +1,50 @@
+/*-
+ * ===============LICENSE_START=======================================================
+ * Acumos
+ * ===================================================================================
+ * Copyright (C) 2020 Nordix Foundation
+ * ===================================================================================
+ * This Acumos software file is distributed by Nordix Foundation
+ * under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ===============LICENSE_END=========================================================
+ */
+package org.acumos.federation.gateway;
+
+import org.acumos.federation.client.ClientBase;
+import org.acumos.federation.client.config.ClientConfig;
+import org.springframework.web.client.RestClientException;
+import org.acumos.federation.client.data.ModelData;
+
+
+/**
+ * Client for accessing the Logstash service configured with a http input plugin.
+ * https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html
+ */
+public class LogstashClient extends ClientBase {
+
+       /**
+        * Create a logstash client
+        *
+        * @param url URL for accessing the Logstash service.
+        * @param cc  Credentials and TLS parameters for mutual authentication.
+        */
+       public LogstashClient(String url, ClientConfig cc) {
+               super(url, cc, null, null);
+       }
+
+       public void saveModelData(ModelData modelData) throws RestClientException {
+               // We are ignoring resulting string for now but in order to have valid
+               // message converter using String.
+               restTemplate.postForObject("/", modelData, String.class);
+       }
+
+}
diff --git a/gateway/src/main/java/org/acumos/federation/gateway/LogstashService.java b/gateway/src/main/java/org/acumos/federation/gateway/LogstashService.java
new file mode 100644 (file)
index 0000000..6819514
--- /dev/null
@@ -0,0 +1,35 @@
+/*-
+ * ===============LICENSE_START=======================================================
+ * Acumos
+ * ===================================================================================
+ * Copyright (C) 2020 Nordix Foundation
+ * ===================================================================================
+ * This Acumos software file is distributed by Nordix Foundation
+ * under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ===============LICENSE_END=========================================================
+ */
+package org.acumos.federation.gateway;
+
+import org.acumos.federation.client.data.ModelData;
+
+/**
+ * API for sending model data to logstash
+ *
+ * Provides a service that will send model data to logstash
+ */
+public interface LogstashService {
+       /**
+        * Send the model data
+        */
+       public void sendModelData(ModelData payload);
+
+}
diff --git a/gateway/src/main/java/org/acumos/federation/gateway/LogstashServiceImpl.java b/gateway/src/main/java/org/acumos/federation/gateway/LogstashServiceImpl.java
new file mode 100644 (file)
index 0000000..d99dac6
--- /dev/null
@@ -0,0 +1,37 @@
+/*-
+ * ===============LICENSE_START=======================================================
+ * Acumos
+ * ===================================================================================
+ * Copyright (C) 2020 Nordix Foundation
+ * ===================================================================================
+ * This Acumos software file is distributed by Nordix Foundation
+ * under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ===============LICENSE_END=========================================================
+ */
+package org.acumos.federation.gateway;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.client.RestClientException;
+import org.acumos.federation.client.data.ModelData;
+
+public class LogstashServiceImpl implements LogstashService {
+
+
+       @Autowired
+       private Clients clients;
+
+       @Override
+       public void sendModelData(ModelData payload) throws RestClientException{
+        clients.getLogstashClient().saveModelData(payload);
+       }
+
+}
\ No newline at end of file
index 708b205..9becffa 100644 (file)
@@ -22,7 +22,7 @@ package org.acumos.federation.gateway;
 import java.io.InputStream;
 
 import javax.servlet.http.HttpServletResponse;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -58,8 +58,11 @@ import org.acumos.federation.client.ClientBase;
 import org.acumos.federation.client.config.ClientConfig;
 import org.acumos.federation.client.config.BasicAuthConfig;
 import org.acumos.federation.client.config.TlsConfig;
+import org.acumos.federation.client.data.JsonResponse;
+import org.acumos.federation.client.data.ModelData;
 
 import org.acumos.federation.client.test.ClientMocking;
+import org.apache.http.entity.ContentType;
 import static org.acumos.federation.client.test.ClientMocking.getConfig;
 import static org.acumos.federation.client.test.ClientMocking.xq;
 
@@ -81,7 +84,8 @@ import static org.acumos.federation.client.test.ClientMocking.xq;
        "cdms.client.url=http://dummy.org:999",
        "cdms.client.username=dummyuser",
        "cdms.client.password=dummypass",
-       "nexus.url=http://dummy.org:1234"
+       "nexus.url=http://dummy.org:1234",
+       "logstash.url=http://logstash:2345",
     }
 )
 public class FederationControllerTest {
@@ -94,6 +98,9 @@ public class FederationControllerTest {
        @Autowired
        private NexusConfig nexusConfig;
 
+       @Autowired
+       private ServiceConfig logstashConfig;
+
        @Autowired
        private PeerService peerService;
 
@@ -187,6 +194,23 @@ public class FederationControllerTest {
                docker = new SimulatedDockerClient();
                docker.setSaveResult("abcdefg".getBytes());
                when(clients.getDockerClient()).thenReturn(docker.getClient());
+
+               initLogstashMock(clients);
+       }
+
+       public void initLogstashMock(Clients clients) throws Exception {
+
+               String url = logstashConfig.getUrl();
+               ClientConfig ccc = new ClientConfig();
+               ccc.setCreds(logstashConfig);
+               LogstashClient mockLogstashClient = new LogstashClient(url, ccc);
+
+               (new ClientMocking())
+                   .on("POST /", "ok", ContentType.TEXT_PLAIN)
+                   .applyTo(mockLogstashClient);
+
+               when(clients.getLogstashClient()).thenReturn(mockLogstashClient);
+
        }
 
        @Test
@@ -306,6 +330,21 @@ public class FederationControllerTest {
                assertNotNull(self.getArtifacts("somesolid", "altrevid"));
        }
 
+       @Test
+       public void testModelData() throws Exception {
+               FederationClient self = new FederationClient("https://localhost:" + port, getConfig("acumosa"));
+
+               ObjectMapper objectMapper = new ObjectMapper();
+               ModelData payloadObjectNode =  objectMapper.readValue("{\"model\": { \"solutionId\": \"UUID\"}}", ModelData.class);
+               try {
+                       self.receiveModelData(payloadObjectNode);
+               } catch (Exception e) {
+                       System.err.println(e);
+                       fail("exception when sending model data not expected");
+               }
+       }
+
+
        @Test
        public void testSwagger() throws Exception {
                RawAnonClient rac = new RawAnonClient("https://localhost:" + port);
index d741649..c6290c4 100644 (file)
@@ -22,7 +22,7 @@ package org.acumos.federation.gateway;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -58,6 +58,8 @@ import org.acumos.federation.client.ClientBase;
 import org.acumos.federation.client.config.ClientConfig;
 import org.acumos.federation.client.config.BasicAuthConfig;
 import org.acumos.federation.client.config.TlsConfig;
+import org.acumos.federation.client.data.JsonResponse;
+import org.acumos.federation.client.data.ModelData;
 
 import org.acumos.federation.client.test.ClientMocking;
 import static org.acumos.federation.client.test.ClientMocking.getConfig;
@@ -78,7 +80,8 @@ import static org.acumos.federation.client.test.ClientMocking.xq;
        "nexus.group-id=nxsgrpid",
        "nexus.name-separator=,",
        "docker.registry-url=someregistry:9999",
-       "federation.operator=defuserid"
+       "federation.operator=defuserid",
+       "logstash.url=http://logstash:2345",
     }
 )
 public class GatewayControllerTest {
@@ -280,6 +283,71 @@ public class GatewayControllerTest {
                assertEquals("Incomplete steps remain", 0, steps.getCount() - 1);
        }
 
+       @Test
+       public void testModelDataNoPeerLookup() throws Exception {
+               GatewayClient self = new GatewayClient("https://localhost:" + port, getConfig("acumosa"));
+
+               ICommonDataServiceRestClient cdsClient =
+                               CommonDataServiceRestClientImpl.getInstance("http://cds:999",
+                                               ClientBase.buildRestTemplate("http://cds:999", new ClientConfig(), null, null));
+               String peerUrl = "https://somepeer.org:999";
+
+               (new ClientMocking())
+                   .on("GET /peer/peerid", xq("{ 'peerId': 'peerid', 'apiUrl': \'" + peerUrl + "\'}"))
+                   .on("GET /solution/cat2soln", xq("{ 'solutionId': 'cat2soln', 'sourceId': 'peerid' }"))
+                   .on("GET /peer/search?subjectName=gateway.acumosa.org&_j=a&page=0&size=100", xq("{ 'content': [ {'peerId': 'acumosa', 'subjectName': 'gateway.acumosa.org', 'statusCode': 'AC', 'self': true } ], 'last': true, 'number': 0, 'size': 100, 'numberOfElements': 1 }"))
+                   .applyTo(cdsClient);
+               when(clients.getCDSClient()).thenReturn(cdsClient);
+
+               FederationClient fedClient = new FederationClient(peerUrl, new ClientConfig());
+               (new ClientMocking())
+                   .on("POST /modeldata", xq("{'message': 'model data posted and sent to peer'}"))
+                   .applyTo(fedClient);
+               when(clients.getFederationClient(any(String.class))).thenReturn(fedClient);
+
+               ObjectMapper objectMapper = new ObjectMapper();
+               ModelData payloadObjectNode =
+                               objectMapper.readValue("{\"model\": { \"solutionId\": \"cat2soln\"}}", ModelData.class);
+               System.out.println("Justin " + payloadObjectNode);
+               try {
+                       self.sendModelData("peerid", payloadObjectNode);
+               } catch (Exception e) {
+                       fail("model data not sent to peer");
+               }
+       }
+
+       @Test
+       public void testModelDataWithPeerLookup() throws Exception {
+               GatewayClient self = new GatewayClient("https://localhost:" + port, getConfig("acumosa"));
+
+               ICommonDataServiceRestClient cdsClient =
+                               CommonDataServiceRestClientImpl.getInstance("http://cds:999",
+                                               ClientBase.buildRestTemplate("http://cds:999", new ClientConfig(), null, null));
+               String peerUrl = "https://somepeer.org:999";
+
+               (new ClientMocking())
+                   .on("GET /peer/peerid", xq("{ 'peerId': 'peerid', 'apiUrl': \'" + peerUrl + "\'}"))
+                   .on("GET /solution/cat2soln", xq("{ 'solutionId': 'cat2soln', 'sourceId': 'peerid' }"))
+                   .on("GET /peer/search?subjectName=gateway.acumosa.org&_j=a&page=0&size=100", xq("{ 'content': [ {'peerId': 'acumosa', 'subjectName': 'gateway.acumosa.org', 'statusCode': 'AC', 'self': true } ], 'last': true, 'number': 0, 'size': 100, 'numberOfElements': 1 }"))
+                   .applyTo(cdsClient);
+               when(clients.getCDSClient()).thenReturn(cdsClient);
+
+               FederationClient fedClient = new FederationClient(peerUrl, new ClientConfig());
+               (new ClientMocking())
+                   .on("POST /modeldata", xq("{'message': 'successfully posted model data'}"))
+                   .applyTo(fedClient);
+               when(clients.getFederationClient(any(String.class))).thenReturn(fedClient);
+
+               ObjectMapper objectMapper = new ObjectMapper();
+               ModelData payloadObjectNode =
+                               objectMapper.readValue("{\"model\": { \"solutionId\": \"cat2soln\"}}", ModelData.class);
+               try {
+                       self.sendModelData("USE_SOLUTION_SOURCE", payloadObjectNode);
+               } catch (Exception e) {
+                       fail("was not able to send modeldata to peer");
+               }
+       }
+
 
        @Test
        public void testSwagger() throws Exception {