2 * ===============LICENSE_START=======================================================
4 * ===================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6 * ===================================================================================
7 * This Acumos software file is distributed by AT&T and Tech Mahindra
8 * under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * This file is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ===============LICENSE_END=========================================================
20 package org.acumos.elk.client.service;
22 import java.io.IOException;
23 import java.lang.invoke.MethodHandles;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.function.BiPredicate;
29 import java.util.function.Function;
30 import java.util.function.Predicate;
32 import org.acumos.elk.client.transport.ArchiveInfo;
33 import org.acumos.elk.client.transport.ELkRepositoryMetaData;
34 import org.acumos.elk.client.transport.ElasticsearchSnapshotsResponse;
35 import org.acumos.elk.client.transport.ElkArchiveRequest;
36 import org.acumos.elk.client.transport.ElkArchiveResponse;
37 import org.acumos.elk.client.transport.ElkGetRepositoriesResponse;
38 import org.acumos.elk.client.transport.ElkGetSnapshotMetaData;
39 import org.acumos.elk.client.transport.ElkRepositoriesRequest;
40 import org.acumos.elk.client.transport.ElkSnapshotsResponse;
41 import org.acumos.elk.client.transport.ErrorTransport;
42 import org.acumos.elk.client.utils.ElkClientConstants;
43 import org.acumos.elk.client.utils.ElkServiceUtils;
44 import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
45 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
46 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
47 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
48 import org.elasticsearch.action.support.master.AcknowledgedResponse;
49 import org.elasticsearch.client.RequestOptions;
50 import org.elasticsearch.client.RestHighLevelClient;
51 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
52 import org.elasticsearch.common.settings.Settings;
53 import org.elasticsearch.repositories.fs.FsRepository;
54 import org.json.simple.JSONObject;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.stereotype.Service;
59 import org.springframework.util.StringUtils;
61 import com.acumos.elk.exception.ELKException;
 * Implementation of the operations related to Elastic Stack repositories.
68 public class SnapshotRepositoryServiceImpl extends AbstractELKClientConnection implements ISnapshotRepositoryService {
70 private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
71 private Predicate<ELkRepositoryMetaData> chkRepositoryName = obj -> (obj.getName() != null
72 && obj.getName().startsWith(ElkClientConstants.ARCHIVE_ES_DATA, 0));
75 SnapshotServiceImpl snapshotServiceImpl;
78 public ElkGetRepositoriesResponse getAllElkRepository() {
79 logger.debug("Inside getAllElkRepository ");
80 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
81 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> getArchiveRepository = list -> {
82 List<ELkRepositoryMetaData> repositories = new ArrayList<>();
83 for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
84 if (!chkRepositoryName.test(eLkRepositoryMetaData)) {
85 repositories.add(eLkRepositoryMetaData);
90 List<ELkRepositoryMetaData> repositories = getArchiveRepository.apply(elkRepositoryMetaDataList);
91 logger.debug("getArchiveRepository count: {} ", repositories.size());
92 ElkGetRepositoriesResponse elkRepositoriesResponse = new ElkGetRepositoriesResponse();
93 elkRepositoriesResponse.setRepositories(repositories);
94 return elkRepositoriesResponse;
97 public String createElkRepository(ElkRepositoriesRequest elkCreateRepositoriesRequest) throws Exception {
98 logger.debug("Inside createElkRepository ");
99 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getRepositoryName())) {
100 return "false | RepositoryName empty is not allowed";
102 ElkGetRepositoriesResponse response = getAllElkRepository();
103 logger.debug("elkCreateRepositoriesRequest.getRepositoryName():{}" + elkCreateRepositoriesRequest.getRepositoryName());
104 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = response.getRepositories();
105 Set<String> repositoryNameSet = new HashSet<>();
106 for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
107 repositoryNameSet.add(eLkRepositoryMetaData.getName());
108 logger.debug("eLkRepositoryMetaData.getName():{}" + eLkRepositoryMetaData.getName());
110 boolean acknowledged = false;
111 if (repositoryNameSet.contains(elkCreateRepositoriesRequest.getRepositoryName())) {
112 throw new ELKException("false | RepositoryName already exist");
114 acknowledged = createRepo(elkCreateRepositoriesRequest, "backup");
115 createRepo(elkCreateRepositoriesRequest, ElkClientConstants.ARCHIVE_ES_DATA);
117 logger.debug("Repository is created ", acknowledged);
120 return String.valueOf(acknowledged);
124 public String deleteElkRepository(ElkRepositoriesRequest elkDeleteRepositoriesRequest) {
125 logger.debug("Inside deleteElkRepository");
126 RestHighLevelClient client = restHighLevelClientConnection();
127 DeleteRepositoryRequest deleteRepositoryRequest = new DeleteRepositoryRequest(
128 elkDeleteRepositoriesRequest.getRepositoryName());
129 deleteRepositoryRequest.masterNodeTimeout(elkDeleteRepositoriesRequest.getNodeTimeout());
131 AcknowledgedResponse deleteAcknowledgedResponse;
133 deleteAcknowledgedResponse = client.snapshot().deleteRepository(deleteRepositoryRequest,
134 RequestOptions.DEFAULT);
136 } catch (IOException ex) {
137 logger.debug("Exception:", ex);
138 throw new ErrorTransport("Unable to connect Elasticserach");
140 boolean deleteAcknowledged = deleteAcknowledgedResponse.isAcknowledged();
141 logger.debug("Repository is delete(true for created)... {}", deleteAcknowledged);
143 return String.valueOf(deleteAcknowledged);
147 public ElkArchiveResponse getArchiveElkRepository() throws Exception {
148 logger.debug("Inside getArchiveElkRepository");
149 String action = ElkClientConstants.INFO;
150 Predicate<ElasticsearchSnapshotsResponse> chkSnapshots = obj -> (obj.getSnapshots() != null
151 && obj.getSnapshots().size() > 0);
152 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> filterArchiveRepository = list -> {
153 List<ELkRepositoryMetaData> repositories = new ArrayList<>();
154 for (ELkRepositoryMetaData eLkRepositoryMetaData : list) {
155 if (chkRepositoryName.test(eLkRepositoryMetaData)) {
156 repositories.add(eLkRepositoryMetaData);
162 Function<List<ArchiveInfo>, List<ArchiveInfo>> removeEmptyArchiveRepository = list -> {
163 List<ArchiveInfo> archiveInfoList = new ArrayList<>();
164 for (ArchiveInfo archiveInfo : list) {
165 if (archiveInfo.getSnapshots() != null) {
166 archiveInfoList.add(archiveInfo);
169 return archiveInfoList;
172 Function<ELkRepositoryMetaData, String> repoName = obj -> {
173 JSONObject settingObj = obj.getSettings();
174 String location = (String) settingObj.get("location");
175 String[] p = location.split("/");
179 BiPredicate<String, String> compareRepoName = (str1, str2) -> {
180 if (str1 != null && str2 != null) {
181 return str1.equalsIgnoreCase(str2);
186 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
187 List<ELkRepositoryMetaData> archiveRepositories = filterArchiveRepository.apply(elkRepositoryMetaDataList);
188 logger.debug("repositories count: {} ", archiveRepositories.size());
189 ElkArchiveResponse elkArchiveResponse = archiveOperation(null, action);
190 logger.debug("elkArchiveResponse.getMsg() {} elkArchiveResponse.getStatus() {}", elkArchiveResponse.getMsg(),
191 elkArchiveResponse.getStatus());
192 if (elkArchiveResponse != null && elkArchiveResponse.getArchiveInfo() != null) {
193 logger.debug("elkArchiveResponse.getArchiveInfo() count: {} ", elkArchiveResponse.getArchiveInfo().size());
194 for (ArchiveInfo archiveInfo : elkArchiveResponse.getArchiveInfo()) {
195 for (ELkRepositoryMetaData eLkRepositoryMetaData : archiveRepositories) {
196 ElasticsearchSnapshotsResponse elasticsearchSnapshotsResponse = snapshotServiceImpl
197 .getElasticsearchSnapshotDetails(null, eLkRepositoryMetaData.getName());
198 String repoArchiveName = repoName.apply(eLkRepositoryMetaData);
200 "repoArchiveName: {} archiveInfo.getRepositoryName(): {} elasticsearchSnapshotsResponse.getRepositoryName(): {} ",
201 repoArchiveName, archiveInfo.getRepositoryName(),
202 elasticsearchSnapshotsResponse.getRepositoryName());
203 if (chkSnapshots.test(elasticsearchSnapshotsResponse)) {
204 if (compareRepoName.test(archiveInfo.getRepositoryName(), repoArchiveName)) {
205 archiveInfo.setSnapshots(elasticsearchSnapshotsResponse.getSnapshots());
210 List<ArchiveInfo> filteredArchiveInfoList = removeEmptyArchiveRepository
211 .apply(elkArchiveResponse.getArchiveInfo());
212 elkArchiveResponse.setArchiveInfo(filteredArchiveInfoList);
214 return elkArchiveResponse;
218 public ElkArchiveResponse archiveElkRepository(ElkArchiveRequest archiveRequest) throws Exception {
219 logger.debug("Inside archiveElkRepository");
220 String action = archiveRequest.getAction();
221 logger.debug("action: {} ", action);
222 logger.debug("archiveRequest: {} ", archiveRequest);
223 ElkArchiveResponse elkArchiveResponse = archiveOperation(archiveRequest, action);
224 logger.debug("elkArchiveResponse: {} ", elkArchiveResponse);
225 return elkArchiveResponse;
228 private ElkArchiveResponse archiveOperation(ElkArchiveRequest archiveRequest, String action) throws Exception {
229 logger.debug("Inside archiveOperation action:{}", action);
230 Function<String, ArchiveInfo> f = str -> {
231 String[] p = str.split(",");
232 ArchiveInfo archiveInfo1 = new ArchiveInfo();
233 if (p[0] != null && p[0].length() > 0 && p[1] != null && p[1].length() > 0) {
234 archiveInfo1 = new ArchiveInfo(p[0], p[1]);
236 logger.debug("archiveInfo1:{}", archiveInfo1);
240 String result = null;
242 String[] archiveInfoArray;
243 List<String> resultList = new ArrayList<>();
244 if (action.equalsIgnoreCase(ElkClientConstants.INFO)) {
246 result = ElkServiceUtils.executeScript(action, "NA");
248 logger.debug("result INFO: {} ", result);
249 logger.debug("resultList.size(): {}", resultList.size());
250 resultList.add(result.trim());
251 } catch (Exception ex) {
252 logger.debug("Exception:", ex);
253 throw new Exception("Error occured elk archive operation");
257 for (String repoName : archiveRequest.getRepositoryName()) {
258 result = ElkServiceUtils.executeScript(action, repoName);
259 logger.debug("result : {}", result);
260 resultList.add(result.trim());
261 logger.debug("resultList.size(): {}", resultList.size());
265 } catch (Exception ex) {
266 logger.debug("Exception:", ex);
267 throw new Exception("Error occured elk archive operation");
270 boolean chkCSV = result.contains(",");
271 logger.debug("chkCSV:{}", chkCSV);
272 ElkArchiveResponse elkArchiveResponse = new ElkArchiveResponse();
273 List<ArchiveInfo> archiveInfoList = new ArrayList<ArchiveInfo>();
275 for (String resultOuput : resultList) {
276 archiveInfoArray = resultOuput.split("\n");
277 for (String archiveInfo : archiveInfoArray) {
278 archiveInfoList.add(f.apply(archiveInfo));
281 logger.debug("archiveInfoList.size(): {}", archiveInfoList.size());
282 elkArchiveResponse.setMsg("Action:" + action + " done");
283 elkArchiveResponse.setStatus(ElkClientConstants.SUCCESS);
284 elkArchiveResponse.setArchiveInfo(archiveInfoList);
285 logger.debug("archiveInfoList:{}" + archiveInfoList);
286 if (action.equalsIgnoreCase("RESTORE")) {
287 for (ArchiveInfo archiveInfo : archiveInfoList) {
288 ElkRepositoriesRequest elkCreateRepositoriesRequest = new ElkRepositoriesRequest();
289 elkCreateRepositoriesRequest.setRepositoryName(archiveInfo.getRepositoryName());
290 logger.debug("archiveInfo.getRepositoryName():{}" + archiveInfo.getRepositoryName());
291 elkCreateRepositoriesRequest.setNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
292 logger.debug("elkCreateRepositoriesRequest:{}" + elkCreateRepositoriesRequest);
293 createElkRepository(elkCreateRepositoriesRequest);
298 logger.debug("result:" + result);
299 if (result.contains("\n")) {
300 result = result.replace("\n", "");
301 logger.debug("result :{}" + result);
303 elkArchiveResponse.setStatus(ElkClientConstants.FAIL);
304 elkArchiveResponse.setMsg(result.trim());
305 logger.debug("elkArchiveResponse:{}" + elkArchiveResponse);
307 return elkArchiveResponse;
310 private List<ELkRepositoryMetaData> getAllRepository() {
311 RestHighLevelClient client = restHighLevelClientConnection();
312 GetRepositoriesRequest request1 = new GetRepositoriesRequest();
313 request1.repositories();
314 request1.local(true);
317 GetRepositoriesResponse response = client.snapshot().getRepository(request1, RequestOptions.DEFAULT);
318 List<RepositoryMetaData> repositoryMetaDataResponse = response.repositories();
319 logger.debug("Number of repositoryMetaDataResponse size {}", repositoryMetaDataResponse.size());
321 List<ELkRepositoryMetaData> repositories = new ArrayList<>();
322 for (RepositoryMetaData snapshotStatusResponse : repositoryMetaDataResponse) {
323 logger.debug("\nNAME: {} \n TYPE: {} \n SETTINGS: {}", snapshotStatusResponse.name(),
324 snapshotStatusResponse.type(), snapshotStatusResponse.settings());
325 ELkRepositoryMetaData repositoryMetaData = new ELkRepositoryMetaData();
326 repositoryMetaData.setName(snapshotStatusResponse.name());
327 repositoryMetaData.setType(snapshotStatusResponse.type());
328 JSONObject settings = new JSONObject();
329 for (String key : snapshotStatusResponse.settings().keySet()) {
330 settings.put(key, snapshotStatusResponse.settings().get(key));
332 repositoryMetaData.setSettings(settings);
333 repositories.add(repositoryMetaData);
337 } catch (Exception ex) {
338 logger.debug("Exception:", ex);
339 throw new ErrorTransport("Unable to connect Elasticserach");
343 private boolean createRepo(ElkRepositoriesRequest elkCreateRepositoriesRequest, String repoType) {
344 RestHighLevelClient client = restHighLevelClientConnection();
345 PutRepositoryRequest request = new PutRepositoryRequest();
346 String locationKey = FsRepository.LOCATION_SETTING.getKey();
347 String locationValue = elkCreateRepositoriesRequest.getRepositoryName().trim();
348 String compressKey = FsRepository.COMPRESS_SETTING.getKey();
349 logger.debug("locationKey:{}" + locationKey);
350 logger.debug("locationValue:{}" + locationValue);
351 logger.debug("compressKey:{}" + compressKey);
352 boolean compressValue = true;
353 if (repoType == ElkClientConstants.ARCHIVE_ES_DATA) {
355 ElkClientConstants.ARCHIVE_ES_DATA + "-" + elkCreateRepositoriesRequest.getRepositoryName().trim());
356 locationValue = ElkClientConstants.ARCHIVE_ES_DATA + "/"
357 + elkCreateRepositoriesRequest.getRepositoryName().trim();
359 request.name(elkCreateRepositoriesRequest.getRepositoryName().trim());
361 Settings settings = Settings.builder().put(locationKey, locationValue).put(compressKey, compressValue).build();
362 logger.debug("settings.size():{}" + settings.size());
363 logger.debug("settings.toString():{}" + settings.toString());
364 request.settings(settings);
365 request.type(FsRepository.TYPE);
366 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getNodeTimeout())) {
367 request.masterNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
369 logger.debug("elkCreateRepositoriesRequest.getNodeTimeout():{}" + elkCreateRepositoriesRequest.getNodeTimeout());
370 request.masterNodeTimeout(elkCreateRepositoriesRequest.getNodeTimeout());
372 request.verify(true);
373 AcknowledgedResponse acknowledgedResponse;
374 boolean acknowledged = false;
376 acknowledgedResponse = client.snapshot().createRepository(request, RequestOptions.DEFAULT);
377 acknowledged = acknowledgedResponse.isAcknowledged();
378 logger.debug("acknowledged:{}" + acknowledged);
380 } catch (IOException ex) {
381 logger.debug("Exception:", ex);
382 throw new ErrorTransport("Unable to connect Elasticserach");