1b6e665d771ffe8f780bfddca9f06384ad623c32
[platform-oam.git] / elk-client / src / main / java / org / acumos / elk / client / service / SnapshotRepositoryServiceImpl.java
1 /*-
2  * ===============LICENSE_START=======================================================
3  * Acumos
4  * ===================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6  * ===================================================================================
7  * This Acumos software file is distributed by AT&T and Tech Mahindra
8  * under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *  
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  * This file is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ===============LICENSE_END=========================================================
19  */
20 package org.acumos.elk.client.service;
21
22 import java.io.IOException;
23 import java.lang.invoke.MethodHandles;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.function.BiPredicate;
29 import java.util.function.Function;
30 import java.util.function.Predicate;
31
32 import org.acumos.elk.client.transport.ArchiveInfo;
33 import org.acumos.elk.client.transport.ELkRepositoryMetaData;
34 import org.acumos.elk.client.transport.ElasticsearchSnapshotsResponse;
35 import org.acumos.elk.client.transport.ElkArchiveRequest;
36 import org.acumos.elk.client.transport.ElkArchiveResponse;
37 import org.acumos.elk.client.transport.ElkGetRepositoriesResponse;
38 import org.acumos.elk.client.transport.ElkGetSnapshotMetaData;
39 import org.acumos.elk.client.transport.ElkRepositoriesRequest;
40 import org.acumos.elk.client.transport.ElkSnapshotsResponse;
41 import org.acumos.elk.client.transport.ErrorTransport;
42 import org.acumos.elk.client.utils.ElkClientConstants;
43 import org.acumos.elk.client.utils.ElkServiceUtils;
44 import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
45 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
46 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
47 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
48 import org.elasticsearch.action.support.master.AcknowledgedResponse;
49 import org.elasticsearch.client.RequestOptions;
50 import org.elasticsearch.client.RestHighLevelClient;
51 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
52 import org.elasticsearch.common.settings.Settings;
53 import org.elasticsearch.repositories.fs.FsRepository;
54 import org.json.simple.JSONObject;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.stereotype.Service;
59 import org.springframework.util.StringUtils;
60
61 import com.acumos.elk.exception.ELKException;
62
63 /**
64  * Implementation of operation related to elastic stack repository.
65  *
66  */
67 @Service
68 public class SnapshotRepositoryServiceImpl extends AbstractELKClientConnection implements ISnapshotRepositoryService {
69
70         private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
71         private Predicate<ELkRepositoryMetaData> chkRepositoryName = obj -> (obj.getName() != null
72                         && obj.getName().startsWith(ElkClientConstants.ARCHIVE_ES_DATA, 0));
73
74         @Autowired
75         SnapshotServiceImpl snapshotServiceImpl;
76
77         @Override
78         public ElkGetRepositoriesResponse getAllElkRepository() {
79                 logger.debug("Inside getAllElkRepository ");
80                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
81                 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> getArchiveRepository = list -> {
82                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
83                         for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
84                                 if (!chkRepositoryName.test(eLkRepositoryMetaData)) {
85                                         repositories.add(eLkRepositoryMetaData);
86                                 }
87                         }
88                         return repositories;
89                 };
90                 List<ELkRepositoryMetaData> repositories = getArchiveRepository.apply(elkRepositoryMetaDataList);
91                 logger.debug("getArchiveRepository count: {} ", repositories.size());
92                 ElkGetRepositoriesResponse elkRepositoriesResponse = new ElkGetRepositoriesResponse();
93                 elkRepositoriesResponse.setRepositories(repositories);
94                 return elkRepositoriesResponse;
95         }
96
97         public String createElkRepository(ElkRepositoriesRequest elkCreateRepositoriesRequest) throws Exception {
98                 logger.debug("Inside createElkRepository ");
99                 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getRepositoryName())) {
100                         return "false | RepositoryName empty is not allowed";
101                 }
102                 ElkGetRepositoriesResponse response = getAllElkRepository();
103                 logger.debug("elkCreateRepositoriesRequest.getRepositoryName():{}" + elkCreateRepositoriesRequest.getRepositoryName());
104                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = response.getRepositories();
105                 Set<String> repositoryNameSet = new HashSet<>();
106                 for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
107                         repositoryNameSet.add(eLkRepositoryMetaData.getName());
108                         logger.debug("eLkRepositoryMetaData.getName():{}" + eLkRepositoryMetaData.getName());
109                 }
110                 boolean acknowledged = false;
111                 if (repositoryNameSet.contains(elkCreateRepositoriesRequest.getRepositoryName())) {
112                         throw new ELKException("false | RepositoryName already exist");
113                 } else {
114                         acknowledged = createRepo(elkCreateRepositoriesRequest, "backup");
115                         createRepo(elkCreateRepositoriesRequest, ElkClientConstants.ARCHIVE_ES_DATA);
116
117                         logger.debug("Repository is created ", acknowledged);
118                 }
119
120                 return String.valueOf(acknowledged);
121         }
122
123         @Override
124         public String deleteElkRepository(ElkRepositoriesRequest elkDeleteRepositoriesRequest) {
125                 logger.debug("Inside deleteElkRepository");
126                 RestHighLevelClient client = restHighLevelClientConnection();
127                 DeleteRepositoryRequest deleteRepositoryRequest = new DeleteRepositoryRequest(
128                                 elkDeleteRepositoriesRequest.getRepositoryName());
129                 deleteRepositoryRequest.masterNodeTimeout(elkDeleteRepositoriesRequest.getNodeTimeout());
130
131                 AcknowledgedResponse deleteAcknowledgedResponse;
132                 try {
133                         deleteAcknowledgedResponse = client.snapshot().deleteRepository(deleteRepositoryRequest,
134                                         RequestOptions.DEFAULT);
135                         client.close();
136                 } catch (IOException ex) {
137                         logger.debug("Exception:", ex);
138                         throw new ErrorTransport("Unable to connect Elasticserach");
139                 }
140                 boolean deleteAcknowledged = deleteAcknowledgedResponse.isAcknowledged();
141                 logger.debug("Repository is delete(true for created)... {}", deleteAcknowledged);
142
143                 return String.valueOf(deleteAcknowledged);
144         }
145
146         @Override
147         public ElkArchiveResponse getArchiveElkRepository() throws Exception {
148                 logger.debug("Inside getArchiveElkRepository");
149                 String action = ElkClientConstants.INFO;
150                 Predicate<ElasticsearchSnapshotsResponse> chkSnapshots = obj -> (obj.getSnapshots() != null
151                                 && obj.getSnapshots().size() > 0);
152                 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> filterArchiveRepository = list -> {
153                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
154                         for (ELkRepositoryMetaData eLkRepositoryMetaData : list) {
155                                 if (chkRepositoryName.test(eLkRepositoryMetaData)) {
156                                         repositories.add(eLkRepositoryMetaData);
157                                 }
158                         }
159                         return repositories;
160                 };
161
162                 Function<List<ArchiveInfo>, List<ArchiveInfo>> removeEmptyArchiveRepository = list -> {
163                         List<ArchiveInfo> archiveInfoList = new ArrayList<>();
164                         for (ArchiveInfo archiveInfo : list) {
165                                 if (archiveInfo.getSnapshots() != null) {
166                                         archiveInfoList.add(archiveInfo);
167                                 }
168                         }
169                         return archiveInfoList;
170                 };
171
172                 Function<ELkRepositoryMetaData, String> repoName = obj -> {
173                         JSONObject settingObj = obj.getSettings();
174                         String location = (String) settingObj.get("location");
175                         String[] p = location.split("/");
176                         return p[1];
177                 };
178
179                 BiPredicate<String, String> compareRepoName = (str1, str2) -> {
180                         if (str1 != null && str2 != null) {
181                                 return str1.equalsIgnoreCase(str2);
182                         }
183                         return false;
184                 };
185
186                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
187                 List<ELkRepositoryMetaData> archiveRepositories = filterArchiveRepository.apply(elkRepositoryMetaDataList);
188                 logger.debug("repositories count: {} ", archiveRepositories.size());
189                 ElkArchiveResponse elkArchiveResponse = archiveOperation(null, action);
190                 logger.debug("elkArchiveResponse.getMsg() {} elkArchiveResponse.getStatus() {}", elkArchiveResponse.getMsg(),
191                                 elkArchiveResponse.getStatus());
192                 if (elkArchiveResponse != null && elkArchiveResponse.getArchiveInfo() != null) {
193                         logger.debug("elkArchiveResponse.getArchiveInfo() count: {} ", elkArchiveResponse.getArchiveInfo().size());
194                         for (ArchiveInfo archiveInfo : elkArchiveResponse.getArchiveInfo()) {
195                                 for (ELkRepositoryMetaData eLkRepositoryMetaData : archiveRepositories) {
196                                         ElasticsearchSnapshotsResponse elasticsearchSnapshotsResponse = snapshotServiceImpl
197                                                         .getElasticsearchSnapshotDetails(null, eLkRepositoryMetaData.getName());
198                                         String repoArchiveName = repoName.apply(eLkRepositoryMetaData);
199                                         logger.debug(
200                                                         "repoArchiveName: {} archiveInfo.getRepositoryName(): {}  elasticsearchSnapshotsResponse.getRepositoryName(): {} ",
201                                                         repoArchiveName, archiveInfo.getRepositoryName(),
202                                                         elasticsearchSnapshotsResponse.getRepositoryName());
203                                         if (chkSnapshots.test(elasticsearchSnapshotsResponse)) {
204                                                 if (compareRepoName.test(archiveInfo.getRepositoryName(), repoArchiveName)) {
205                                                         archiveInfo.setSnapshots(elasticsearchSnapshotsResponse.getSnapshots());
206                                                 }
207                                         }
208                                 }
209                         }
210                         List<ArchiveInfo> filteredArchiveInfoList = removeEmptyArchiveRepository
211                                         .apply(elkArchiveResponse.getArchiveInfo());
212                         elkArchiveResponse.setArchiveInfo(filteredArchiveInfoList);
213                 }
214                 return elkArchiveResponse;
215         }
216
217         @Override
218         public ElkArchiveResponse archiveElkRepository(ElkArchiveRequest archiveRequest) throws Exception {
219                 logger.debug("Inside archiveElkRepository");
220                 String action = archiveRequest.getAction();
221                 logger.debug("action: {} ", action);
222                 logger.debug("archiveRequest: {} ", archiveRequest);
223                 ElkArchiveResponse elkArchiveResponse = archiveOperation(archiveRequest, action);
224                 logger.debug("elkArchiveResponse: {} ", elkArchiveResponse);
225                 return elkArchiveResponse;
226         }
227
228         private ElkArchiveResponse archiveOperation(ElkArchiveRequest archiveRequest, String action) throws Exception {
229                 logger.debug("Inside archiveOperation action:{}", action);
230                 Function<String, ArchiveInfo> f = str -> {
231                         String[] p = str.split(",");
232                         ArchiveInfo archiveInfo1 = new ArchiveInfo();
233                         if (p[0] != null && p[0].length() > 0 && p[1] != null && p[1].length() > 0) {
234                                 archiveInfo1 = new ArchiveInfo(p[0], p[1]);
235                         }
236                         logger.debug("archiveInfo1:{}", archiveInfo1);
237                         return archiveInfo1;
238                 };
239
240                 String result = null;
241         
242                 String[] archiveInfoArray;
243                 List<String> resultList = new ArrayList<>();
244                 if (action.equalsIgnoreCase(ElkClientConstants.INFO)) {
245                         try {
246                                 result = ElkServiceUtils.executeScript(action, "NA");
247                         
248                                 logger.debug("result INFO: {} ", result);
249                                 logger.debug("resultList.size(): {}", resultList.size());
250                                 resultList.add(result.trim());
251                         } catch (Exception ex) {
252                                 logger.debug("Exception:", ex);
253                                 throw new Exception("Error occured elk archive operation");
254                         }
255                 } else {
256                         try {
257                                 for (String repoName : archiveRequest.getRepositoryName()) {
258                                         result = ElkServiceUtils.executeScript(action, repoName);
259                                         logger.debug("result : {}", result);
260                                         resultList.add(result.trim());
261                                         logger.debug("resultList.size(): {}", resultList.size());
262
263                                         
264                                 }
265                         } catch (Exception ex) {
266                                 logger.debug("Exception:", ex);
267                                 throw new Exception("Error occured elk archive operation");
268                         }
269                 }
270                 boolean chkCSV = result.contains(",");
271                 logger.debug("chkCSV:{}", chkCSV);
272                 ElkArchiveResponse elkArchiveResponse = new ElkArchiveResponse();
273                 List<ArchiveInfo> archiveInfoList = new ArrayList<ArchiveInfo>();
274                 if (chkCSV) {
275                         for (String resultOuput : resultList) {
276                                 archiveInfoArray = resultOuput.split("\n");
277                                 for (String archiveInfo : archiveInfoArray) {
278                                         archiveInfoList.add(f.apply(archiveInfo));
279                                 }
280                         }
281                         logger.debug("archiveInfoList.size(): {}", archiveInfoList.size());
282                         elkArchiveResponse.setMsg("Action:" + action + " done");
283                         elkArchiveResponse.setStatus(ElkClientConstants.SUCCESS);
284                         elkArchiveResponse.setArchiveInfo(archiveInfoList);
285                         logger.debug("archiveInfoList:{}" + archiveInfoList);
286                         if (action.equalsIgnoreCase("RESTORE")) {
287                                 for (ArchiveInfo archiveInfo : archiveInfoList) {
288                                         ElkRepositoriesRequest elkCreateRepositoriesRequest = new ElkRepositoriesRequest();
289                                         elkCreateRepositoriesRequest.setRepositoryName(archiveInfo.getRepositoryName());
290                                         logger.debug("archiveInfo.getRepositoryName():{}" + archiveInfo.getRepositoryName());
291                                         elkCreateRepositoriesRequest.setNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
292                                         logger.debug("elkCreateRepositoriesRequest:{}" + elkCreateRepositoriesRequest);
293                                         createElkRepository(elkCreateRepositoriesRequest);
294                                 }
295                         }
296
297                 } else {
298                         logger.debug("result:" + result);
299                         if (result.contains("\n")) {
300                                 result = result.replace("\n", "");
301                                 logger.debug("result :{}" + result);
302                         }
303                         elkArchiveResponse.setStatus(ElkClientConstants.FAIL);
304                         elkArchiveResponse.setMsg(result.trim());
305                         logger.debug("elkArchiveResponse:{}" + elkArchiveResponse);
306                 }
307                 return elkArchiveResponse;
308         }
309
310         private List<ELkRepositoryMetaData> getAllRepository() {
311                 RestHighLevelClient client = restHighLevelClientConnection();
312                 GetRepositoriesRequest request1 = new GetRepositoriesRequest();
313                 request1.repositories();
314                 request1.local(true);
315
316                 try {
317                         GetRepositoriesResponse response = client.snapshot().getRepository(request1, RequestOptions.DEFAULT);
318                         List<RepositoryMetaData> repositoryMetaDataResponse = response.repositories();
319                         logger.debug("Number of repositoryMetaDataResponse size {}", repositoryMetaDataResponse.size());
320
321                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
322                         for (RepositoryMetaData snapshotStatusResponse : repositoryMetaDataResponse) {
323                                 logger.debug("\nNAME: {} \n TYPE: {} \n SETTINGS: {}", snapshotStatusResponse.name(),
324                                                 snapshotStatusResponse.type(), snapshotStatusResponse.settings());
325                                 ELkRepositoryMetaData repositoryMetaData = new ELkRepositoryMetaData();
326                                 repositoryMetaData.setName(snapshotStatusResponse.name());
327                                 repositoryMetaData.setType(snapshotStatusResponse.type());
328                                 JSONObject settings = new JSONObject();
329                                 for (String key : snapshotStatusResponse.settings().keySet()) {
330                                         settings.put(key, snapshotStatusResponse.settings().get(key));
331                                 }
332                                 repositoryMetaData.setSettings(settings);
333                                 repositories.add(repositoryMetaData);
334                         }
335                         client.close();
336                         return repositories;
337                 } catch (Exception ex) {
338                         logger.debug("Exception:", ex);
339                         throw new ErrorTransport("Unable to connect Elasticserach");
340                 }
341         }
342
343         private boolean createRepo(ElkRepositoriesRequest elkCreateRepositoriesRequest, String repoType) {
344                 RestHighLevelClient client = restHighLevelClientConnection();
345                 PutRepositoryRequest request = new PutRepositoryRequest();
346                 String locationKey = FsRepository.LOCATION_SETTING.getKey();
347                 String locationValue = elkCreateRepositoriesRequest.getRepositoryName().trim();
348                 String compressKey = FsRepository.COMPRESS_SETTING.getKey();
349                 logger.debug("locationKey:{}" + locationKey);
350                 logger.debug("locationValue:{}" + locationValue);
351                 logger.debug("compressKey:{}" + compressKey);
352                 boolean compressValue = true;
353                 if (repoType == ElkClientConstants.ARCHIVE_ES_DATA) {
354                         request.name(
355                                         ElkClientConstants.ARCHIVE_ES_DATA + "-" + elkCreateRepositoriesRequest.getRepositoryName().trim());
356                         locationValue = ElkClientConstants.ARCHIVE_ES_DATA + "/"
357                                         + elkCreateRepositoriesRequest.getRepositoryName().trim();
358                 } else {
359                         request.name(elkCreateRepositoriesRequest.getRepositoryName().trim());
360                 }
361                 Settings settings = Settings.builder().put(locationKey, locationValue).put(compressKey, compressValue).build();
362                 logger.debug("settings.size():{}" + settings.size());
363                 logger.debug("settings.toString():{}" + settings.toString());
364                 request.settings(settings);
365                 request.type(FsRepository.TYPE);
366                 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getNodeTimeout())) {
367                         request.masterNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
368                 } else {
369                         logger.debug("elkCreateRepositoriesRequest.getNodeTimeout():{}" + elkCreateRepositoriesRequest.getNodeTimeout());
370                         request.masterNodeTimeout(elkCreateRepositoriesRequest.getNodeTimeout());
371                 }
372                 request.verify(true);
373                 AcknowledgedResponse acknowledgedResponse;
374                 boolean acknowledged = false;
375                 try {
376                         acknowledgedResponse = client.snapshot().createRepository(request, RequestOptions.DEFAULT);
377                         acknowledged = acknowledgedResponse.isAcknowledged();
378                         logger.debug("acknowledged:{}" + acknowledged);
379                         client.close();
380                 } catch (IOException ex) {
381                         logger.debug("Exception:", ex);
382                         throw new ErrorTransport("Unable to connect Elasticserach");
383                 }
384                 return acknowledged;
385         }
386
387 }