From 2ad4da94e23981bc1db08d584ee93039ef5c50c6 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Mon, 29 Apr 2024 07:05:37 -0500 Subject: [PATCH 1/4] Cleaned up DocumentReindexer error handling and logging Signed-off-by: Chris Helma --- .../java/com/rfs/ReindexFromSnapshot.java | 11 +++-- .../com/rfs/common/DocumentReindexer.java | 41 +++++++------------ .../com/rfs/common/LuceneDocumentsReader.java | 4 +- 3 files changed, 25 insertions(+), 31 deletions(-) diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index e1a4717a7..797866083 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -358,12 +358,17 @@ public static void main(String[] args) throws InterruptedException { Flux documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId); String targetIndex = indexMetadata.getName() + indexSuffix; - DocumentReindexer.reindex(targetIndex, documents, targetConnection); - logger.info("Shard reindexing completed"); + int targetShardId = shardId; // Define in local context for the lambda + DocumentReindexer.reindex(targetIndex, documents, targetConnection) + .doOnError(error -> logger.error("Error during reindexing: " + error)) + .doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + targetShardId)) + // Wait for the shard reindexing to complete before proceeding; fine in this demo script, but + // shouldn't be done quite this way in the real RFS Worker. + .block(); } } - logger.info("Refreshing newly added documents"); + logger.info("Refreshing target cluster to reflect newly added documents"); DocumentReindexer.refreshAllDocuments(targetConnection); logger.info("Refresh complete"); } diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java index 6c84488e7..cb7380e6f 100644 --- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java +++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java @@ -18,7 +18,7 @@ public class DocumentReindexer { private static final Logger logger = LogManager.getLogger(DocumentReindexer.class); private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen - public static void reindex(String indexName, Flux documentStream, ConnectionDetails targetConnection) throws Exception { + public static Mono reindex(String indexName, Flux documentStream, ConnectionDetails targetConnection) throws Exception { String targetUrl = "/" + indexName + "/_bulk"; HttpClient client = HttpClient.create() .host(targetConnection.hostName) @@ -32,16 +32,19 @@ public static void reindex(String indexName, Flux documentStream, Conn } }); - documentStream + return documentStream .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size + .doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request")) .map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts - .flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request + .flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson) // Send the request + .doOnSuccess(unused -> logger.debug("Batch succeeded")) + .doOnError(error -> logger.error("Batch failed", error)) + .onErrorResume(e -> Mono.empty()) // Prevent the error from stopping the entire stream + ) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5))) - .subscribe( - response -> logger.info("Batch uploaded successfully"), - error -> logger.error("Failed to upload batch", error) - ); + .doOnComplete(() -> logger.debug("All batches processed")) + .then(); } private static String convertDocumentToBulkSection(Document document) { @@ -53,7 +56,6 @@ private static String convertDocumentToBulkSection(Document document) { } private static String convertToBulkRequestBody(List bulkSections) { - logger.info(bulkSections.size() + " documents in current bulk request"); StringBuilder builder = new StringBuilder(); for (String section : bulkSections) { builder.append(section).append("\n"); @@ -65,26 +67,13 @@ private static Mono sendBulkRequest(HttpClient client, String url, String return client.post() .uri(url) .send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes()))) - .responseSingle((res, content) -> - content.asString() // Convert the response content to a string - .map(body -> new BulkResponseDetails(res.status().code(), body)) // Map both status code and body into a response details object - ) + .responseSingle((res, content) -> content.asString().map(body -> new BulkResponseDetails(res.status().code(), body))) .flatMap(responseDetails -> { - // Something bad happened with our request, log it - if (responseDetails.hasBadStatusCode()) { - logger.error(responseDetails.getFailureMessage()); + if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) { + return Mono.error(new RuntimeException(responseDetails.getFailureMessage())); } - // Some of the bulk operations failed, log it - else if (responseDetails.hasFailedOperations()) { - logger.error(responseDetails.getFailureMessage()); - } - return Mono.just(responseDetails); - }) - .doOnError(err -> { - // We weren't even able to complete the request, log it - logger.error("Bulk request failed", err); - }) - .then(); + return Mono.empty(); + }); } public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception { diff --git a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java index d8698c2e2..f4571090e 100644 --- a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java @@ -35,7 +35,7 @@ public Flux readDocuments(Path luceneFilesBasePath, String indexName, try { reader.close(); } catch (IOException e) { - logger.error("Failed to close IndexReader", e); + throw new RuntimeException("Failed to close IndexReader", e); } } ); @@ -61,7 +61,7 @@ protected Document getDocument(IndexReader reader, int docId) { } if (source_bytes == null || source_bytes.bytes.length == 0) { logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled"); - return null; // Skip deleted documents or those without the _source field + return null; // Skip these too } logger.debug("Document " + id + " read successfully"); From 4552e66b38f2c64e995ffeeeb975eac12e49102c Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Tue, 30 Apr 2024 12:34:15 -0500 Subject: [PATCH 2/4] Made reactor-netty the sole HTTP client provider Signed-off-by: Chris Helma --- .../com/rfs/common/DocumentReindexer.java | 64 +------ .../main/java/com/rfs/common/RestClient.java | 180 ++++++++---------- .../java/com/rfs/common/SnapshotCreator.java | 2 +- 3 files changed, 87 insertions(+), 159 deletions(-) diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java index cb7380e6f..11d10fb62 100644 --- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java +++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java @@ -1,16 +1,13 @@ package com.rfs.common; import java.time.Duration; -import java.util.Base64; import java.util.List; -import io.netty.buffer.Unpooled; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.document.Document; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.netty.http.client.HttpClient; import reactor.util.retry.Retry; @@ -18,26 +15,17 @@ public class DocumentReindexer { private static final Logger logger = LogManager.getLogger(DocumentReindexer.class); private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen - public static Mono reindex(String indexName, Flux documentStream, ConnectionDetails targetConnection) throws Exception { - String targetUrl = "/" + indexName + "/_bulk"; - HttpClient client = HttpClient.create() - .host(targetConnection.hostName) - .port(targetConnection.port) - .headers(h -> { - h.set("Content-Type", "application/json"); - if (targetConnection.authType == ConnectionDetails.AuthType.BASIC) { - String credentials = targetConnection.username + ":" + targetConnection.password; - String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes()); - h.set("Authorization", "Basic " + encodedCredentials); - } - }); + public static Mono reindex(String indexName, Flux documentStream, ConnectionDetails targetConnection) throws Exception { + RestClient client = new RestClient(targetConnection); + + String targetPath = indexName + "/_bulk"; return documentStream .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size .doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request")) .map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts - .flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson) // Send the request + .flatMap(bulkJson -> client.postBulkAsync(targetPath, bulkJson) // Send the request .doOnSuccess(unused -> logger.debug("Batch succeeded")) .doOnError(error -> logger.error("Batch failed", error)) .onErrorResume(e -> Mono.empty()) // Prevent the error from stopping the entire stream @@ -63,51 +51,9 @@ private static String convertToBulkRequestBody(List bulkSections) { return builder.toString(); } - private static Mono sendBulkRequest(HttpClient client, String url, String bulkJson) { - return client.post() - .uri(url) - .send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes()))) - .responseSingle((res, content) -> content.asString().map(body -> new BulkResponseDetails(res.status().code(), body))) - .flatMap(responseDetails -> { - if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) { - return Mono.error(new RuntimeException(responseDetails.getFailureMessage())); - } - return Mono.empty(); - }); - } - public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception { // Send the request RestClient client = new RestClient(targetConnection); client.get("_refresh", false); } - - static class BulkResponseDetails { - public final int statusCode; - public final String body; - - BulkResponseDetails(int statusCode, String body) { - this.statusCode = statusCode; - this.body = body; - } - - public boolean hasBadStatusCode() { - return !(statusCode == 200 || statusCode == 201); - } - - public boolean hasFailedOperations() { - return body.contains("\"errors\":true"); - } - - public String getFailureMessage() { - String failureMessage; - if (hasBadStatusCode()) { - failureMessage = "Bulk request failed. Status code: " + statusCode + ", Response body: " + body; - } else { - failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body; - } - - return failureMessage; - } - } } diff --git a/RFS/src/main/java/com/rfs/common/RestClient.java b/RFS/src/main/java/com/rfs/common/RestClient.java index 55bf36ea4..98c32826a 100644 --- a/RFS/src/main/java/com/rfs/common/RestClient.java +++ b/RFS/src/main/java/com/rfs/common/RestClient.java @@ -1,21 +1,18 @@ package com.rfs.common; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.charset.StandardCharsets; import java.util.Base64; -import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.ByteBufMono; + public class RestClient { private static final Logger logger = LogManager.getLogger(RestClient.class); - public class Response { + public static class Response { public final int code; public final String body; public final String message; @@ -27,109 +24,94 @@ public Response(int responseCode, String responseBody, String responseMessage) { } } + public static class BulkResponse extends Response { + public BulkResponse(int responseCode, String responseBody, String responseMessage) { + super(responseCode, responseBody, responseMessage); + } + + public boolean hasBadStatusCode() { + return !(code == 200 || code == 201); + } + + public boolean hasFailedOperations() { + return body.contains("\"errors\":true"); + } + + public String getFailureMessage() { + String failureMessage; + if (hasBadStatusCode()) { + failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body; + } else { + failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body; + } + + return failureMessage; + } + } + public final ConnectionDetails connectionDetails; + private final HttpClient client; public RestClient(ConnectionDetails connectionDetails) { this.connectionDetails = connectionDetails; - } - public Response get(String path, boolean quietLogging) throws Exception { - String urlString = connectionDetails.url + "/" + path; - - URL url = new URL(urlString); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - - // Set the request method - conn.setRequestMethod("GET"); - - // Set the necessary headers - setAuthHeader(conn); - - // Enable input and output streams - conn.setDoOutput(true); - - // Report the results - int responseCode = conn.getResponseCode(); - - String responseBody; - if (responseCode >= HttpURLConnection.HTTP_BAD_REQUEST) { - // Read error stream if present - try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getErrorStream(), StandardCharsets.UTF_8))) { - responseBody = br.lines().collect(Collectors.joining(System.lineSeparator())); - } - } else { - // Read input stream for successful requests - try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) { - responseBody = br.lines().collect(Collectors.joining(System.lineSeparator())); - } - } + this.client = HttpClient.create() + .baseUrl(connectionDetails.url) + .headers(h -> { + h.add("Content-Type", "application/json"); + if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) { + String credentials = connectionDetails.username + ":" + connectionDetails.password; + String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes()); + h.add("Authorization", "Basic " + encodedCredentials); + } + }); + } - if (quietLogging || (responseCode == HttpURLConnection.HTTP_CREATED || responseCode == HttpURLConnection.HTTP_OK)) { - logger.debug("Response Code: " + responseCode + ", Response Message: " + conn.getResponseMessage() + ", Response Body: " + responseBody); - } else { - logger.error("Response Code: " + responseCode + ", Response Message: " + conn.getResponseMessage() + ", Response Body: " + responseBody); - } + public Mono getAsync(String path, boolean quietLogging) { + return client.get() + .uri("/" + path) + .responseSingle((response, bytes) -> bytes.asString() + .map(body -> new Response(response.status().code(), body, response.status().reasonPhrase())) + .doOnNext(resp -> logResponse(resp, quietLogging))); + } - conn.disconnect(); + public Response get(String path, boolean quietLogging) { + return getAsync(path, quietLogging).block(); + } - return new Response(responseCode, responseBody, conn.getResponseMessage()); + public Mono postBulkAsync(String path, String body) { + return client.post() + .uri("/" + path) + .send(ByteBufMono.fromString(Mono.just(body))) + .responseSingle((response, bytes) -> bytes.asString() + .map(b -> new BulkResponse(response.status().code(), b, response.status().reasonPhrase())) + .flatMap(responseDetails -> { + if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) { + logger.error(responseDetails.getFailureMessage()); + return Mono.error(new RuntimeException(responseDetails.getFailureMessage())); + } + return Mono.just(responseDetails); + })); } - - public Response put(String path, String body, boolean quietLogging) throws Exception { - String urlString = connectionDetails.url + "/" + path; - - URL url = new URL(urlString); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - - // Set the request method - conn.setRequestMethod("PUT"); - - // Set the necessary headers - conn.setRequestProperty("Content-Type", "application/json"); - setAuthHeader(conn); - - // Enable input and output streams - conn.setDoOutput(true); - - // Write the request body - try(OutputStream os = conn.getOutputStream()) { - byte[] input = body.getBytes("utf-8"); - os.write(input, 0, input.length); - } - - // Report the results - int responseCode = conn.getResponseCode(); - - String responseBody; - if (responseCode >= HttpURLConnection.HTTP_BAD_REQUEST) { - // Read error stream if present - try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getErrorStream(), StandardCharsets.UTF_8))) { - responseBody = br.lines().collect(Collectors.joining(System.lineSeparator())); - } - } else { - // Read input stream for successful requests - try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) { - responseBody = br.lines().collect(Collectors.joining(System.lineSeparator())); - } - } - if (quietLogging || (responseCode == HttpURLConnection.HTTP_CREATED || responseCode == HttpURLConnection.HTTP_OK)) { - logger.debug("Response Code: " + responseCode + ", Response Message: " + conn.getResponseMessage() + ", Response Body: " + responseBody); - } else { - logger.error("Response Code: " + responseCode + ", Response Message: " + conn.getResponseMessage() + ", Response Body: " + responseBody); - } + public Mono putAsync(String path, String body, boolean quietLogging) { + return client.put() + .uri("/" + path) + .send(ByteBufMono.fromString(Mono.just(body))) + .responseSingle((response, bytes) -> bytes.asString() + .map(b -> new Response(response.status().code(), b, response.status().reasonPhrase())) + .doOnNext(resp -> logResponse(resp, quietLogging))); + } - conn.disconnect(); - - return new Response(responseCode, responseBody, conn.getResponseMessage()); + public Response put(String path, String body, boolean quietLogging) { + return putAsync(path, body, quietLogging).block(); } - private void setAuthHeader(HttpURLConnection conn) { - if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) { - String auth = connectionDetails.username + ":" + connectionDetails.password; - byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.UTF_8)); - String authHeaderValue = "Basic " + new String(encodedAuth); - conn.setRequestProperty("Authorization", authHeaderValue); + private void logResponse(Response response, boolean quietLogging) { + if (quietLogging || (response.code == 200 || response.code == 201)) { + logger.debug("Response Code: " + response.code + ", Response Message: " + response.message + ", Response Body: " + response.body); + } else { + logger.error("Response Code: " + response.code + ", Response Message: " + response.message + ", Response Body: " + response.body); } } -} +} \ No newline at end of file diff --git a/RFS/src/main/java/com/rfs/common/SnapshotCreator.java b/RFS/src/main/java/com/rfs/common/SnapshotCreator.java index 60907bd22..f999744ba 100644 --- a/RFS/src/main/java/com/rfs/common/SnapshotCreator.java +++ b/RFS/src/main/java/com/rfs/common/SnapshotCreator.java @@ -61,7 +61,7 @@ public void createSnapshot() throws Exception { String bodyString = body.toString(); RestClient.Response response = client.put(targetName, bodyString, false); if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) { - logger.info("Snapshot " + getSnapshotName() + " creation successful"); + logger.info("Snapshot " + getSnapshotName() + " creation initiated"); } else { logger.error("Snapshot " + getSnapshotName() + " creation failed"); throw new SnapshotCreationFailed(getSnapshotName()); From c3e13a18513eb39213228ac738ab8ef8f6ada22c Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Tue, 30 Apr 2024 15:32:47 -0500 Subject: [PATCH 3/4] Created an OpenSearchClient layer above the RestClient Signed-off-by: Chris Helma --- .../java/com/rfs/ReindexFromSnapshot.java | 13 +- .../com/rfs/common/DocumentReindexer.java | 10 +- .../rfs/common/FileSystemSnapshotCreator.java | 8 +- .../java/com/rfs/common/OpenSearchClient.java | 127 ++++++++++++++++++ .../main/java/com/rfs/common/RestClient.java | 36 +---- .../com/rfs/common/S3SnapshotCreator.java | 8 +- .../java/com/rfs/common/SnapshotCreator.java | 30 ++--- .../GlobalMetadataCreator_OS_2_11.java | 53 ++------ .../version_os_2_11/IndexCreator_OS_2_11.java | 20 +-- 9 files changed, 175 insertions(+), 130 deletions(-) create mode 100644 RFS/src/main/java/com/rfs/common/OpenSearchClient.java diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 797866083..ba2120d10 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -182,9 +182,10 @@ public static void main(String[] args) throws InterruptedException { // ========================================================================================================== logger.info("=================================================================="); logger.info("Attempting to create the snapshot..."); + OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection); SnapshotCreator snapshotCreator = repo instanceof S3Repo - ? new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region) - : new FileSystemSnapshotCreator(snapshotName, sourceConnection, snapshotLocalRepoDirPath.toString()); + ? new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region) + : new FileSystemSnapshotCreator(snapshotName, sourceClient, snapshotLocalRepoDirPath.toString()); snapshotCreator.registerRepo(); snapshotCreator.createSnapshot(); while (!snapshotCreator.isSnapshotFinished()) { @@ -266,14 +267,15 @@ public static void main(String[] args) throws InterruptedException { logger.info("=================================================================="); logger.info("Attempting to recreate the Global Metadata..."); + OpenSearchClient targetClient = new OpenSearchClient(targetConnection); if (sourceVersion == ClusterVersion.ES_6_8) { ObjectNode root = globalMetadata.toObjectNode(); ObjectNode transformedRoot = transformer.transformGlobalMetadata(root); - GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, Collections.emptyList(), templateWhitelist); + GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, Collections.emptyList(), templateWhitelist); } else if (sourceVersion == ClusterVersion.ES_7_10) { ObjectNode root = globalMetadata.toObjectNode(); ObjectNode transformedRoot = transformer.transformGlobalMetadata(root); - GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, componentTemplateWhitelist, templateWhitelist); + GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, componentTemplateWhitelist, templateWhitelist); } } @@ -301,6 +303,7 @@ public static void main(String[] args) throws InterruptedException { // ========================================================================================================== logger.info("=================================================================="); logger.info("Attempting to recreate the indices..."); + OpenSearchClient targetClient = new OpenSearchClient(targetConnection); for (IndexMetadata.Data indexMetadata : indexMetadatas) { String reindexName = indexMetadata.getName() + indexSuffix; logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target..."); @@ -308,7 +311,7 @@ public static void main(String[] args) throws InterruptedException { ObjectNode root = indexMetadata.toObjectNode(); ObjectNode transformedRoot = transformer.transformIndexMetadata(root); IndexMetadataData_OS_2_11 indexMetadataOS211 = new IndexMetadataData_OS_2_11(transformedRoot, indexMetadata.getId(), reindexName); - IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetConnection); + IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetClient); } } diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java index 11d10fb62..f4c64e293 100644 --- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java +++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java @@ -16,16 +16,14 @@ public class DocumentReindexer { private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen public static Mono reindex(String indexName, Flux documentStream, ConnectionDetails targetConnection) throws Exception { - RestClient client = new RestClient(targetConnection); - - String targetPath = indexName + "/_bulk"; + OpenSearchClient client = new OpenSearchClient(targetConnection); return documentStream .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size .doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request")) .map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts - .flatMap(bulkJson -> client.postBulkAsync(targetPath, bulkJson) // Send the request + .flatMap(bulkJson -> client.sendBulkRequest(indexName, bulkJson) // Send the request .doOnSuccess(unused -> logger.debug("Batch succeeded")) .doOnError(error -> logger.error("Batch failed", error)) .onErrorResume(e -> Mono.empty()) // Prevent the error from stopping the entire stream @@ -53,7 +51,7 @@ private static String convertToBulkRequestBody(List bulkSections) { public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception { // Send the request - RestClient client = new RestClient(targetConnection); - client.get("_refresh", false); + OpenSearchClient client = new OpenSearchClient(targetConnection); + client.refresh(); } } diff --git a/RFS/src/main/java/com/rfs/common/FileSystemSnapshotCreator.java b/RFS/src/main/java/com/rfs/common/FileSystemSnapshotCreator.java index d8a947939..a4f122b47 100644 --- a/RFS/src/main/java/com/rfs/common/FileSystemSnapshotCreator.java +++ b/RFS/src/main/java/com/rfs/common/FileSystemSnapshotCreator.java @@ -9,14 +9,14 @@ public class FileSystemSnapshotCreator extends SnapshotCreator { private static final Logger logger = LogManager.getLogger(FileSystemSnapshotCreator.class); private static final ObjectMapper mapper = new ObjectMapper(); - private final ConnectionDetails connectionDetails; + private final OpenSearchClient client; private final String snapshotName; private final String snapshotRepoDirectoryPath; - public FileSystemSnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String snapshotRepoDirectoryPath) { - super(snapshotName, connectionDetails); + public FileSystemSnapshotCreator(String snapshotName, OpenSearchClient client, String snapshotRepoDirectoryPath) { + super(snapshotName, client); this.snapshotName = snapshotName; - this.connectionDetails = connectionDetails; + this.client = client; this.snapshotRepoDirectoryPath = snapshotRepoDirectoryPath; } diff --git a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java new file mode 100644 index 000000000..a21bd9057 --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java @@ -0,0 +1,127 @@ +package com.rfs.common; + +import java.net.HttpURLConnection; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +import reactor.core.publisher.Mono; + +public class OpenSearchClient { + private static final Logger logger = LogManager.getLogger(OpenSearchClient.class); + + public static class BulkResponse extends RestClient.Response { + public BulkResponse(int responseCode, String responseBody, String responseMessage) { + super(responseCode, responseBody, responseMessage); + } + + public boolean hasBadStatusCode() { + return !(code == HttpURLConnection.HTTP_OK || code == HttpURLConnection.HTTP_CREATED); + } + + public boolean hasFailedOperations() { + return body.contains("\"errors\":true"); + } + + public String getFailureMessage() { + String failureMessage; + if (hasBadStatusCode()) { + failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body; + } else { + failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body; + } + + return failureMessage; + } + } + + public final ConnectionDetails connectionDetails; + private final RestClient client; + + public OpenSearchClient(ConnectionDetails connectionDetails) { + this.connectionDetails = connectionDetails; + this.client = new RestClient(connectionDetails); + } + + /* + * Create a legacy template if it does not already exist; return true if created, false otherwise. + */ + public boolean createLegacyTemplateIdempotent(String templateName, ObjectNode settings){ + String targetPath = "_template/" + templateName; + return createObjectIdempotent(targetPath, settings); + } + + /* + * Create a component template if it does not already exist; return true if created, false otherwise. + */ + public boolean createComponentTemplateIdempotent(String templateName, ObjectNode settings){ + String targetPath = "_component_template/" + templateName; + return createObjectIdempotent(targetPath, settings); + } + + /* + * Create an index template if it does not already exist; return true if created, false otherwise. + */ + public boolean createIndexTemplateIdempotent(String templateName, ObjectNode settings){ + String targetPath = "_index_template/" + templateName; + return createObjectIdempotent(targetPath, settings); + } + + /* + * Create an index if it does not already exist; return true if created, false otherwise. + */ + public boolean createIndexIdempotent(String indexName, ObjectNode settings){ + String targetPath = indexName; + return createObjectIdempotent(targetPath, settings); + } + + private boolean createObjectIdempotent(String objectPath, ObjectNode settings){ + RestClient.Response response = client.get(objectPath, true); + if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { + client.put(objectPath, settings.toString(), false); + return true; + } else if (response.code == HttpURLConnection.HTTP_OK) { + logger.warn(objectPath + " already exists. Skipping creation."); + } else { + logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation."); + } + return false; + } + + public RestClient.Response registerSnapshotRepo(String repoName, ObjectNode settings){ + String targetPath = "_snapshot/" + repoName; + return client.put(targetPath, settings.toString(), false); + } + + public RestClient.Response createSnapshot(String repoName, String snapshotName, ObjectNode settings){ + String targetPath = "_snapshot/" + repoName + "/" + snapshotName; + return client.put(targetPath, settings.toString(), false); + } + + public RestClient.Response getSnapshotStatus(String repoName, String snapshotName){ + String targetPath = "_snapshot/" + repoName + "/" + snapshotName; + return client.get(targetPath, false); + } + + public Mono sendBulkRequest(String indexName, String body) { + String targetPath = indexName + "/_bulk"; + + return client.postAsync(targetPath, body) + .map(response -> new BulkResponse(response.code, response.body, response.message)) + .flatMap(responseDetails -> { + if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) { + logger.error(responseDetails.getFailureMessage()); + return Mono.error(new RuntimeException(responseDetails.getFailureMessage())); + } + return Mono.just(responseDetails); + }); + } + + public RestClient.Response refresh() { + String targetPath = "_refresh"; + + return client.get(targetPath, false); + } +} diff --git a/RFS/src/main/java/com/rfs/common/RestClient.java b/RFS/src/main/java/com/rfs/common/RestClient.java index 98c32826a..17fc54860 100644 --- a/RFS/src/main/java/com/rfs/common/RestClient.java +++ b/RFS/src/main/java/com/rfs/common/RestClient.java @@ -24,31 +24,6 @@ public Response(int responseCode, String responseBody, String responseMessage) { } } - public static class BulkResponse extends Response { - public BulkResponse(int responseCode, String responseBody, String responseMessage) { - super(responseCode, responseBody, responseMessage); - } - - public boolean hasBadStatusCode() { - return !(code == 200 || code == 201); - } - - public boolean hasFailedOperations() { - return body.contains("\"errors\":true"); - } - - public String getFailureMessage() { - String failureMessage; - if (hasBadStatusCode()) { - failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body; - } else { - failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body; - } - - return failureMessage; - } - } - public final ConnectionDetails connectionDetails; private final HttpClient client; @@ -79,19 +54,12 @@ public Response get(String path, boolean quietLogging) { return getAsync(path, quietLogging).block(); } - public Mono postBulkAsync(String path, String body) { + public Mono postAsync(String path, String body) { return client.post() .uri("/" + path) .send(ByteBufMono.fromString(Mono.just(body))) .responseSingle((response, bytes) -> bytes.asString() - .map(b -> new BulkResponse(response.status().code(), b, response.status().reasonPhrase())) - .flatMap(responseDetails -> { - if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) { - logger.error(responseDetails.getFailureMessage()); - return Mono.error(new RuntimeException(responseDetails.getFailureMessage())); - } - return Mono.just(responseDetails); - })); + .map(b -> new Response(response.status().code(), b, response.status().reasonPhrase()))); } public Mono putAsync(String path, String body, boolean quietLogging) { diff --git a/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java b/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java index 150b3430e..7832f05b5 100644 --- a/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java +++ b/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java @@ -10,15 +10,15 @@ public class S3SnapshotCreator extends SnapshotCreator { private static final Logger logger = LogManager.getLogger(S3SnapshotCreator.class); private static final ObjectMapper mapper = new ObjectMapper(); - private final ConnectionDetails connectionDetails; + private final OpenSearchClient client; private final String snapshotName; private final String s3Uri; private final String s3Region; - public S3SnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String s3Uri, String s3Region) { - super(snapshotName, connectionDetails); + public S3SnapshotCreator(String snapshotName, OpenSearchClient client, String s3Uri, String s3Region) { + super(snapshotName, client); this.snapshotName = snapshotName; - this.connectionDetails = connectionDetails; + this.client = client; this.s3Uri = s3Uri; this.s3Region = s3Region; } diff --git a/RFS/src/main/java/com/rfs/common/SnapshotCreator.java b/RFS/src/main/java/com/rfs/common/SnapshotCreator.java index f999744ba..5ec9bbb95 100644 --- a/RFS/src/main/java/com/rfs/common/SnapshotCreator.java +++ b/RFS/src/main/java/com/rfs/common/SnapshotCreator.java @@ -13,13 +13,13 @@ public abstract class SnapshotCreator { private static final Logger logger = LogManager.getLogger(SnapshotCreator.class); private static final ObjectMapper mapper = new ObjectMapper(); - private final ConnectionDetails connectionDetails; + private final OpenSearchClient client; @Getter private final String snapshotName; - public SnapshotCreator(String snapshotName, ConnectionDetails connectionDetails) { + public SnapshotCreator(String snapshotName, OpenSearchClient client) { this.snapshotName = snapshotName; - this.connectionDetails = connectionDetails; + this.client = client; } abstract ObjectNode getRequestBodyForRegisterRepo(); @@ -29,15 +29,10 @@ public String getRepoName() { } public void registerRepo() throws Exception { - // Assemble the REST path - String targetName = "_snapshot/" + getRepoName(); - - ObjectNode body = getRequestBodyForRegisterRepo(); + ObjectNode settings = getRequestBodyForRegisterRepo(); // Register the repo; it's fine if it already exists - RestClient client = new RestClient(connectionDetails); - String bodyString = body.toString(); - RestClient.Response response = client.put(targetName, bodyString, false); + RestClient.Response response = client.registerSnapshotRepo(getRepoName(), settings); if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) { logger.info("Snapshot repo registration successful"); } else { @@ -47,19 +42,14 @@ public void registerRepo() throws Exception { } public void createSnapshot() throws Exception { - // Assemble the REST path - String targetName = "_snapshot/" + getRepoName() + "/" + getSnapshotName(); - - // Assemble the request body + // Assemble the settings ObjectNode body = mapper.createObjectNode(); body.put("indices", "_all"); body.put("ignore_unavailable", true); body.put("include_global_state", true); // Register the repo; idempotent operation - RestClient client = new RestClient(connectionDetails); - String bodyString = body.toString(); - RestClient.Response response = client.put(targetName, bodyString, false); + RestClient.Response response = client.createSnapshot(getRepoName(), getSnapshotName(), body); if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) { logger.info("Snapshot " + getSnapshotName() + " creation initiated"); } else { @@ -69,12 +59,8 @@ public void createSnapshot() throws Exception { } public boolean isSnapshotFinished() throws Exception { - // Assemble the REST path - String targetName = "_snapshot/" + getRepoName() + "/" + getSnapshotName(); - // Check if the snapshot has finished - RestClient client = new RestClient(connectionDetails); - RestClient.Response response = client.get(targetName, false); + RestClient.Response response = client.getSnapshotStatus(getRepoName(), getSnapshotName()); if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { logger.error("Snapshot " + getSnapshotName() + " does not exist"); throw new SnapshotDoesNotExist(getSnapshotName()); diff --git a/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java b/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java index 9216a4f60..4ba48c4d7 100644 --- a/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java +++ b/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java @@ -1,6 +1,5 @@ package com.rfs.version_os_2_11; -import java.net.HttpURLConnection; import java.util.ArrayList; import java.util.List; @@ -8,22 +7,21 @@ import org.apache.logging.log4j.Logger; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.rfs.common.ConnectionDetails; -import com.rfs.common.RestClient; +import com.rfs.common.OpenSearchClient; public class GlobalMetadataCreator_OS_2_11 { private static final Logger logger = LogManager.getLogger(GlobalMetadataCreator_OS_2_11.class); - public static void create(ObjectNode root, ConnectionDetails connectionDetails, List componentTemplateWhitelist, List indexTemplateWhitelist) throws Exception { + public static void create(ObjectNode root, OpenSearchClient client, List componentTemplateWhitelist, List indexTemplateWhitelist) throws Exception { logger.info("Setting Global Metadata"); GlobalMetadataData_OS_2_11 globalMetadata = new GlobalMetadataData_OS_2_11(root); - createTemplates(globalMetadata, connectionDetails, indexTemplateWhitelist); - createComponentTemplates(globalMetadata, connectionDetails, componentTemplateWhitelist); - createIndexTemplates(globalMetadata, connectionDetails, indexTemplateWhitelist); + createTemplates(globalMetadata, client, indexTemplateWhitelist); + createComponentTemplates(globalMetadata, client, componentTemplateWhitelist); + createIndexTemplates(globalMetadata, client, indexTemplateWhitelist); } - public static void createTemplates(GlobalMetadataData_OS_2_11 globalMetadata, ConnectionDetails connectionDetails, List indexTemplateWhitelist) throws Exception { + public static void createTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List indexTemplateWhitelist) throws Exception { logger.info("Setting Legacy Templates"); ObjectNode templates = globalMetadata.getTemplates(); @@ -41,8 +39,7 @@ public static void createTemplates(GlobalMetadataData_OS_2_11 globalMetadata, Co logger.info("Setting Legacy Template: " + templateName); ObjectNode settings = (ObjectNode) globalMetadata.getTemplates().get(templateName); - String path = "_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createLegacyTemplateIdempotent(templateName, settings); } } else { // Get the template names @@ -53,13 +50,12 @@ public static void createTemplates(GlobalMetadataData_OS_2_11 globalMetadata, Co for (String templateName : templateKeys) { logger.info("Setting Legacy Template: " + templateName); ObjectNode settings = (ObjectNode) templates.get(templateName); - String path = "_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createLegacyTemplateIdempotent(templateName, settings); } } } - public static void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMetadata, ConnectionDetails connectionDetails, List indexTemplateWhitelist) throws Exception { + public static void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List indexTemplateWhitelist) throws Exception { logger.info("Setting Component Templates"); ObjectNode templates = globalMetadata.getComponentTemplates(); @@ -77,8 +73,7 @@ public static void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMet logger.info("Setting Component Template: " + templateName); ObjectNode settings = (ObjectNode) templates.get(templateName); - String path = "_component_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createComponentTemplateIdempotent(templateName, settings); } } else { // Get the template names @@ -89,13 +84,12 @@ public static void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMet for (String templateName : templateKeys) { logger.info("Setting Component Template: " + templateName); ObjectNode settings = (ObjectNode) templates.get(templateName); - String path = "_component_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createComponentTemplateIdempotent(templateName, settings); } } } - public static void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadata, ConnectionDetails connectionDetails, List indexTemplateWhitelist) throws Exception { + public static void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List indexTemplateWhitelist) throws Exception { logger.info("Setting Index Templates"); ObjectNode templates = globalMetadata.getIndexTemplates(); @@ -113,8 +107,7 @@ public static void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadat logger.info("Setting Index Template: " + templateName); ObjectNode settings = (ObjectNode) globalMetadata.getIndexTemplates().get(templateName); - String path = "_index_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createIndexTemplateIdempotent(templateName, settings); } } else { // Get the template names @@ -125,26 +118,8 @@ public static void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadat for (String templateName : templateKeys) { logger.info("Setting Index Template: " + templateName); ObjectNode settings = (ObjectNode) templates.get(templateName); - String path = "_index_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createIndexTemplateIdempotent(templateName, settings); } } } - - private static void createEntity(String entityName, ObjectNode settings, ConnectionDetails connectionDetails, String path) throws Exception { - // Assemble the request details - String body = settings.toString(); - - // Confirm the index doesn't already exist, then create it - RestClient client = new RestClient(connectionDetails); - RestClient.Response response = client.get(path, true); - if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { - String bodyString = body.toString(); - client.put(path, bodyString, false); - } else if (response.code == HttpURLConnection.HTTP_OK) { - logger.warn(entityName + " already exists. Skipping creation."); - } else { - logger.warn("Could not confirm that " + entityName + " does not already exist. Skipping creation."); - } - } } diff --git a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java index c3de7b5ef..58661aa66 100644 --- a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java +++ b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java @@ -1,21 +1,18 @@ package com.rfs.version_os_2_11; -import java.net.HttpURLConnection; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.rfs.common.ConnectionDetails; import com.rfs.common.IndexMetadata; -import com.rfs.common.RestClient; +import com.rfs.common.OpenSearchClient; public class IndexCreator_OS_2_11 { private static final Logger logger = LogManager.getLogger(IndexCreator_OS_2_11.class); private static final ObjectMapper mapper = new ObjectMapper(); - public static void create(String targetName, IndexMetadata.Data indexMetadata, ConnectionDetails connectionDetails) throws Exception { + public static void create(String indexName, IndexMetadata.Data indexMetadata, OpenSearchClient client) throws Exception { // Remove some settings which will cause errors if you try to pass them to the API ObjectNode settings = indexMetadata.getSettings(); @@ -30,16 +27,7 @@ public static void create(String targetName, IndexMetadata.Data indexMetadata, C body.set("mappings", indexMetadata.getMappings()); body.set("settings", settings); - // Confirm the index doesn't already exist, then create it - RestClient client = new RestClient(connectionDetails); - RestClient.Response response = client.get(targetName, true); - if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { - String bodyString = body.toString(); - client.put(targetName, bodyString, false); - } else if (response.code == HttpURLConnection.HTTP_OK) { - logger.warn("Index " + targetName + " already exists. Skipping creation."); - } else { - logger.warn("Could not confirm that index " + targetName + " does not already exist. Skipping creation."); - } + // Idempotently create the index + client.createIndexIdempotent(indexName, body); } } From 75691b9dc9e27bc4017472204c1371cf206dc905 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Wed, 1 May 2024 11:47:23 -0500 Subject: [PATCH 4/4] Updates per PR comments Signed-off-by: Chris Helma --- RFS/src/main/java/com/rfs/ReindexFromSnapshot.java | 4 ++-- RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java | 6 +++++- RFS/src/main/java/com/rfs/common/OpenSearchClient.java | 5 ++++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index ba2120d10..cc9d7644f 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -362,10 +362,10 @@ public static void main(String[] args) throws InterruptedException { Flux documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId); String targetIndex = indexMetadata.getName() + indexSuffix; - int targetShardId = shardId; // Define in local context for the lambda + final int finalShardId = shardId; // Define in local context for the lambda DocumentReindexer.reindex(targetIndex, documents, targetConnection) .doOnError(error -> logger.error("Error during reindexing: " + error)) - .doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + targetShardId)) + .doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + finalShardId)) // Wait for the shard reindexing to complete before proceeding; fine in this demo script, but // shouldn't be done quite this way in the real RFS Worker. .block(); diff --git a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java index f4571090e..dcdf3b7ef 100644 --- a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java @@ -10,8 +10,11 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.BytesRef; + +import lombok.Lombok; import reactor.core.publisher.Flux; + public class LuceneDocumentsReader { private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class); @@ -35,7 +38,8 @@ public Flux readDocuments(Path luceneFilesBasePath, String indexName, try { reader.close(); } catch (IOException e) { - throw new RuntimeException("Failed to close IndexReader", e); + logger.error("Failed to close IndexReader", e); + Lombok.sneakyThrow(e); } } ); diff --git a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java index a21bd9057..c5b5644ac 100644 --- a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java +++ b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java @@ -22,6 +22,9 @@ public boolean hasBadStatusCode() { } public boolean hasFailedOperations() { + // The OpenSearch Bulk API response body is JSON and contains a top-level "errors" field that indicates + // whether any of the individual operations in the bulk request failed. Rather than marshalling the entire + // response as JSON, just check for the string value. return body.contains("\"errors\":true"); } @@ -83,7 +86,7 @@ private boolean createObjectIdempotent(String objectPath, ObjectNode settings){ client.put(objectPath, settings.toString(), false); return true; } else if (response.code == HttpURLConnection.HTTP_OK) { - logger.warn(objectPath + " already exists. Skipping creation."); + logger.info(objectPath + " already exists. Skipping creation."); } else { logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation."); }