18df590cc779312a6f253aae23dffe7d892ff329
[platform-oam.git] / elk-client / src / main / java / org / acumos / elk / client / service / SnapshotRepositoryServiceImpl.java
1 /*-
2  * ===============LICENSE_START=======================================================
3  * Acumos
4  * ===================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6  * ===================================================================================
7  * This Acumos software file is distributed by AT&T and Tech Mahindra
8  * under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *  
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  * This file is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ===============LICENSE_END=========================================================
19  */
20 package org.acumos.elk.client.service;
21
22 import java.io.IOException;
23 import java.lang.invoke.MethodHandles;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.function.BiPredicate;
29 import java.util.function.Function;
30 import java.util.function.Predicate;
31
32 import org.acumos.elk.client.transport.ArchiveInfo;
33 import org.acumos.elk.client.transport.ELkRepositoryMetaData;
34 import org.acumos.elk.client.transport.ElasticsearchSnapshotsResponse;
35 import org.acumos.elk.client.transport.ElkArchiveRequest;
36 import org.acumos.elk.client.transport.ElkArchiveResponse;
37 import org.acumos.elk.client.transport.ElkGetRepositoriesResponse;
38 import org.acumos.elk.client.transport.ElkGetSnapshotMetaData;
39 import org.acumos.elk.client.transport.ElkRepositoriesRequest;
40 import org.acumos.elk.client.transport.ElkSnapshotsResponse;
41 import org.acumos.elk.client.transport.ErrorTransport;
42 import org.acumos.elk.client.utils.ElkClientConstants;
43 import org.acumos.elk.client.utils.ElkServiceUtils;
44 import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
45 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
46 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
47 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
48 import org.elasticsearch.action.support.master.AcknowledgedResponse;
49 import org.elasticsearch.client.RequestOptions;
50 import org.elasticsearch.client.RestHighLevelClient;
51 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
52 import org.elasticsearch.common.settings.Settings;
53 import org.elasticsearch.repositories.fs.FsRepository;
54 import org.json.simple.JSONObject;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.stereotype.Service;
59 import org.springframework.util.StringUtils;
60
61 import com.acumos.elk.exception.ELKException;
62
63 /**
64  * Implementation of operation related to elastic stack repository.
65  *
66  */
67 @Service
68 public class SnapshotRepositoryServiceImpl extends AbstractELKClientConnection implements ISnapshotRepositoryService {
69
70         private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
71         private Predicate<ELkRepositoryMetaData> chkRepositoryName = obj -> (obj.getName() != null
72                         && obj.getName().startsWith(ElkClientConstants.ARCHIVE_ES_DATA, 0));
73
74         @Autowired
75         SnapshotServiceImpl snapshotServiceImpl;
76
77         @Override
78         public ElkGetRepositoriesResponse getAllElkRepository() {
79                 logger.debug("Inside getAllElkRepository ");
80                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
81                 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> getArchiveRepository = list -> {
82                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
83                         for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
84                                 if (!chkRepositoryName.test(eLkRepositoryMetaData)) {
85                                         repositories.add(eLkRepositoryMetaData);
86                                 }
87                         }
88                         return repositories;
89                 };
90                 List<ELkRepositoryMetaData> repositories = getArchiveRepository.apply(elkRepositoryMetaDataList);
91                 logger.debug("getArchiveRepository count: {} ", repositories.size());
92                 ElkGetRepositoriesResponse elkRepositoriesResponse = new ElkGetRepositoriesResponse();
93                 elkRepositoriesResponse.setRepositories(repositories);
94                 return elkRepositoriesResponse;
95         }
96
97         public String createElkRepository(ElkRepositoriesRequest elkCreateRepositoriesRequest) throws Exception {
98                 logger.debug("Inside createElkRepository ");
99                 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getRepositoryName())) {
100                         return "false | RepositoryName empty is not allowed";
101                 }
102                 ElkGetRepositoriesResponse response = getAllElkRepository();
103                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = response.getRepositories();
104                 Set<String> repositoryNameSet = new HashSet<>();
105                 for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
106                         repositoryNameSet.add(eLkRepositoryMetaData.getName());
107                 }
108                 boolean acknowledged = false;
109                 if (repositoryNameSet.contains(elkCreateRepositoriesRequest.getRepositoryName())) {
110                         throw new ELKException("false | RepositoryName already exist");
111                 } else {
112                         acknowledged = createRepo(elkCreateRepositoriesRequest, "backup");
113                         createRepo(elkCreateRepositoriesRequest, ElkClientConstants.ARCHIVE_ES_DATA);
114
115                         logger.debug("Repository is created ", acknowledged);
116                 }               
117                 return String.valueOf(acknowledged);
118         }
119
120         @Override
121         public String deleteElkRepository(ElkRepositoriesRequest elkDeleteRepositoriesRequest) {
122                 logger.debug("Inside deleteElkRepository");
123                 RestHighLevelClient client = restHighLevelClientConnection();
124                 DeleteRepositoryRequest deleteRepositoryRequest = new DeleteRepositoryRequest(
125                                 elkDeleteRepositoriesRequest.getRepositoryName());
126                 deleteRepositoryRequest.masterNodeTimeout(elkDeleteRepositoriesRequest.getNodeTimeout());
127
128                 AcknowledgedResponse deleteAcknowledgedResponse;
129                 try {
130                         deleteAcknowledgedResponse = client.snapshot().deleteRepository(deleteRepositoryRequest,
131                                         RequestOptions.DEFAULT);
132                         client.close();
133                 } catch (IOException ex) {
134                         logger.debug("Exception:", ex);
135                         throw new ErrorTransport("Unable to connect Elasticserach");
136                 }
137                 boolean deleteAcknowledged = deleteAcknowledgedResponse.isAcknowledged();
138                 logger.debug("Repository is delete(true for created)... {}", deleteAcknowledged);
139
140                 return String.valueOf(deleteAcknowledged);
141         }
142
143         @Override
144         public ElkArchiveResponse getArchiveElkRepository() throws Exception {
145                 logger.debug("Inside getArchiveElkRepository");
146                 String action = ElkClientConstants.INFO;
147                 Predicate<ElasticsearchSnapshotsResponse> chkSnapshots = obj -> (obj.getSnapshots() != null
148                                 && obj.getSnapshots().size() > 0);
149                 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> filterArchiveRepository = list -> {
150                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
151                         for (ELkRepositoryMetaData eLkRepositoryMetaData : list) {
152                                 if (chkRepositoryName.test(eLkRepositoryMetaData)) {
153                                         repositories.add(eLkRepositoryMetaData);
154                                 }
155                         }
156                         return repositories;
157                 };
158
159                 Function<List<ArchiveInfo>, List<ArchiveInfo>> removeEmptyArchiveRepository = list -> {
160                         List<ArchiveInfo> archiveInfoList = new ArrayList<>();
161                         for (ArchiveInfo archiveInfo : list) {
162                                 if (archiveInfo.getSnapshots() != null) {
163                                         archiveInfoList.add(archiveInfo);
164                                 }
165                         }
166                         return archiveInfoList;
167                 };
168
169                 Function<ELkRepositoryMetaData, String> repoName = obj -> {
170                         JSONObject settingObj = obj.getSettings();
171                         String location = (String) settingObj.get("location");
172                         String[] p = location.split("/");
173                         return p[1];
174                 };
175
176                 BiPredicate<String, String> compareRepoName = (str1, str2) -> {
177                         if (str1 != null && str2 != null) {
178                                 return str1.equalsIgnoreCase(str2);
179                         }
180                         return false;
181                 };
182
183                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
184                 List<ELkRepositoryMetaData> archiveRepositories = filterArchiveRepository.apply(elkRepositoryMetaDataList);
185                 logger.debug("repositories count: {} ", archiveRepositories.size());
186                 ElkArchiveResponse elkArchiveResponse = archiveOperation(null, action);
187                 logger.debug("elkArchiveResponse.getMsg() {} elkArchiveResponse.getStatus() {}", elkArchiveResponse.getMsg(),
188                                 elkArchiveResponse.getStatus());
189                 if (elkArchiveResponse != null && elkArchiveResponse.getArchiveInfo() != null) {
190                         logger.debug("elkArchiveResponse.getArchiveInfo() count: {} ", elkArchiveResponse.getArchiveInfo().size());
191                         for (ArchiveInfo archiveInfo : elkArchiveResponse.getArchiveInfo()) {
192                                 for (ELkRepositoryMetaData eLkRepositoryMetaData : archiveRepositories) {
193                                         ElasticsearchSnapshotsResponse elasticsearchSnapshotsResponse = snapshotServiceImpl
194                                                         .getElasticsearchSnapshotDetails(null, eLkRepositoryMetaData.getName());
195                                         String repoArchiveName = repoName.apply(eLkRepositoryMetaData);
196                                         logger.debug(
197                                                         "repoArchiveName: {} archiveInfo.getRepositoryName(): {}  elasticsearchSnapshotsResponse.getRepositoryName(): {} ",
198                                                         repoArchiveName, archiveInfo.getRepositoryName(),
199                                                         elasticsearchSnapshotsResponse.getRepositoryName());
200                                         if (chkSnapshots.test(elasticsearchSnapshotsResponse)) {
201                                                 if (compareRepoName.test(archiveInfo.getRepositoryName(), repoArchiveName)) {
202                                                         archiveInfo.setSnapshots(elasticsearchSnapshotsResponse.getSnapshots());
203                                                 }
204                                         }
205                                 }
206                         }
207                         List<ArchiveInfo> filteredArchiveInfoList = removeEmptyArchiveRepository
208                                         .apply(elkArchiveResponse.getArchiveInfo());
209                         elkArchiveResponse.setArchiveInfo(filteredArchiveInfoList);
210                 }
211                 return elkArchiveResponse;
212         }
213
214         @Override
215         public ElkArchiveResponse archiveElkRepository(ElkArchiveRequest archiveRequest) throws Exception {
216                 logger.debug("Inside archiveElkRepository");
217                 String action = archiveRequest.getAction();
218                 ElkArchiveResponse elkArchiveResponse = archiveOperation(archiveRequest, action);
219                 return elkArchiveResponse;
220         }
221
222         private ElkArchiveResponse archiveOperation(ElkArchiveRequest archiveRequest, String action) throws Exception {
223                 logger.debug("Inside archiveOperation action:{}", action);
224                 Function<String, ArchiveInfo> f = str -> {
225                         String[] p = str.split(",");
226                         ArchiveInfo archiveInfo1 = new ArchiveInfo();
227                         if (p[0] != null && p[0].length() > 0 && p[1] != null && p[1].length() > 0) {
228                                 archiveInfo1 = new ArchiveInfo(p[0], p[1]);
229                         }
230                         return archiveInfo1;
231                 };
232
233                 String result = null;
234                 String[] archiveInfoArray;
235                 List<String> resultList = new ArrayList<>();
236                 if (action.equalsIgnoreCase(ElkClientConstants.INFO)) {
237                         try {
238                                 result = ElkServiceUtils.executeScript(action, "NA");
239                                 resultList.add(result.trim());
240                         } catch (Exception ex) {
241                                 logger.debug("Exception:", ex);
242                                 throw new Exception("Error occured elk archive operation");
243                         }
244                 } else {
245                         try {
246                                 for (String repoName : archiveRequest.getRepositoryName()) {
247                                         result = ElkServiceUtils.executeScript(action, repoName);
248                                         resultList.add(result.trim());
249                                         
250                                 }
251                         } catch (Exception ex) {
252                                 logger.debug("Exception:", ex);
253                                 throw new Exception("Error occured elk archive operation");
254                         }
255                 }
256                 boolean chkCSV = result.contains(",");
257                 logger.debug("chkCSV:{}", chkCSV);
258                 ElkArchiveResponse elkArchiveResponse = new ElkArchiveResponse();
259                 List<ArchiveInfo> archiveInfoList = new ArrayList<ArchiveInfo>();
260                 if (chkCSV) {
261                         for (String resultOuput : resultList) {
262                                 archiveInfoArray = resultOuput.split("\n");
263                                 for (String archiveInfo : archiveInfoArray) {
264                                         archiveInfoList.add(f.apply(archiveInfo));
265                                 }
266                         }
267                         elkArchiveResponse.setMsg("Action:" + action + " done");
268                         elkArchiveResponse.setStatus(ElkClientConstants.SUCCESS);
269                         elkArchiveResponse.setArchiveInfo(archiveInfoList);
270                         logger.debug("archiveInfoList:" + archiveInfoList);
271                         if (action.equalsIgnoreCase("RESTORE")) {
272                                 for (ArchiveInfo archiveInfo : archiveInfoList) {
273                                         ElkRepositoriesRequest elkCreateRepositoriesRequest = new ElkRepositoriesRequest();
274                                         elkCreateRepositoriesRequest.setRepositoryName(archiveInfo.getRepositoryName());
275                                         elkCreateRepositoriesRequest.setNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
276                                         createElkRepository(elkCreateRepositoriesRequest);
277                                 }
278                         }
279
280                 } else {
281                         logger.debug("result:" + result);
282                         if (result.contains("\n")) {
283                                 result = result.replace("\n", "");
284                         }
285                         elkArchiveResponse.setStatus(ElkClientConstants.FAIL);
286                         elkArchiveResponse.setMsg(result.trim());
287                 }
288                 return elkArchiveResponse;
289         }
290
291         private List<ELkRepositoryMetaData> getAllRepository() {
292                 RestHighLevelClient client = restHighLevelClientConnection();
293                 GetRepositoriesRequest request1 = new GetRepositoriesRequest();
294                 request1.repositories();
295                 request1.local(true);
296
297                 try {
298                         GetRepositoriesResponse response = client.snapshot().getRepository(request1, RequestOptions.DEFAULT);
299                         List<RepositoryMetaData> repositoryMetaDataResponse = response.repositories();
300                         logger.debug("Number of repositoryMetaDataResponse size {}", repositoryMetaDataResponse.size());
301
302                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
303                         for (RepositoryMetaData snapshotStatusResponse : repositoryMetaDataResponse) {
304                                 logger.debug("\nNAME: {} \n TYPE: {} \n SETTINGS: {}", snapshotStatusResponse.name(),
305                                                 snapshotStatusResponse.type(), snapshotStatusResponse.settings());
306                                 ELkRepositoryMetaData repositoryMetaData = new ELkRepositoryMetaData();
307                                 repositoryMetaData.setName(snapshotStatusResponse.name());
308                                 repositoryMetaData.setType(snapshotStatusResponse.type());
309                                 JSONObject settings = new JSONObject();
310                                 for (String key : snapshotStatusResponse.settings().keySet()) {
311                                         settings.put(key, snapshotStatusResponse.settings().get(key));
312                                 }
313                                 repositoryMetaData.setSettings(settings);
314                                 repositories.add(repositoryMetaData);
315                         }
316                         client.close();
317                         return repositories;
318                 } catch (Exception ex) {
319                         logger.debug("Exception:", ex);
320                         throw new ErrorTransport("Unable to connect Elasticserach");
321                 }
322         }
323
324         private boolean createRepo(ElkRepositoriesRequest elkCreateRepositoriesRequest, String repoType) {
325                 RestHighLevelClient client = restHighLevelClientConnection();
326                 PutRepositoryRequest request = new PutRepositoryRequest();
327                 String locationKey = FsRepository.LOCATION_SETTING.getKey();
328                 String locationValue = elkCreateRepositoriesRequest.getRepositoryName().trim();
329                 String compressKey = FsRepository.COMPRESS_SETTING.getKey();
330                 boolean compressValue = true;
331                 if (repoType == ElkClientConstants.ARCHIVE_ES_DATA) {
332                         request.name(
333                                         ElkClientConstants.ARCHIVE_ES_DATA + "-" + elkCreateRepositoriesRequest.getRepositoryName().trim());
334                         locationValue = ElkClientConstants.ARCHIVE_ES_DATA + "/"
335                                         + elkCreateRepositoriesRequest.getRepositoryName().trim();
336                 } else {
337                         request.name(elkCreateRepositoriesRequest.getRepositoryName().trim());
338                 }
339                 Settings settings = Settings.builder().put(locationKey, locationValue).put(compressKey, compressValue).build();
340                 request.settings(settings);
341                 request.type(FsRepository.TYPE);
342                 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getNodeTimeout())) {
343                         request.masterNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
344                 } else {
345                         request.masterNodeTimeout(elkCreateRepositoriesRequest.getNodeTimeout());
346                 }
347                 request.verify(true);
348                 AcknowledgedResponse acknowledgedResponse;
349                 boolean acknowledged = false;
350                 try {
351                         acknowledgedResponse = client.snapshot().createRepository(request, RequestOptions.DEFAULT);
352                         acknowledged = acknowledgedResponse.isAcknowledged();
353                         client.close();
354                 } catch (IOException ex) {
355                         logger.debug("Exception:", ex);
356                         throw new ErrorTransport("Unable to connect Elasticserach");
357                 }
358                 return acknowledged;
359         }
360
361 }