Getting 503 error while Restoring Repository
[platform-oam.git] / elk-client / src / main / java / org / acumos / elk / client / service / SnapshotRepositoryServiceImpl.java
1 /*-
2  * ===============LICENSE_START=======================================================
3  * Acumos
4  * ===================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6  * ===================================================================================
7  * This Acumos software file is distributed by AT&T and Tech Mahindra
8  * under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *  
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  * This file is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ===============LICENSE_END=========================================================
19  */
20 package org.acumos.elk.client.service;
21
22 import java.io.IOException;
23 import java.lang.invoke.MethodHandles;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.function.BiPredicate;
29 import java.util.function.Function;
30 import java.util.function.Predicate;
31
32 import org.acumos.elk.client.transport.ArchiveInfo;
33 import org.acumos.elk.client.transport.ELkRepositoryMetaData;
34 import org.acumos.elk.client.transport.ElasticsearchSnapshotsResponse;
35 import org.acumos.elk.client.transport.ElkArchiveRequest;
36 import org.acumos.elk.client.transport.ElkArchiveResponse;
37 import org.acumos.elk.client.transport.ElkGetRepositoriesResponse;
38 import org.acumos.elk.client.transport.ElkGetSnapshotMetaData;
39 import org.acumos.elk.client.transport.ElkRepositoriesRequest;
40 import org.acumos.elk.client.transport.ElkSnapshotsResponse;
41 import org.acumos.elk.client.transport.ErrorTransport;
42 import org.acumos.elk.client.utils.ElkClientConstants;
43 import org.acumos.elk.client.utils.ElkServiceUtils;
44 import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
45 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
46 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
47 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
48 import org.elasticsearch.action.support.master.AcknowledgedResponse;
49 import org.elasticsearch.client.RequestOptions;
50 import org.elasticsearch.client.RestHighLevelClient;
51 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
52 import org.elasticsearch.common.settings.Settings;
53 import org.elasticsearch.repositories.fs.FsRepository;
54 import org.json.simple.JSONObject;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.stereotype.Service;
59 import org.springframework.util.StringUtils;
60
61 import com.acumos.elk.exception.ELKException;
62
63 /**
64  * Implementation of operation related to elastic stack repository.
65  *
66  */
67 @Service
68 public class SnapshotRepositoryServiceImpl extends AbstractELKClientConnection implements ISnapshotRepositoryService {
69
70         private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
71         private Predicate<ELkRepositoryMetaData> chkRepositoryName = obj -> (obj.getName() != null
72                         && obj.getName().startsWith(ElkClientConstants.ARCHIVE_ES_DATA, 0));
73
74         @Autowired
75         SnapshotServiceImpl snapshotServiceImpl;
76
77         @Override
78         public ElkGetRepositoriesResponse getAllElkRepository() {
79                 logger.debug("Inside getAllElkRepository ");
80                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
81                 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> getArchiveRepository = list -> {
82                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
83                         for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
84                                 if (!chkRepositoryName.test(eLkRepositoryMetaData)) {
85                                         repositories.add(eLkRepositoryMetaData);
86                                 }
87                         }
88                         return repositories;
89                 };
90                 List<ELkRepositoryMetaData> repositories = getArchiveRepository.apply(elkRepositoryMetaDataList);
91                 logger.debug("getArchiveRepository count: {} ", repositories.size());
92                 ElkGetRepositoriesResponse elkRepositoriesResponse = new ElkGetRepositoriesResponse();
93                 elkRepositoriesResponse.setRepositories(repositories);
94                 return elkRepositoriesResponse;
95         }
96
97         public String createElkRepository(ElkRepositoriesRequest elkCreateRepositoriesRequest) throws Exception {
98                 logger.debug("Inside createElkRepository ");
99                 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getRepositoryName())) {
100                         return "false | RepositoryName empty is not allowed";
101                 }
102                 ElkGetRepositoriesResponse response = getAllElkRepository();
103                 logger.debug("elkCreateRepositoriesRequest.getRepositoryName():{}" + elkCreateRepositoriesRequest.getRepositoryName());
104                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = response.getRepositories();
105                 Set<String> repositoryNameSet = new HashSet<>();
106                 for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
107                         repositoryNameSet.add(eLkRepositoryMetaData.getName());
108                         logger.debug("eLkRepositoryMetaData.getName():{}" + eLkRepositoryMetaData.getName());
109                 }
110                 boolean acknowledged = false;
111                 if (repositoryNameSet.contains(elkCreateRepositoriesRequest.getRepositoryName())) {
112                         throw new ELKException("false | RepositoryName already exist");
113                 } else {
114                         acknowledged = createRepo(elkCreateRepositoriesRequest, "backup");
115                         createRepo(elkCreateRepositoriesRequest, ElkClientConstants.ARCHIVE_ES_DATA);
116
117                         logger.debug("Repository is created ", acknowledged);
118                 }
119
120                 return String.valueOf(acknowledged);
121         }
122
123         @Override
124         public String deleteElkRepository(ElkRepositoriesRequest elkDeleteRepositoriesRequest) {
125                 logger.debug("Inside deleteElkRepository");
126                 RestHighLevelClient client = restHighLevelClientConnection();
127                 DeleteRepositoryRequest deleteRepositoryRequest = new DeleteRepositoryRequest(
128                                 elkDeleteRepositoriesRequest.getRepositoryName());
129                 deleteRepositoryRequest.masterNodeTimeout(elkDeleteRepositoriesRequest.getNodeTimeout());
130
131                 AcknowledgedResponse deleteAcknowledgedResponse;
132                 try {
133                         deleteAcknowledgedResponse = client.snapshot().deleteRepository(deleteRepositoryRequest,
134                                         RequestOptions.DEFAULT);
135                         client.close();
136                 } catch (IOException ex) {
137                         logger.debug("Exception:", ex);
138                         throw new ErrorTransport("Unable to connect Elasticserach");
139                 }
140                 boolean deleteAcknowledged = deleteAcknowledgedResponse.isAcknowledged();
141                 logger.debug("Repository is delete(true for created)... {}", deleteAcknowledged);
142
143                 return String.valueOf(deleteAcknowledged);
144         }
145
146         @Override
147         public ElkArchiveResponse getArchiveElkRepository() throws Exception {
148                 logger.debug("Inside getArchiveElkRepository");
149                 String action = ElkClientConstants.INFO;
150                 Predicate<ElasticsearchSnapshotsResponse> chkSnapshots = obj -> (obj.getSnapshots() != null
151                                 && obj.getSnapshots().size() > 0);
152                 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> filterArchiveRepository = list -> {
153                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
154                         for (ELkRepositoryMetaData eLkRepositoryMetaData : list) {
155                                 if (chkRepositoryName.test(eLkRepositoryMetaData)) {
156                                         repositories.add(eLkRepositoryMetaData);
157                                 }
158                         }
159                         return repositories;
160                 };
161
162                 Function<List<ArchiveInfo>, List<ArchiveInfo>> removeEmptyArchiveRepository = list -> {
163                         List<ArchiveInfo> archiveInfoList = new ArrayList<>();
164                         for (ArchiveInfo archiveInfo : list) {
165                                 if (archiveInfo.getSnapshots() != null) {
166                                         archiveInfoList.add(archiveInfo);
167                                 }
168                         }
169                         return archiveInfoList;
170                 };
171
172                 Function<ELkRepositoryMetaData, String> repoName = obj -> {
173                         JSONObject settingObj = obj.getSettings();
174                         String location = (String) settingObj.get("location");
175                         String[] p = location.split("/");
176                         return p[1];
177                 };
178
179                 BiPredicate<String, String> compareRepoName = (str1, str2) -> {
180                         if (str1 != null && str2 != null) {
181                                 return str1.equalsIgnoreCase(str2);
182                         }
183                         return false;
184                 };
185
186                 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
187                 List<ELkRepositoryMetaData> archiveRepositories = filterArchiveRepository.apply(elkRepositoryMetaDataList);
188                 logger.debug("repositories count: {} ", archiveRepositories.size());
189                 ElkArchiveResponse elkArchiveResponse = archiveOperation(null, action);
190                 logger.debug("elkArchiveResponse.getMsg() {} elkArchiveResponse.getStatus() {}", elkArchiveResponse.getMsg(),
191                                 elkArchiveResponse.getStatus());
192                 if (elkArchiveResponse != null && elkArchiveResponse.getArchiveInfo() != null) {
193                         logger.debug("elkArchiveResponse.getArchiveInfo() count: {} ", elkArchiveResponse.getArchiveInfo().size());
194                         for (ArchiveInfo archiveInfo : elkArchiveResponse.getArchiveInfo()) {
195                                 for (ELkRepositoryMetaData eLkRepositoryMetaData : archiveRepositories) {
196                                         ElasticsearchSnapshotsResponse elasticsearchSnapshotsResponse = snapshotServiceImpl
197                                                         .getElasticsearchSnapshotDetails(null, eLkRepositoryMetaData.getName());
198                                         String repoArchiveName = repoName.apply(eLkRepositoryMetaData);
199                                         logger.debug(
200                                                         "repoArchiveName: {} archiveInfo.getRepositoryName(): {}  elasticsearchSnapshotsResponse.getRepositoryName(): {} ",
201                                                         repoArchiveName, archiveInfo.getRepositoryName(),
202                                                         elasticsearchSnapshotsResponse.getRepositoryName());
203                                         if (chkSnapshots.test(elasticsearchSnapshotsResponse)) {
204                                                 if (compareRepoName.test(archiveInfo.getRepositoryName(), repoArchiveName)) {
205                                                         archiveInfo.setSnapshots(elasticsearchSnapshotsResponse.getSnapshots());
206                                                 }
207                                         }
208                                 }
209                         }
210                         List<ArchiveInfo> filteredArchiveInfoList = removeEmptyArchiveRepository
211                                         .apply(elkArchiveResponse.getArchiveInfo());
212                         elkArchiveResponse.setArchiveInfo(filteredArchiveInfoList);
213                 }
214                 return elkArchiveResponse;
215         }
216
217         @Override
218         public ElkArchiveResponse archiveElkRepository(ElkArchiveRequest archiveRequest) throws Exception {
219                 logger.debug("Inside archiveElkRepository");
220                 String action = archiveRequest.getAction();
221                 logger.debug("action: {} ", action);
222                 logger.debug("archiveRequest: {} ", archiveRequest);
223                 ElkArchiveResponse elkArchiveResponse = archiveOperation(archiveRequest, action);
224                 logger.debug("elkArchiveResponse: {} ", elkArchiveResponse);
225                 return elkArchiveResponse;
226         }
227
228         private ElkArchiveResponse archiveOperation(ElkArchiveRequest archiveRequest, String action) throws Exception {
229                 logger.debug("Inside archiveOperation action:{}", action);
230                 Function<String, ArchiveInfo> f = str -> {
231                         String[] p = str.split(",");
232                         ArchiveInfo archiveInfo1 = new ArchiveInfo();
233                         if (p[0] != null && p[0].length() > 0 && p[1] != null && p[1].length() > 0) {
234                                 archiveInfo1 = new ArchiveInfo(p[0], p[1]);
235                         }
236                         logger.debug("archiveInfo1:{}", archiveInfo1);
237                         return archiveInfo1;
238                 };
239
240                 String result = null;
241         
242                 String[] archiveInfoArray;
243                 List<String> resultList = new ArrayList<>();
244                 if (action.equalsIgnoreCase(ElkClientConstants.INFO)) {
245                         try {
246                                 result = ElkServiceUtils.executeScript(action, "NA");
247                         
248                                 logger.debug("result INFO: {} ", result);
249                                 logger.debug("resultList.size(): {}", resultList.size());
250                                 resultList.add(result.trim());
251                         } catch (Exception ex) {
252                                 logger.debug("Exception:", ex);
253                                 throw new Exception("Error occured elk archive operation");
254                         }
255                 } else {
256                         try {
257                                 for (String repoName : archiveRequest.getRepositoryName()) {
258                                         result = ElkServiceUtils.executeScript(action, repoName);
259                                         logger.debug("result : {}", result);
260                                         resultList.add(result.trim());
261                                         logger.debug("resultList.size(): {}", resultList.size());
262                                         if (action.equalsIgnoreCase("delete")) {
263                                                 ElkRepositoriesRequest elkDeleteRepositoriesRequest= new ElkRepositoriesRequest();
264                                                 elkDeleteRepositoriesRequest.setNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
265                                                 elkDeleteRepositoriesRequest.setRepositoryName(repoName);
266                                                 deleteElkRepository(elkDeleteRepositoriesRequest);
267                     }
268                                         
269                                 }
270                         } catch (Exception ex) {
271                                 logger.debug("Exception:", ex);
272                                 throw new Exception("Error occured elk archive operation");
273                         }
274                 }
275                 boolean chkCSV = result.contains(",");
276                 logger.debug("chkCSV:{}", chkCSV);
277                 ElkArchiveResponse elkArchiveResponse = new ElkArchiveResponse();
278                 List<ArchiveInfo> archiveInfoList = new ArrayList<ArchiveInfo>();
279                 if (chkCSV) {
280                         for (String resultOuput : resultList) {
281                                 archiveInfoArray = resultOuput.split("\n");
282                                 for (String archiveInfo : archiveInfoArray) {
283                                         archiveInfoList.add(f.apply(archiveInfo));
284                                 }
285                         }
286                         logger.debug("archiveInfoList.size(): {}", archiveInfoList.size());
287                         elkArchiveResponse.setMsg("Action:" + action + " done");
288                         elkArchiveResponse.setStatus(ElkClientConstants.SUCCESS);
289                         elkArchiveResponse.setArchiveInfo(archiveInfoList);
290                         logger.debug("archiveInfoList:{}" + archiveInfoList);
291                         if (action.equalsIgnoreCase("RESTORE")) {
292                                 for (ArchiveInfo archiveInfo : archiveInfoList) {
293                                         ElkRepositoriesRequest elkCreateRepositoriesRequest = new ElkRepositoriesRequest();
294                                         elkCreateRepositoriesRequest.setRepositoryName(archiveInfo.getRepositoryName());
295                                         logger.debug("archiveInfo.getRepositoryName():{}" + archiveInfo.getRepositoryName());
296                                         elkCreateRepositoriesRequest.setNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
297                                         logger.debug("elkCreateRepositoriesRequest:{}" + elkCreateRepositoriesRequest);
298                                         createElkRepository(elkCreateRepositoriesRequest);
299                                 }
300                         }
301
302                 } else {
303                         logger.debug("result:" + result);
304                         if (result.contains("\n")) {
305                                 result = result.replace("\n", "");
306                                 logger.debug("result :{}" + result);
307                         }
308                         elkArchiveResponse.setStatus(ElkClientConstants.FAIL);
309                         elkArchiveResponse.setMsg(result.trim());
310                         logger.debug("elkArchiveResponse:{}" + elkArchiveResponse);
311                 }
312                 return elkArchiveResponse;
313         }
314
315         private List<ELkRepositoryMetaData> getAllRepository() {
316                 RestHighLevelClient client = restHighLevelClientConnection();
317                 GetRepositoriesRequest request1 = new GetRepositoriesRequest();
318                 request1.repositories();
319                 request1.local(true);
320
321                 try {
322                         GetRepositoriesResponse response = client.snapshot().getRepository(request1, RequestOptions.DEFAULT);
323                         List<RepositoryMetaData> repositoryMetaDataResponse = response.repositories();
324                         logger.debug("Number of repositoryMetaDataResponse size {}", repositoryMetaDataResponse.size());
325
326                         List<ELkRepositoryMetaData> repositories = new ArrayList<>();
327                         for (RepositoryMetaData snapshotStatusResponse : repositoryMetaDataResponse) {
328                                 logger.debug("\nNAME: {} \n TYPE: {} \n SETTINGS: {}", snapshotStatusResponse.name(),
329                                                 snapshotStatusResponse.type(), snapshotStatusResponse.settings());
330                                 ELkRepositoryMetaData repositoryMetaData = new ELkRepositoryMetaData();
331                                 repositoryMetaData.setName(snapshotStatusResponse.name());
332                                 repositoryMetaData.setType(snapshotStatusResponse.type());
333                                 JSONObject settings = new JSONObject();
334                                 for (String key : snapshotStatusResponse.settings().keySet()) {
335                                         settings.put(key, snapshotStatusResponse.settings().get(key));
336                                 }
337                                 repositoryMetaData.setSettings(settings);
338                                 repositories.add(repositoryMetaData);
339                         }
340                         client.close();
341                         return repositories;
342                 } catch (Exception ex) {
343                         logger.debug("Exception:", ex);
344                         throw new ErrorTransport("Unable to connect Elasticserach");
345                 }
346         }
347
348         private boolean createRepo(ElkRepositoriesRequest elkCreateRepositoriesRequest, String repoType) {
349                 RestHighLevelClient client = restHighLevelClientConnection();
350                 PutRepositoryRequest request = new PutRepositoryRequest();
351                 String locationKey = FsRepository.LOCATION_SETTING.getKey();
352                 String locationValue = elkCreateRepositoriesRequest.getRepositoryName().trim();
353                 String compressKey = FsRepository.COMPRESS_SETTING.getKey();
354                 logger.debug("locationKey:{}" + locationKey);
355                 logger.debug("locationValue:{}" + locationValue);
356                 logger.debug("compressKey:{}" + compressKey);
357                 boolean compressValue = true;
358                 if (repoType == ElkClientConstants.ARCHIVE_ES_DATA) {
359                         request.name(
360                                         ElkClientConstants.ARCHIVE_ES_DATA + "-" + elkCreateRepositoriesRequest.getRepositoryName().trim());
361                         locationValue = ElkClientConstants.ARCHIVE_ES_DATA + "/"
362                                         + elkCreateRepositoriesRequest.getRepositoryName().trim();
363                 } else {
364                         request.name(elkCreateRepositoriesRequest.getRepositoryName().trim());
365                 }
366                 Settings settings = Settings.builder().put(locationKey, locationValue).put(compressKey, compressValue).build();
367                 logger.debug("settings.size():{}" + settings.size());
368                 logger.debug("settings.toString():{}" + settings.toString());
369                 request.settings(settings);
370                 request.type(FsRepository.TYPE);
371                 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getNodeTimeout())) {
372                         request.masterNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
373                 } else {
374                         logger.debug("elkCreateRepositoriesRequest.getNodeTimeout():{}" + elkCreateRepositoriesRequest.getNodeTimeout());
375                         request.masterNodeTimeout(elkCreateRepositoriesRequest.getNodeTimeout());
376                 }
377                 request.verify(true);
378                 AcknowledgedResponse acknowledgedResponse;
379                 boolean acknowledged = false;
380                 try {
381                         acknowledgedResponse = client.snapshot().createRepository(request, RequestOptions.DEFAULT);
382                         acknowledged = acknowledgedResponse.isAcknowledged();
383                         logger.debug("acknowledged:{}" + acknowledged);
384                         client.close();
385                 } catch (IOException ex) {
386                         logger.debug("Exception:", ex);
387                         throw new ErrorTransport("Unable to connect Elasticserach");
388                 }
389                 return acknowledged;
390         }
391
392 }