Records Not Disappearing from Archived Page
[platform-oam.git] / elk-client / src / main / java / org / acumos / elk / client / service / SnapshotRepositoryServiceImpl.java
1 /*-
2  * ===============LICENSE_START=======================================================
3  * Acumos
4  * ===================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6  * ===================================================================================
7  * This Acumos software file is distributed by AT&T and Tech Mahindra
8  * under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *  
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  * This file is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ===============LICENSE_END=========================================================
19  */
20 package org.acumos.elk.client.service;
21
22 import java.io.IOException;
23 import java.lang.invoke.MethodHandles;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.function.BiPredicate;
29 import java.util.function.Function;
30 import java.util.function.Predicate;
31
32 import org.acumos.elk.client.transport.ArchiveInfo;
33 import org.acumos.elk.client.transport.ELkRepositoryMetaData;
34 import org.acumos.elk.client.transport.ElasticsearchSnapshotsResponse;
35 import org.acumos.elk.client.transport.ElkArchiveRequest;
36 import org.acumos.elk.client.transport.ElkArchiveResponse;
37 import org.acumos.elk.client.transport.ElkGetRepositoriesResponse;
38 import org.acumos.elk.client.transport.ElkGetSnapshotMetaData;
39 import org.acumos.elk.client.transport.ElkRepositoriesRequest;
40 import org.acumos.elk.client.transport.ElkSnapshotsResponse;
41 import org.acumos.elk.client.transport.ErrorTransport;
42 import org.acumos.elk.client.utils.ElkClientConstants;
43 import org.acumos.elk.client.utils.ElkServiceUtils;
44 import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
45 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
46 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
47 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
48 import org.elasticsearch.action.support.master.AcknowledgedResponse;
49 import org.elasticsearch.client.RequestOptions;
50 import org.elasticsearch.client.RestHighLevelClient;
51 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
52 import org.elasticsearch.common.settings.Settings;
53 import org.elasticsearch.repositories.fs.FsRepository;
54 import org.json.simple.JSONObject;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.stereotype.Service;
59 import org.springframework.util.StringUtils;
60
61 import com.acumos.elk.exception.ELKException;
62
63 /**
64  * Implementation of operation related to elastic stack repository.
65  *
66  */
67 @Service
68 public class SnapshotRepositoryServiceImpl extends AbstractELKClientConnection implements ISnapshotRepositoryService {
69
70         private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
71         private Predicate<ELkRepositoryMetaData> chkRepositoryName = obj -> (obj.getName() != null
72                         && obj.getName().startsWith(ElkClientConstants.ARCHIVE_ES_DATA, 0));
73
74         @Autowired
75         SnapshotServiceImpl snapshotServiceImpl;
76
77         @Override
78         public ElkGetRepositoriesResponse getAllElkRepository() {
79                 logger.debug("Inside getAllElkRepository ");
80                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
81                 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> getArchiveRepository = list -> {
82                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
83                         for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
84                                 if (!chkRepositoryName.test(eLkRepositoryMetaData)) {
85                                         repositories.add(eLkRepositoryMetaData);
86                                 }
87                         }
88                         return repositories;
89                 };
90                 List<ELkRepositoryMetaData> repositories = getArchiveRepository.apply(elkRepositoryMetaDataList);
91                 logger.debug("getArchiveRepository count: {} ", repositories.size());
92                 ElkGetRepositoriesResponse elkRepositoriesResponse = new ElkGetRepositoriesResponse();
93                 elkRepositoriesResponse.setRepositories(repositories);
94                 return elkRepositoriesResponse;
95         }
96
97         public String createElkRepository(ElkRepositoriesRequest elkCreateRepositoriesRequest) throws Exception {
98                 logger.debug("Inside createElkRepository ");
99                 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getRepositoryName())) {
100                         return "false | RepositoryName empty is not allowed";
101                 }
102                 ElkGetRepositoriesResponse response = getAllElkRepository();
103                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = response.getRepositories();
104                 Set<String> repositoryNameSet = new HashSet<>();
105                 for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
106                         repositoryNameSet.add(eLkRepositoryMetaData.getName());
107                 }
108                 boolean acknowledged = false;
109                 if (repositoryNameSet.contains(elkCreateRepositoriesRequest.getRepositoryName())) {
110                         throw new ELKException("false | RepositoryName already exist");
111                 } else {
112                         acknowledged = createRepo(elkCreateRepositoriesRequest, "backup");
113                         createRepo(elkCreateRepositoriesRequest, ElkClientConstants.ARCHIVE_ES_DATA);
114
115                         logger.debug("Repository is created ", acknowledged);
116                 }
117
118                 return String.valueOf(acknowledged);
119         }
120
121         @Override
122         public String deleteElkRepository(ElkRepositoriesRequest elkDeleteRepositoriesRequest) {
123                 logger.debug("Inside deleteElkRepository");
124                 RestHighLevelClient client = restHighLevelClientConnection();
125                 DeleteRepositoryRequest deleteRepositoryRequest = new DeleteRepositoryRequest(
126                                 elkDeleteRepositoriesRequest.getRepositoryName());
127                 deleteRepositoryRequest.masterNodeTimeout(elkDeleteRepositoriesRequest.getNodeTimeout());
128
129                 AcknowledgedResponse deleteAcknowledgedResponse;
130                 try {
131                         deleteAcknowledgedResponse = client.snapshot().deleteRepository(deleteRepositoryRequest,
132                                         RequestOptions.DEFAULT);
133                         client.close();
134                 } catch (IOException ex) {
135                         logger.debug("Exception:", ex);
136                         throw new ErrorTransport("Unable to connect Elasticserach");
137                 }
138                 boolean deleteAcknowledged = deleteAcknowledgedResponse.isAcknowledged();
139                 logger.debug("Repository is delete(true for created)... {}", deleteAcknowledged);
140
141                 return String.valueOf(deleteAcknowledged);
142         }
143
144         @Override
145         public ElkArchiveResponse getArchiveElkRepository() throws Exception {
146                 logger.debug("Inside getArchiveElkRepository");
147                 String action = ElkClientConstants.INFO;
148                 Predicate<ElasticsearchSnapshotsResponse> chkSnapshots = obj -> (obj.getSnapshots() != null
149                                 && obj.getSnapshots().size() > 0);
150                 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> filterArchiveRepository = list -> {
151                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
152                         for (ELkRepositoryMetaData eLkRepositoryMetaData : list) {
153                                 if (chkRepositoryName.test(eLkRepositoryMetaData)) {
154                                         repositories.add(eLkRepositoryMetaData);
155                                 }
156                         }
157                         return repositories;
158                 };
159
160                 Function<List<ArchiveInfo>, List<ArchiveInfo>> removeEmptyArchiveRepository = list -> {
161                         List<ArchiveInfo> archiveInfoList = new ArrayList<>();
162                         for (ArchiveInfo archiveInfo : list) {
163                                 if (archiveInfo.getSnapshots() != null) {
164                                         archiveInfoList.add(archiveInfo);
165                                 }
166                         }
167                         return archiveInfoList;
168                 };
169
170                 Function<ELkRepositoryMetaData, String> repoName = obj -> {
171                         JSONObject settingObj = obj.getSettings();
172                         String location = (String) settingObj.get("location");
173                         String[] p = location.split("/");
174                         return p[1];
175                 };
176
177                 BiPredicate<String, String> compareRepoName = (str1, str2) -> {
178                         if (str1 != null && str2 != null) {
179                                 return str1.equalsIgnoreCase(str2);
180                         }
181                         return false;
182                 };
183
184                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
185                 List<ELkRepositoryMetaData> archiveRepositories = filterArchiveRepository.apply(elkRepositoryMetaDataList);
186                 logger.debug("repositories count: {} ", archiveRepositories.size());
187                 ElkArchiveResponse elkArchiveResponse = archiveOperation(null, action);
188                 logger.debug("elkArchiveResponse.getMsg() {} elkArchiveResponse.getStatus() {}", elkArchiveResponse.getMsg(),
189                                 elkArchiveResponse.getStatus());
190                 if (elkArchiveResponse != null && elkArchiveResponse.getArchiveInfo() != null) {
191                         logger.debug("elkArchiveResponse.getArchiveInfo() count: {} ", elkArchiveResponse.getArchiveInfo().size());
192                         for (ArchiveInfo archiveInfo : elkArchiveResponse.getArchiveInfo()) {
193                                 for (ELkRepositoryMetaData eLkRepositoryMetaData : archiveRepositories) {
194                                         ElasticsearchSnapshotsResponse elasticsearchSnapshotsResponse = snapshotServiceImpl
195                                                         .getElasticsearchSnapshotDetails(null, eLkRepositoryMetaData.getName());
196                                         String repoArchiveName = repoName.apply(eLkRepositoryMetaData);
197                                         logger.debug(
198                                                         "repoArchiveName: {} archiveInfo.getRepositoryName(): {}  elasticsearchSnapshotsResponse.getRepositoryName(): {} ",
199                                                         repoArchiveName, archiveInfo.getRepositoryName(),
200                                                         elasticsearchSnapshotsResponse.getRepositoryName());
201                                         if (chkSnapshots.test(elasticsearchSnapshotsResponse)) {
202                                                 if (compareRepoName.test(archiveInfo.getRepositoryName(), repoArchiveName)) {
203                                                         archiveInfo.setSnapshots(elasticsearchSnapshotsResponse.getSnapshots());
204                                                 }
205                                         }
206                                 }
207                         }
208                         List<ArchiveInfo> filteredArchiveInfoList = removeEmptyArchiveRepository
209                                         .apply(elkArchiveResponse.getArchiveInfo());
210                         elkArchiveResponse.setArchiveInfo(filteredArchiveInfoList);
211                 }
212                 return elkArchiveResponse;
213         }
214
215         @Override
216         public ElkArchiveResponse archiveElkRepository(ElkArchiveRequest archiveRequest) throws Exception {
217                 logger.debug("Inside archiveElkRepository");
218                 String action = archiveRequest.getAction();
219                 ElkArchiveResponse elkArchiveResponse = archiveOperation(archiveRequest, action);
220                 return elkArchiveResponse;
221         }
222
223         private ElkArchiveResponse archiveOperation(ElkArchiveRequest archiveRequest, String action) throws Exception {
224                 logger.debug("Inside archiveOperation action:{}", action);
225                 Function<String, ArchiveInfo> f = str -> {
226                         String[] p = str.split(",");
227                         ArchiveInfo archiveInfo1 = new ArchiveInfo();
228                         if (p[0] != null && p[0].length() > 0 && p[1] != null && p[1].length() > 0) {
229                                 archiveInfo1 = new ArchiveInfo(p[0], p[1]);
230                         }
231                         return archiveInfo1;
232                 };
233
234                 String result = null;
235                 String resultDelete = null;
236                 String[] archiveInfoArray;
237                 List<String> resultList = new ArrayList<>();
238                 if (action.equalsIgnoreCase(ElkClientConstants.INFO)) {
239                         try {
240                                 result = ElkServiceUtils.executeScript(action, "NA");
241                                 resultList.add(result.trim());
242                         } catch (Exception ex) {
243                                 logger.debug("Exception:", ex);
244                                 throw new Exception("Error occured elk archive operation");
245                         }
246                 } else {
247                         try {
248                                 for (String repoName : archiveRequest.getRepositoryName()) {
249                                         result = ElkServiceUtils.executeScript(action, repoName);
250                                         resultList.add(result.trim());
251
252                                         if (action != null && !action.isEmpty()
253                                                         && action.equalsIgnoreCase(ElkClientConstants.RESTORE_REQUEST)) {
254                                                 resultDelete = ElkServiceUtils.executeScript(ElkClientConstants.DELETE_REQUEST, repoName);
255                                         }
256
257                                         resultList.add(resultDelete.trim());
258                                 }
259                         } catch (Exception ex) {
260                                 logger.debug("Exception:", ex);
261                                 throw new Exception("Error occured elk archive operation");
262                         }
263                 }
264                 boolean chkCSV = result.contains(",");
265                 logger.debug("chkCSV:{}", chkCSV);
266                 ElkArchiveResponse elkArchiveResponse = new ElkArchiveResponse();
267                 List<ArchiveInfo> archiveInfoList = new ArrayList<ArchiveInfo>();
268                 if (chkCSV) {
269                         for (String resultOuput : resultList) {
270                                 archiveInfoArray = resultOuput.split("\n");
271                                 for (String archiveInfo : archiveInfoArray) {
272                                         archiveInfoList.add(f.apply(archiveInfo));
273                                 }
274                         }
275                         elkArchiveResponse.setMsg("Action:" + action + " done");
276                         elkArchiveResponse.setStatus(ElkClientConstants.SUCCESS);
277                         elkArchiveResponse.setArchiveInfo(archiveInfoList);
278                         logger.debug("archiveInfoList:" + archiveInfoList);
279                         if (action.equalsIgnoreCase("RESTORE")) {
280                                 for (ArchiveInfo archiveInfo : archiveInfoList) {
281                                         ElkRepositoriesRequest elkCreateRepositoriesRequest = new ElkRepositoriesRequest();
282                                         elkCreateRepositoriesRequest.setRepositoryName(archiveInfo.getRepositoryName());
283                                         elkCreateRepositoriesRequest.setNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
284                                         createElkRepository(elkCreateRepositoriesRequest);
285                                 }
286                         }
287
288                 } else {
289                         logger.debug("result:" + result);
290                         if (result.contains("\n")) {
291                                 result = result.replace("\n", "");
292                         }
293                         elkArchiveResponse.setStatus(ElkClientConstants.FAIL);
294                         elkArchiveResponse.setMsg(result.trim());
295                 }
296                 return elkArchiveResponse;
297         }
298
299         private List<ELkRepositoryMetaData> getAllRepository() {
300                 RestHighLevelClient client = restHighLevelClientConnection();
301                 GetRepositoriesRequest request1 = new GetRepositoriesRequest();
302                 request1.repositories();
303                 request1.local(true);
304
305                 try {
306                         GetRepositoriesResponse response = client.snapshot().getRepository(request1, RequestOptions.DEFAULT);
307                         List<RepositoryMetaData> repositoryMetaDataResponse = response.repositories();
308                         logger.debug("Number of repositoryMetaDataResponse size {}", repositoryMetaDataResponse.size());
309
310                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
311                         for (RepositoryMetaData snapshotStatusResponse : repositoryMetaDataResponse) {
312                                 logger.debug("\nNAME: {} \n TYPE: {} \n SETTINGS: {}", snapshotStatusResponse.name(),
313                                                 snapshotStatusResponse.type(), snapshotStatusResponse.settings());
314                                 ELkRepositoryMetaData repositoryMetaData = new ELkRepositoryMetaData();
315                                 repositoryMetaData.setName(snapshotStatusResponse.name());
316                                 repositoryMetaData.setType(snapshotStatusResponse.type());
317                                 JSONObject settings = new JSONObject();
318                                 for (String key : snapshotStatusResponse.settings().keySet()) {
319                                         settings.put(key, snapshotStatusResponse.settings().get(key));
320                                 }
321                                 repositoryMetaData.setSettings(settings);
322                                 repositories.add(repositoryMetaData);
323                         }
324                         client.close();
325                         return repositories;
326                 } catch (Exception ex) {
327                         logger.debug("Exception:", ex);
328                         throw new ErrorTransport("Unable to connect Elasticserach");
329                 }
330         }
331
332         private boolean createRepo(ElkRepositoriesRequest elkCreateRepositoriesRequest, String repoType) {
333                 RestHighLevelClient client = restHighLevelClientConnection();
334                 PutRepositoryRequest request = new PutRepositoryRequest();
335                 String locationKey = FsRepository.LOCATION_SETTING.getKey();
336                 String locationValue = elkCreateRepositoriesRequest.getRepositoryName().trim();
337                 String compressKey = FsRepository.COMPRESS_SETTING.getKey();
338                 boolean compressValue = true;
339                 if (repoType == ElkClientConstants.ARCHIVE_ES_DATA) {
340                         request.name(
341                                         ElkClientConstants.ARCHIVE_ES_DATA + "-" + elkCreateRepositoriesRequest.getRepositoryName().trim());
342                         locationValue = ElkClientConstants.ARCHIVE_ES_DATA + "/"
343                                         + elkCreateRepositoriesRequest.getRepositoryName().trim();
344                 } else {
345                         request.name(elkCreateRepositoriesRequest.getRepositoryName().trim());
346                 }
347                 Settings settings = Settings.builder().put(locationKey, locationValue).put(compressKey, compressValue).build();
348                 request.settings(settings);
349                 request.type(FsRepository.TYPE);
350                 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getNodeTimeout())) {
351                         request.masterNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
352                 } else {
353                         request.masterNodeTimeout(elkCreateRepositoriesRequest.getNodeTimeout());
354                 }
355                 request.verify(true);
356                 AcknowledgedResponse acknowledgedResponse;
357                 boolean acknowledged = false;
358                 try {
359                         acknowledgedResponse = client.snapshot().createRepository(request, RequestOptions.DEFAULT);
360                         acknowledged = acknowledgedResponse.isAcknowledged();
361                         client.close();
362                 } catch (IOException ex) {
363                         logger.debug("Exception:", ex);
364                         throw new ErrorTransport("Unable to connect Elasticserach");
365                 }
366                 return acknowledged;
367         }
368
369 }