2 * ===============LICENSE_START=======================================================
4 * ===================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
6 * ===================================================================================
7 * This Acumos software file is distributed by AT&T and Tech Mahindra
8 * under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * This file is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ===============LICENSE_END=========================================================
20 package org.acumos.elk.client.service;
22 import java.io.IOException;
23 import java.lang.invoke.MethodHandles;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.function.BiPredicate;
29 import java.util.function.Function;
30 import java.util.function.Predicate;
32 import org.acumos.elk.client.transport.ArchiveInfo;
33 import org.acumos.elk.client.transport.ELkRepositoryMetaData;
34 import org.acumos.elk.client.transport.ElasticsearchSnapshotsResponse;
35 import org.acumos.elk.client.transport.ElkArchiveRequest;
36 import org.acumos.elk.client.transport.ElkArchiveResponse;
37 import org.acumos.elk.client.transport.ElkGetRepositoriesResponse;
38 import org.acumos.elk.client.transport.ElkGetSnapshotMetaData;
39 import org.acumos.elk.client.transport.ElkRepositoriesRequest;
40 import org.acumos.elk.client.transport.ElkSnapshotsResponse;
41 import org.acumos.elk.client.transport.ErrorTransport;
42 import org.acumos.elk.client.utils.ElkClientConstants;
43 import org.acumos.elk.client.utils.ElkServiceUtils;
44 import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
45 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
46 import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
47 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
48 import org.elasticsearch.action.support.master.AcknowledgedResponse;
49 import org.elasticsearch.client.RequestOptions;
50 import org.elasticsearch.client.RestHighLevelClient;
51 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
52 import org.elasticsearch.common.settings.Settings;
53 import org.elasticsearch.repositories.fs.FsRepository;
54 import org.json.simple.JSONObject;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.stereotype.Service;
59 import org.springframework.util.StringUtils;
61 import com.acumos.elk.exception.ELKException;
64 * Implementation of operation related to elastic stack repository.
68 public class SnapshotRepositoryServiceImpl extends AbstractELKClientConnection implements ISnapshotRepositoryService {
70 private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
71 private Predicate<ELkRepositoryMetaData> chkRepositoryName = obj -> (obj.getName() != null
72 && obj.getName().startsWith(ElkClientConstants.ARCHIVE_ES_DATA, 0));
75 SnapshotServiceImpl snapshotServiceImpl;
78 public ElkGetRepositoriesResponse getAllElkRepository() {
79 logger.debug("Inside getAllElkRepository ");
80 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
81 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> getArchiveRepository = list -> {
82 List<ELkRepositoryMetaData> repositories = new ArrayList<>();
83 for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
84 if (!chkRepositoryName.test(eLkRepositoryMetaData)) {
85 repositories.add(eLkRepositoryMetaData);
90 List<ELkRepositoryMetaData> repositories = getArchiveRepository.apply(elkRepositoryMetaDataList);
91 logger.debug("getArchiveRepository count: {} ", repositories.size());
92 ElkGetRepositoriesResponse elkRepositoriesResponse = new ElkGetRepositoriesResponse();
93 elkRepositoriesResponse.setRepositories(repositories);
94 return elkRepositoriesResponse;
97 public String createElkRepository(ElkRepositoriesRequest elkCreateRepositoriesRequest) throws Exception {
98 logger.debug("Inside createElkRepository ");
99 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getRepositoryName())) {
100 return "false | RepositoryName empty is not allowed";
102 ElkGetRepositoriesResponse response = getAllElkRepository();
103 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = response.getRepositories();
104 Set<String> repositoryNameSet = new HashSet<>();
105 for (ELkRepositoryMetaData eLkRepositoryMetaData : elkRepositoryMetaDataList) {
106 repositoryNameSet.add(eLkRepositoryMetaData.getName());
108 boolean acknowledged = false;
109 if (repositoryNameSet.contains(elkCreateRepositoriesRequest.getRepositoryName())) {
110 throw new ELKException("false | RepositoryName already exist");
112 acknowledged = createRepo(elkCreateRepositoriesRequest, "backup");
113 createRepo(elkCreateRepositoriesRequest, ElkClientConstants.ARCHIVE_ES_DATA);
115 logger.debug("Repository is created ", acknowledged);
117 return String.valueOf(acknowledged);
121 public String deleteElkRepository(ElkRepositoriesRequest elkDeleteRepositoriesRequest) {
122 logger.debug("Inside deleteElkRepository");
123 RestHighLevelClient client = restHighLevelClientConnection();
124 DeleteRepositoryRequest deleteRepositoryRequest = new DeleteRepositoryRequest(
125 elkDeleteRepositoriesRequest.getRepositoryName());
126 deleteRepositoryRequest.masterNodeTimeout(elkDeleteRepositoriesRequest.getNodeTimeout());
128 AcknowledgedResponse deleteAcknowledgedResponse;
130 deleteAcknowledgedResponse = client.snapshot().deleteRepository(deleteRepositoryRequest,
131 RequestOptions.DEFAULT);
133 } catch (IOException ex) {
134 logger.debug("Exception:", ex);
135 throw new ErrorTransport("Unable to connect Elasticserach");
137 boolean deleteAcknowledged = deleteAcknowledgedResponse.isAcknowledged();
138 logger.debug("Repository is delete(true for created)... {}", deleteAcknowledged);
140 return String.valueOf(deleteAcknowledged);
144 public ElkArchiveResponse getArchiveElkRepository() throws Exception {
145 logger.debug("Inside getArchiveElkRepository");
146 String action = ElkClientConstants.INFO;
147 Predicate<ElasticsearchSnapshotsResponse> chkSnapshots = obj -> (obj.getSnapshots() != null
148 && obj.getSnapshots().size() > 0);
149 Function<List<ELkRepositoryMetaData>, List<ELkRepositoryMetaData>> filterArchiveRepository = list -> {
150 List<ELkRepositoryMetaData> repositories = new ArrayList<>();
151 for (ELkRepositoryMetaData eLkRepositoryMetaData : list) {
152 if (chkRepositoryName.test(eLkRepositoryMetaData)) {
153 repositories.add(eLkRepositoryMetaData);
159 Function<List<ArchiveInfo>, List<ArchiveInfo>> removeEmptyArchiveRepository = list -> {
160 List<ArchiveInfo> archiveInfoList = new ArrayList<>();
161 for (ArchiveInfo archiveInfo : list) {
162 if (archiveInfo.getSnapshots() != null) {
163 archiveInfoList.add(archiveInfo);
166 return archiveInfoList;
169 Function<ELkRepositoryMetaData, String> repoName = obj -> {
170 JSONObject settingObj = obj.getSettings();
171 String location = (String) settingObj.get("location");
172 String[] p = location.split("/");
176 BiPredicate<String, String> compareRepoName = (str1, str2) -> {
177 if (str1 != null && str2 != null) {
178 return str1.equalsIgnoreCase(str2);
183 List<ELkRepositoryMetaData> elkRepositoryMetaDataList = getAllRepository();
184 List<ELkRepositoryMetaData> archiveRepositories = filterArchiveRepository.apply(elkRepositoryMetaDataList);
185 logger.debug("repositories count: {} ", archiveRepositories.size());
186 ElkArchiveResponse elkArchiveResponse = archiveOperation(null, action);
187 logger.debug("elkArchiveResponse.getMsg() {} elkArchiveResponse.getStatus() {}", elkArchiveResponse.getMsg(),
188 elkArchiveResponse.getStatus());
189 if (elkArchiveResponse != null && elkArchiveResponse.getArchiveInfo() != null) {
190 logger.debug("elkArchiveResponse.getArchiveInfo() count: {} ", elkArchiveResponse.getArchiveInfo().size());
191 for (ArchiveInfo archiveInfo : elkArchiveResponse.getArchiveInfo()) {
192 for (ELkRepositoryMetaData eLkRepositoryMetaData : archiveRepositories) {
193 ElasticsearchSnapshotsResponse elasticsearchSnapshotsResponse = snapshotServiceImpl
194 .getElasticsearchSnapshotDetails(null, eLkRepositoryMetaData.getName());
195 String repoArchiveName = repoName.apply(eLkRepositoryMetaData);
197 "repoArchiveName: {} archiveInfo.getRepositoryName(): {} elasticsearchSnapshotsResponse.getRepositoryName(): {} ",
198 repoArchiveName, archiveInfo.getRepositoryName(),
199 elasticsearchSnapshotsResponse.getRepositoryName());
200 if (chkSnapshots.test(elasticsearchSnapshotsResponse)) {
201 if (compareRepoName.test(archiveInfo.getRepositoryName(), repoArchiveName)) {
202 archiveInfo.setSnapshots(elasticsearchSnapshotsResponse.getSnapshots());
207 List<ArchiveInfo> filteredArchiveInfoList = removeEmptyArchiveRepository
208 .apply(elkArchiveResponse.getArchiveInfo());
209 elkArchiveResponse.setArchiveInfo(filteredArchiveInfoList);
211 return elkArchiveResponse;
215 public ElkArchiveResponse archiveElkRepository(ElkArchiveRequest archiveRequest) throws Exception {
216 logger.debug("Inside archiveElkRepository");
217 String action = archiveRequest.getAction();
218 ElkArchiveResponse elkArchiveResponse = archiveOperation(archiveRequest, action);
219 return elkArchiveResponse;
222 private ElkArchiveResponse archiveOperation(ElkArchiveRequest archiveRequest, String action) throws Exception {
223 logger.debug("Inside archiveOperation action:{}", action);
224 Function<String, ArchiveInfo> f = str -> {
225 String[] p = str.split(",");
226 ArchiveInfo archiveInfo1 = new ArchiveInfo();
227 if (p[0] != null && p[0].length() > 0 && p[1] != null && p[1].length() > 0) {
228 archiveInfo1 = new ArchiveInfo(p[0], p[1]);
233 String result = null;
234 String[] archiveInfoArray;
235 List<String> resultList = new ArrayList<>();
236 if (action.equalsIgnoreCase(ElkClientConstants.INFO)) {
238 result = ElkServiceUtils.executeScript(action, "NA");
239 resultList.add(result.trim());
240 } catch (Exception ex) {
241 logger.debug("Exception:", ex);
242 throw new Exception("Error occured elk archive operation");
246 for (String repoName : archiveRequest.getRepositoryName()) {
247 result = ElkServiceUtils.executeScript(action, repoName);
248 resultList.add(result.trim());
251 } catch (Exception ex) {
252 logger.debug("Exception:", ex);
253 throw new Exception("Error occured elk archive operation");
256 boolean chkCSV = result.contains(",");
257 logger.debug("chkCSV:{}", chkCSV);
258 ElkArchiveResponse elkArchiveResponse = new ElkArchiveResponse();
259 List<ArchiveInfo> archiveInfoList = new ArrayList<ArchiveInfo>();
261 for (String resultOuput : resultList) {
262 archiveInfoArray = resultOuput.split("\n");
263 for (String archiveInfo : archiveInfoArray) {
264 archiveInfoList.add(f.apply(archiveInfo));
267 elkArchiveResponse.setMsg("Action:" + action + " done");
268 elkArchiveResponse.setStatus(ElkClientConstants.SUCCESS);
269 elkArchiveResponse.setArchiveInfo(archiveInfoList);
270 logger.debug("archiveInfoList:" + archiveInfoList);
271 if (action.equalsIgnoreCase("RESTORE")) {
272 for (ArchiveInfo archiveInfo : archiveInfoList) {
273 ElkRepositoriesRequest elkCreateRepositoriesRequest = new ElkRepositoriesRequest();
274 elkCreateRepositoriesRequest.setRepositoryName(archiveInfo.getRepositoryName());
275 elkCreateRepositoriesRequest.setNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
276 createElkRepository(elkCreateRepositoriesRequest);
281 logger.debug("result:" + result);
282 if (result.contains("\n")) {
283 result = result.replace("\n", "");
285 elkArchiveResponse.setStatus(ElkClientConstants.FAIL);
286 elkArchiveResponse.setMsg(result.trim());
288 return elkArchiveResponse;
291 private List<ELkRepositoryMetaData> getAllRepository() {
292 RestHighLevelClient client = restHighLevelClientConnection();
293 GetRepositoriesRequest request1 = new GetRepositoriesRequest();
294 request1.repositories();
295 request1.local(true);
298 GetRepositoriesResponse response = client.snapshot().getRepository(request1, RequestOptions.DEFAULT);
299 List<RepositoryMetaData> repositoryMetaDataResponse = response.repositories();
300 logger.debug("Number of repositoryMetaDataResponse size {}", repositoryMetaDataResponse.size());
302 List<ELkRepositoryMetaData> repositories = new ArrayList<>();
303 for (RepositoryMetaData snapshotStatusResponse : repositoryMetaDataResponse) {
304 logger.debug("\nNAME: {} \n TYPE: {} \n SETTINGS: {}", snapshotStatusResponse.name(),
305 snapshotStatusResponse.type(), snapshotStatusResponse.settings());
306 ELkRepositoryMetaData repositoryMetaData = new ELkRepositoryMetaData();
307 repositoryMetaData.setName(snapshotStatusResponse.name());
308 repositoryMetaData.setType(snapshotStatusResponse.type());
309 JSONObject settings = new JSONObject();
310 for (String key : snapshotStatusResponse.settings().keySet()) {
311 settings.put(key, snapshotStatusResponse.settings().get(key));
313 repositoryMetaData.setSettings(settings);
314 repositories.add(repositoryMetaData);
318 } catch (Exception ex) {
319 logger.debug("Exception:", ex);
320 throw new ErrorTransport("Unable to connect Elasticserach");
324 private boolean createRepo(ElkRepositoriesRequest elkCreateRepositoriesRequest, String repoType) {
325 RestHighLevelClient client = restHighLevelClientConnection();
326 PutRepositoryRequest request = new PutRepositoryRequest();
327 String locationKey = FsRepository.LOCATION_SETTING.getKey();
328 String locationValue = elkCreateRepositoriesRequest.getRepositoryName().trim();
329 String compressKey = FsRepository.COMPRESS_SETTING.getKey();
330 boolean compressValue = true;
331 if (repoType == ElkClientConstants.ARCHIVE_ES_DATA) {
333 ElkClientConstants.ARCHIVE_ES_DATA + "-" + elkCreateRepositoriesRequest.getRepositoryName().trim());
334 locationValue = ElkClientConstants.ARCHIVE_ES_DATA + "/"
335 + elkCreateRepositoriesRequest.getRepositoryName().trim();
337 request.name(elkCreateRepositoriesRequest.getRepositoryName().trim());
339 Settings settings = Settings.builder().put(locationKey, locationValue).put(compressKey, compressValue).build();
340 request.settings(settings);
341 request.type(FsRepository.TYPE);
342 if (StringUtils.isEmpty(elkCreateRepositoriesRequest.getNodeTimeout())) {
343 request.masterNodeTimeout(ElkClientConstants.TIME_ONE_MINT_OUT);
345 request.masterNodeTimeout(elkCreateRepositoriesRequest.getNodeTimeout());
347 request.verify(true);
348 AcknowledgedResponse acknowledgedResponse;
349 boolean acknowledged = false;
351 acknowledgedResponse = client.snapshot().createRepository(request, RequestOptions.DEFAULT);
352 acknowledged = acknowledgedResponse.isAcknowledged();
354 } catch (IOException ex) {
355 logger.debug("Exception:", ex);
356 throw new ErrorTransport("Unable to connect Elasticserach");