From 4541617ffa3a881841a6631ca754bff5182066ac Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Tue, 23 Apr 2024 10:31:21 -0500 Subject: [PATCH 1/5] Checkpoint: improved ConnectionDetails; unit tested it Signed-off-by: Chris Helma --- RFS/build.gradle | 5 ++ .../java/com/rfs/ReindexFromSnapshot.java | 2 +- .../com/rfs/common/ConnectionDetails.java | 39 ++++++++++++-- .../main/java/com/rfs/common/RestClient.java | 4 +- .../com/rfs/common/ConnectionDetailsTest.java | 54 +++++++++++++++++++ 5 files changed, 98 insertions(+), 6 deletions(-) create mode 100644 RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java diff --git a/RFS/build.gradle b/RFS/build.gradle index 092ad5897..a13b4b446 100644 --- a/RFS/build.gradle +++ b/RFS/build.gradle @@ -37,6 +37,11 @@ dependencies { implementation 'org.apache.lucene:lucene-core:8.11.3' implementation 'org.apache.lucene:lucene-analyzers-common:8.11.3' implementation 'org.apache.lucene:lucene-backward-codecs:8.11.3' + + implementation platform('io.projectreactor:reactor-bom:2023.0.5') + implementation 'io.projectreactor.netty:reactor-netty-core' + implementation 'io.projectreactor.netty:reactor-netty-http' +} implementation 'software.amazon.awssdk:s3:2.25.16' testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2' diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 1cd897ffc..3eba094a2 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -125,7 +125,7 @@ public static void main(String[] args) throws InterruptedException { Logging.setLevel(logLevel); - ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass); + ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass); ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); // Sanity checks diff --git 
a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java index c9c8b1b44..e9159ef92 100644 --- a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java +++ b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java @@ -9,13 +9,21 @@ public static enum AuthType { NONE } - public final String host; + public static enum Protocol { + HTTP, + HTTPS + } + + public final String url; + public final Protocol protocol; + public final String hostName; + public final String port; public final String username; public final String password; public final AuthType authType; - public ConnectionDetails(String host, String username, String password) { - this.host = host; // http://localhost:9200 + public ConnectionDetails(String url, String username, String password) { + this.url = url; // http://localhost:9200 // If the username is provided, the password must be as well, and vice versa if ((username == null && password != null) || (username != null && password == null)) { @@ -28,5 +36,30 @@ public ConnectionDetails(String host, String username, String password) { this.username = username; this.password = password; + + if (url == null) { + hostName = null; + port = null; + protocol = null; + } else { + // Parse the URL to get the protocol, host name, and port + String[] urlParts = url.split("://"); + if (urlParts.length != 2) { + throw new IllegalArgumentException("Invalid URL format"); + } + + hostName = urlParts[1].split(":")[0]; + + String [] portParts = urlParts[1].split(":"); + port = portParts.length == 1 ? 
null : portParts[1].split("/")[0]; + + if (urlParts[0].equals("http")) { + protocol = Protocol.HTTP; + } else if (urlParts[0].equals("https")) { + protocol = Protocol.HTTPS; + } else { + throw new IllegalArgumentException("Invalid protocol"); + } + } } } diff --git a/RFS/src/main/java/com/rfs/common/RestClient.java b/RFS/src/main/java/com/rfs/common/RestClient.java index 0a4ad1849..55bf36ea4 100644 --- a/RFS/src/main/java/com/rfs/common/RestClient.java +++ b/RFS/src/main/java/com/rfs/common/RestClient.java @@ -34,7 +34,7 @@ public RestClient(ConnectionDetails connectionDetails) { } public Response get(String path, boolean quietLogging) throws Exception { - String urlString = connectionDetails.host + "/" + path; + String urlString = connectionDetails.url + "/" + path; URL url = new URL(urlString); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); @@ -76,7 +76,7 @@ public Response get(String path, boolean quietLogging) throws Exception { } public Response put(String path, String body, boolean quietLogging) throws Exception { - String urlString = connectionDetails.host + "/" + path; + String urlString = connectionDetails.url + "/" + path; URL url = new URL(urlString); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); diff --git a/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java b/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java new file mode 100644 index 000000000..7d6d598bc --- /dev/null +++ b/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java @@ -0,0 +1,54 @@ +package com.rfs.common; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + + +public class ConnectionDetailsTest { + static Stream happyPathArgs() { + return Stream.of( + 
Arguments.of("https://localhost:9200", "username", "pass", "https://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTPS, "localhost", "9200"), + Arguments.of("http://localhost:9200", "username", "pass", "http://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", "9200"), + Arguments.of("http://localhost:9200", null, null, "http://localhost:9200", null, null, ConnectionDetails.Protocol.HTTP, "localhost", "9200"), + Arguments.of("http://localhost", "username", "pass", "http://localhost", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", null), + Arguments.of("http://localhost:9200/longer/path", "username", "pass", "http://localhost:9200/longer/path", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", "9200"), + Arguments.of(null, "username", "pass", null, "username", "pass", null, null, null) + ); + } + + @ParameterizedTest + @MethodSource("happyPathArgs") + void ConnectionDetails_HappyPath_AsExpected(String url, String username, String password, + String expectedUrl, String expectedUsername, String expectedPassword, + ConnectionDetails.Protocol expectedProtocal, String expectedHostName, String expectedPort) { + ConnectionDetails details = new ConnectionDetails(url, username, password); + assertEquals(expectedUrl, details.url); + assertEquals(expectedUsername, details.username); + assertEquals(expectedPassword, details.password); + assertEquals(expectedProtocal, details.protocol); + assertEquals(expectedHostName, details.hostName); + assertEquals(expectedPort, details.port); + } + + static Stream unhappyPathArgs() { + return Stream.of( + Arguments.of("localhost:9200", "username", "pass", IllegalArgumentException.class), + Arguments.of("http://localhost:9200", "username", null, IllegalArgumentException.class), + Arguments.of("http://localhost:9200", null, "pass", IllegalArgumentException.class), + Arguments.of("ftp://localhost:9200", null, "pass", IllegalArgumentException.class) + ); + } + 
+ @ParameterizedTest + @MethodSource("unhappyPathArgs") + void ConnectionDetails_UnhappyPath_AsExpected(String url, String username, String password, + Class expectedException) { + assertThrows(expectedException, () -> new ConnectionDetails(url, username, password)); + } +} From c690be2f93d6af3e6d0b03756345126bffe648a9 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Tue, 23 Apr 2024 16:33:48 -0500 Subject: [PATCH 2/5] RFS now uses reactor-netty and bulk indexing Signed-off-by: Chris Helma --- RFS/build.gradle | 1 - .../java/com/rfs/ReindexFromSnapshot.java | 16 +-- .../com/rfs/common/ConnectionDetails.java | 8 +- .../com/rfs/common/DocumentReindexer.java | 110 ++++++++++++++++-- .../com/rfs/common/LuceneDocumentsReader.java | 66 +++++++---- .../com/rfs/common/ConnectionDetailsTest.java | 8 +- 6 files changed, 156 insertions(+), 53 deletions(-) diff --git a/RFS/build.gradle b/RFS/build.gradle index a13b4b446..322c868d1 100644 --- a/RFS/build.gradle +++ b/RFS/build.gradle @@ -41,7 +41,6 @@ dependencies { implementation platform('io.projectreactor:reactor-bom:2023.0.5') implementation 'io.projectreactor.netty:reactor-netty-core' implementation 'io.projectreactor.netty:reactor-netty-http' -} implementation 'software.amazon.awssdk:s3:2.25.16' testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2' diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 3eba094a2..4afc22cfa 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -14,6 +14,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import reactor.core.publisher.Flux; import com.rfs.common.*; import com.rfs.transformers.*; @@ -355,18 +356,13 @@ public static void main(String[] args) throws InterruptedException { for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { 
logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ==="); - List documents = LuceneDocumentsReader.readDocuments(luceneDirPath, indexMetadata.getName(), shardId); - logger.info("Documents read successfully"); - - for (Document document : documents) { - String targetIndex = indexMetadata.getName() + indexSuffix; - DocumentReindexer.reindex(targetIndex, document, targetConnection); - } + Flux documents = LuceneDocumentsReader.readDocuments(luceneDirPath, indexMetadata.getName(), shardId); + String targetIndex = indexMetadata.getName() + indexSuffix; + DocumentReindexer.reindex(targetIndex, documents, targetConnection); + + logger.info("Shard reindexing completed"); } } - - logger.info("Documents reindexed successfully"); - logger.info("Refreshing newly added documents"); DocumentReindexer.refreshAllDocuments(targetConnection); logger.info("Refresh complete"); diff --git a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java index e9159ef92..838d9dbd6 100644 --- a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java +++ b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java @@ -17,7 +17,7 @@ public static enum Protocol { public final String url; public final Protocol protocol; public final String hostName; - public final String port; + public final int port; public final String username; public final String password; public final AuthType authType; @@ -39,7 +39,7 @@ public ConnectionDetails(String url, String username, String password) { if (url == null) { hostName = null; - port = null; + port = -1; protocol = null; } else { // Parse the URL to get the protocol, host name, and port @@ -50,8 +50,8 @@ public ConnectionDetails(String url, String username, String password) { hostName = urlParts[1].split(":")[0]; - String [] portParts = urlParts[1].split(":"); - port = portParts.length == 1 ? 
null : portParts[1].split("/")[0]; + String[] portParts = urlParts[1].split(":"); + port = portParts.length == 1 ? -1 : Integer.parseInt(portParts[1].split("/")[0]); if (urlParts[0].equals("http")) { protocol = Protocol.HTTP; diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java index 4a09a381c..bdd04cb86 100644 --- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java +++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java @@ -1,27 +1,90 @@ package com.rfs.common; +import java.time.Duration; +import java.util.Base64; +import java.util.List; + +import io.netty.buffer.Unpooled; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.document.Document; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.util.retry.Retry; public class DocumentReindexer { private static final Logger logger = LogManager.getLogger(DocumentReindexer.class); + private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen + + public static void reindex(String indexName, Flux documentStream, ConnectionDetails targetConnection) throws Exception { + String targetUrl = "/" + indexName + "/_bulk"; + HttpClient client = HttpClient.create() + .host(targetConnection.hostName) + .port(targetConnection.port) + .headers(h -> { + h.set("Content-Type", "application/json"); + if (targetConnection.authType == ConnectionDetails.AuthType.BASIC) { + String credentials = targetConnection.username + ":" + targetConnection.password; + String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes()); + h.set("Authorization", "Basic " + encodedCredentials); + } + }); + + documentStream + .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation + .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size + 
.map(DocumentReindexer::convertToBulkJson) // Assemble the bulk request body from the parts + .flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5))) + .subscribe( + response -> logger.info("Batch uploaded successfully"), + error -> logger.error("Failed to upload batch", error) + ); + } - public static void reindex(String indexName, Document document, ConnectionDetails targetConnection) throws Exception { - // Get the document details + private static String convertDocumentToBulkSection(Document document) { String id = Uid.decodeId(document.getBinaryValue("_id").bytes); String source = document.getBinaryValue("_source").utf8ToString(); + String action = "{\"index\": {\"_id\": \"" + id + "\"}}"; - logger.info("Reindexing document - Index: " + indexName + ", Document ID: " + id); + return action + "\n" + source; + } - // Assemble the request details - String path = indexName + "/_doc/" + id; - String body = source; + private static String convertToBulkJson(List bulkSections) { + logger.info(bulkSections.size() + " documents in current bulk request"); + StringBuilder builder = new StringBuilder(); + for (String section : bulkSections) { + builder.append(section).append("\n"); + } + return builder.toString(); + } - // Send the request - RestClient client = new RestClient(targetConnection); - client.put(path, body, false); + private static Mono sendBulkRequest(HttpClient client, String url, String bulkJson) { + return client.post() + .uri(url) + .send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes()))) + .responseSingle((res, content) -> + content.asString() // Convert the response content to a string + .map(body -> new BulkResponseDetails(res.status().code(), body)) // Map both status code and body into a response details object + ) + .flatMap(responseDetails -> { + // Something bad happened with our request, log it + if 
(responseDetails.hasBadStatusCode()) { + logger.error(responseDetails.getFailureMessage()); + } + // Some of the bulk operations failed, log it + else if (responseDetails.hasFailedOperations()) { + logger.error(responseDetails.getFailureMessage()); + } + return Mono.just(responseDetails); + }) + .doOnError(err -> { + // We weren't even able to complete the request, log it + logger.error("Bulk request failed", err); + }) + .then(); } public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception { @@ -29,4 +92,33 @@ public static void refreshAllDocuments(ConnectionDetails targetConnection) throw RestClient client = new RestClient(targetConnection); client.get("_refresh", false); } + + static class BulkResponseDetails { + public final int statusCode; + public final String body; + + BulkResponseDetails(int statusCode, String body) { + this.statusCode = statusCode; + this.body = body; + } + + public boolean hasBadStatusCode() { + return !(statusCode == 200 || statusCode == 201); + } + + public boolean hasFailedOperations() { + return body.contains("\"errors\":true"); + } + + public String getFailureMessage() { + String failureMessage; + if (hasBadStatusCode()) { + failureMessage = "Bulk request failed. Status code: " + statusCode + ", Response body: " + body; + } else { + failureMessage = "Bulk request succeeded, but some operations failed. 
Response body: " + body; + } + + return failureMessage; + } + } } diff --git a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java index 6f17cd9a1..e78bfe000 100644 --- a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java @@ -1,8 +1,7 @@ package com.rfs.common; +import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -11,40 +10,57 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.BytesRef; +import reactor.core.publisher.Flux; public class LuceneDocumentsReader { private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class); - public static List readDocuments(Path luceneFilesBasePath, String indexName, int shardId) throws Exception { + public static Flux readDocuments(Path luceneFilesBasePath, String indexName, int shardId) { Path indexDirectoryPath = luceneFilesBasePath.resolve(indexName).resolve(String.valueOf(shardId)); - List documents = new ArrayList<>(); + return Flux.using( + () -> DirectoryReader.open(FSDirectory.open(indexDirectoryPath)), + reader -> { + logger.info(reader.maxDoc() + " documents found in the current Lucene index"); - try (FSDirectory directory = FSDirectory.open(indexDirectoryPath); - IndexReader reader = DirectoryReader.open(directory)) { - - // Add all documents to our output that have the _source field set and filled in - for (int i = 0; i < reader.maxDoc(); i++) { - Document document = reader.document(i); - BytesRef source_bytes = document.getBinaryValue("_source"); - String id; - // TODO Improve handling of missing document id (https://opensearch.atlassian.net/browse/MIGRATIONS-1649) + return Flux.range(0, reader.maxDoc()) // Extract all the Documents in the 
IndexReader + .map(i -> getDocument(reader, i)) + .cast(Document.class); + }, + reader -> { // Close the IndexReader when done try { - id = Uid.decodeId(reader.document(i).getBinaryValue("_id").bytes); - } catch (Exception e) { - logger.warn("Unable to parse Document id from Document"); - id = "unknown-id"; - } - if (source_bytes == null || source_bytes.bytes.length == 0) { // Skip deleted documents - logger.info("Document " + id + " is deleted or doesn't have the _source field enabled"); - continue; + reader.close(); + } catch (IOException e) { + logger.error("Failed to close IndexReader", e); } + } + ).filter(doc -> doc != null); // Skip docs that failed to read + } - documents.add(document); - logger.info("Document " + id + " read successfully"); + private static Document getDocument(IndexReader reader, int docId) { + try { + Document document = reader.document(docId); + BytesRef source_bytes = document.getBinaryValue("_source"); + String id; + try { + id = Uid.decodeId(reader.document(docId).getBinaryValue("_id").bytes); + } catch (Exception e) { + StringBuilder errorMessage = new StringBuilder(); + errorMessage.append("Unable to parse Document id from Document. 
The Document's Fields: "); + document.getFields().forEach(f -> errorMessage.append(f.name()).append(", ")); + logger.error(errorMessage.toString()); + return null; // Skip documents with missing id + } + if (source_bytes == null || source_bytes.bytes.length == 0) { + logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled"); + return null; // Skip deleted documents or those without the _source field } - } - return documents; + logger.debug("Document " + id + " read successfully"); + return document; + } catch (Exception e) { + logger.error("Failed to read document at Lucene index location " + docId, e); + return null; + } } } diff --git a/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java b/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java index 7d6d598bc..fa31ca9bb 100644 --- a/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java +++ b/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java @@ -13,11 +13,11 @@ public class ConnectionDetailsTest { static Stream happyPathArgs() { return Stream.of( - Arguments.of("https://localhost:9200", "username", "pass", "https://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTPS, "localhost", "9200"), - Arguments.of("http://localhost:9200", "username", "pass", "http://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", "9200"), - Arguments.of("http://localhost:9200", null, null, "http://localhost:9200", null, null, ConnectionDetails.Protocol.HTTP, "localhost", "9200"), + Arguments.of("https://localhost:9200", "username", "pass", "https://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTPS, "localhost", 9200), + Arguments.of("http://localhost:9200", "username", "pass", "http://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", 9200), + Arguments.of("http://localhost:9200", null, null, "http://localhost:9200", null, null, ConnectionDetails.Protocol.HTTP, "localhost", 9200), 
Arguments.of("http://localhost", "username", "pass", "http://localhost", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", null), - Arguments.of("http://localhost:9200/longer/path", "username", "pass", "http://localhost:9200/longer/path", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", "9200"), + Arguments.of("http://localhost:9200/longer/path", "username", "pass", "http://localhost:9200/longer/path", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", 9200), Arguments.of(null, "username", "pass", null, "username", "pass", null, null, null) ); } From 482b9b83984ff7d65cff1806cb0e6e210275824c Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Wed, 24 Apr 2024 08:27:29 -0500 Subject: [PATCH 3/5] Fixes per PR; unit tested LuceneDocumentsReader Signed-off-by: Chris Helma --- RFS/build.gradle | 11 +++ .../java/com/rfs/ReindexFromSnapshot.java | 4 +- .../com/rfs/common/ConnectionDetails.java | 22 +++-- .../com/rfs/common/LuceneDocumentsReader.java | 22 +++-- RFS/src/main/java/com/rfs/common/Uid.java | 6 +- .../com/rfs/common/ConnectionDetailsTest.java | 6 +- .../rfs/common/LuceneDocumentsReaderTest.java | 95 +++++++++++++++++++ RFS/src/test/resources/log4j2-test.xml | 13 +++ 8 files changed, 154 insertions(+), 25 deletions(-) create mode 100644 RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java create mode 100644 RFS/src/test/resources/log4j2-test.xml diff --git a/RFS/build.gradle b/RFS/build.gradle index 322c868d1..2cc9c1477 100644 --- a/RFS/build.gradle +++ b/RFS/build.gradle @@ -43,6 +43,8 @@ dependencies { implementation 'io.projectreactor.netty:reactor-netty-http' implementation 'software.amazon.awssdk:s3:2.25.16' + testImplementation 'io.projectreactor:reactor-test:3.6.5' + testImplementation 'org.apache.logging.log4j:log4j-core:2.23.1' testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2' testImplementation 'org.junit.jupiter:junit-jupiter-params:5.10.2' testImplementation 
'org.mockito:mockito-core:5.11.0' @@ -124,4 +126,13 @@ tasks.getByName('composeUp') test { useJUnitPlatform() + + testLogging { + exceptionFormat = 'full' + events "failed" + showExceptions true + showCauses true + showStackTraces true + showStandardStreams = true + } } \ No newline at end of file diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 4afc22cfa..f19c7aeea 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -356,10 +356,10 @@ public static void main(String[] args) throws InterruptedException { for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ==="); - Flux documents = LuceneDocumentsReader.readDocuments(luceneDirPath, indexMetadata.getName(), shardId); + Flux documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId); String targetIndex = indexMetadata.getName() + indexSuffix; DocumentReindexer.reindex(targetIndex, documents, targetConnection); - + logger.info("Shard reindexing completed"); } } diff --git a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java index 838d9dbd6..4e2edb7b9 100644 --- a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java +++ b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java @@ -1,5 +1,8 @@ package com.rfs.common; +import java.net.URI; +import java.net.URISyntaxException; + /** * Stores the connection details (assuming basic auth) for an Elasticsearch/OpenSearch cluster */ @@ -42,20 +45,19 @@ public ConnectionDetails(String url, String username, String password) { port = -1; protocol = null; } else { - // Parse the URL to get the protocol, host name, and port - String[] urlParts = url.split("://"); - if (urlParts.length != 2) { - throw new 
IllegalArgumentException("Invalid URL format"); + URI uri; + try { + uri = new URI(url); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid URL format", e); } - hostName = urlParts[1].split(":")[0]; - - String[] portParts = urlParts[1].split(":"); - port = portParts.length == 1 ? -1 : Integer.parseInt(portParts[1].split("/")[0]); + hostName = uri.getHost(); + port = uri.getPort(); // Default port can be set here if -1 - if (urlParts[0].equals("http")) { + if (uri.getScheme().equals("http")) { protocol = Protocol.HTTP; - } else if (urlParts[0].equals("https")) { + } else if (uri.getScheme().equals("https")) { protocol = Protocol.HTTPS; } else { throw new IllegalArgumentException("Invalid protocol"); diff --git a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java index e78bfe000..d8698c2e2 100644 --- a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java @@ -15,17 +15,21 @@ public class LuceneDocumentsReader { private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class); - public static Flux readDocuments(Path luceneFilesBasePath, String indexName, int shardId) { + public Flux readDocuments(Path luceneFilesBasePath, String indexName, int shardId) { Path indexDirectoryPath = luceneFilesBasePath.resolve(indexName).resolve(String.valueOf(shardId)); return Flux.using( - () -> DirectoryReader.open(FSDirectory.open(indexDirectoryPath)), + () -> openIndexReader(indexDirectoryPath), reader -> { logger.info(reader.maxDoc() + " documents found in the current Lucene index"); return Flux.range(0, reader.maxDoc()) // Extract all the Documents in the IndexReader - .map(i -> getDocument(reader, i)) - .cast(Document.class); + .handle((i, sink) -> { + Document doc = getDocument(reader, i); + if (doc != null) { // Skip malformed docs + sink.next(doc); + } + }).cast(Document.class); }, 
reader -> { // Close the IndexReader when done try { @@ -34,16 +38,20 @@ public static Flux readDocuments(Path luceneFilesBasePath, String inde logger.error("Failed to close IndexReader", e); } } - ).filter(doc -> doc != null); // Skip docs that failed to read + ); } - private static Document getDocument(IndexReader reader, int docId) { + protected IndexReader openIndexReader(Path indexDirectoryPath) throws IOException { + return DirectoryReader.open(FSDirectory.open(indexDirectoryPath)); + } + + protected Document getDocument(IndexReader reader, int docId) { try { Document document = reader.document(docId); BytesRef source_bytes = document.getBinaryValue("_source"); String id; try { - id = Uid.decodeId(reader.document(docId).getBinaryValue("_id").bytes); + id = Uid.decodeId(document.getBinaryValue("_id").bytes); } catch (Exception e) { StringBuilder errorMessage = new StringBuilder(); errorMessage.append("Unable to parse Document id from Document. The Document's Fields: "); diff --git a/RFS/src/main/java/com/rfs/common/Uid.java b/RFS/src/main/java/com/rfs/common/Uid.java index 22e624f6d..fcec17a30 100644 --- a/RFS/src/main/java/com/rfs/common/Uid.java +++ b/RFS/src/main/java/com/rfs/common/Uid.java @@ -10,9 +10,9 @@ * See: https://github.com/elastic/elasticsearch/blob/6.8/server/src/main/java/org/elasticsearch/index/mapper/Uid.java#L32 */ public class Uid { - private static final int UTF8 = 0xff; - private static final int NUMERIC = 0xfe; - private static final int BASE64_ESCAPE = 0xfd; + public static final int UTF8 = 0xff; + public static final int NUMERIC = 0xfe; + public static final int BASE64_ESCAPE = 0xfd; private static String decodeNumericId(byte[] idBytes, int offset, int len) { assert Byte.toUnsignedInt(idBytes[offset]) == NUMERIC; diff --git a/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java b/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java index fa31ca9bb..b76710bf4 100644 --- 
a/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java +++ b/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java @@ -16,9 +16,9 @@ static Stream happyPathArgs() { Arguments.of("https://localhost:9200", "username", "pass", "https://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTPS, "localhost", 9200), Arguments.of("http://localhost:9200", "username", "pass", "http://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", 9200), Arguments.of("http://localhost:9200", null, null, "http://localhost:9200", null, null, ConnectionDetails.Protocol.HTTP, "localhost", 9200), - Arguments.of("http://localhost", "username", "pass", "http://localhost", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", null), + Arguments.of("http://localhost", "username", "pass", "http://localhost", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", -1), Arguments.of("http://localhost:9200/longer/path", "username", "pass", "http://localhost:9200/longer/path", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", 9200), - Arguments.of(null, "username", "pass", null, "username", "pass", null, null, null) + Arguments.of(null, "username", "pass", null, "username", "pass", null, null, -1) ); } @@ -26,7 +26,7 @@ static Stream happyPathArgs() { @MethodSource("happyPathArgs") void ConnectionDetails_HappyPath_AsExpected(String url, String username, String password, String expectedUrl, String expectedUsername, String expectedPassword, - ConnectionDetails.Protocol expectedProtocal, String expectedHostName, String expectedPort) { + ConnectionDetails.Protocol expectedProtocal, String expectedHostName, int expectedPort) { ConnectionDetails details = new ConnectionDetails(url, username, password); assertEquals(expectedUrl, details.url); assertEquals(expectedUsername, details.username); diff --git a/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java 
b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java new file mode 100644 index 000000000..3bb9d5557 --- /dev/null +++ b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java @@ -0,0 +1,95 @@ +package com.rfs.common; + +import static org.mockito.Mockito.*; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.util.BytesRef; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.nio.file.Paths; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; + +class TestLuceneDocumentsReader extends LuceneDocumentsReader { + // Helper method to correctly encode the Document IDs for test + public static byte[] encodeUtf8Id(String id) { + byte[] idBytes = id.getBytes(StandardCharsets.UTF_8); + byte[] encoded = new byte[idBytes.length + 1]; + encoded[0] = (byte) Uid.UTF8; + System.arraycopy(idBytes, 0, encoded, 1, idBytes.length); + return encoded; + } + + // Override to return a mocked IndexReader + @Override + protected IndexReader openIndexReader(Path indexDirectoryPath) throws IOException { + // Set up our test docs + Document doc1 = new Document(); + doc1.add(new StringField("_id", new BytesRef(encodeUtf8Id("id1")), Field.Store.YES)); + doc1.add(new StoredField("_source", new BytesRef("source1"))); + + Document doc2 = new Document(); + doc2.add(new StringField("_id", new BytesRef(encodeUtf8Id("id2")), Field.Store.YES)); + doc2.add(new StoredField("_source", new BytesRef("source2"))); + + Document doc3 = new Document(); + doc3.add(new StringField("_id", new BytesRef(encodeUtf8Id("id3")), Field.Store.YES)); + doc3.add(new StoredField("_source", new BytesRef("source3"))); + + Document doc4 = new Document(); // Doc w/ no fields + + Document doc5 
= new Document(); // Doc w/ missing _source + doc5.add(new StringField("_id", new BytesRef(encodeUtf8Id("id5")), Field.Store.YES)); + + // Set up our mock reader + IndexReader mockReader = mock(IndexReader.class); + when(mockReader.maxDoc()).thenReturn(5); + when(mockReader.document(0)).thenReturn(doc1); + when(mockReader.document(1)).thenReturn(doc2); + when(mockReader.document(2)).thenReturn(doc3); + when(mockReader.document(3)).thenReturn(doc4); + when(mockReader.document(4)).thenReturn(doc5); + + return mockReader; + } +} + +public class LuceneDocumentsReaderTest { + @Test + void testReadDocuments() { + // Use the TestLuceneDocumentsReader to get the mocked documents + Flux<Document> documents = new TestLuceneDocumentsReader().readDocuments(Paths.get("/fake/path"), "testIndex", 1); + + // Verify that the results are as expected + StepVerifier.create(documents) + .expectNextMatches(doc -> { + String testId = Uid.decodeId(doc.getBinaryValue("_id").bytes); + String testSource = doc.getBinaryValue("_source").utf8ToString(); + return "id1".equals(testId) && "source1".equals(testSource); + }) + .expectNextMatches(doc -> { + String testId = Uid.decodeId(doc.getBinaryValue("_id").bytes); + String testSource = doc.getBinaryValue("_source").utf8ToString(); + return "id2".equals(testId) && "source2".equals(testSource); + }) + .expectNextMatches(doc -> { + String testId = Uid.decodeId(doc.getBinaryValue("_id").bytes); + String testSource = doc.getBinaryValue("_source").utf8ToString(); + return "id3".equals(testId) && "source3".equals(testSource); + }) + .expectComplete() + .verify(); + } +} + + + + + diff --git a/RFS/src/test/resources/log4j2-test.xml b/RFS/src/test/resources/log4j2-test.xml new file mode 100644 index 000000000..5940527db --- /dev/null +++ b/RFS/src/test/resources/log4j2-test.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + From 060fdbee6988343cda0aead4db61a4265ce9d203 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Wed, 24 Apr 2024 08:33:18 -0500 Subject: [PATCH
4/5] Updated a unit test name Signed-off-by: Chris Helma --- RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java index 3bb9d5557..e28daff81 100644 --- a/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java +++ b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java @@ -63,7 +63,7 @@ protected IndexReader openIndexReader(Path indexDirectoryPath) throws IOExceptio public class LuceneDocumentsReaderTest { @Test - void testReadDocuments() { + void ReadDocuments_AsExpected() { // Use the TestLuceneDocumentsReader to get the mocked documents Flux<Document> documents = new TestLuceneDocumentsReader().readDocuments(Paths.get("/fake/path"), "testIndex", 1); From bd87ce59232fe7bc427304da28c7b19f9687b941 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Wed, 24 Apr 2024 10:52:06 -0500 Subject: [PATCH 5/5] Updated a method name per PR feedback Signed-off-by: Chris Helma --- RFS/src/main/java/com/rfs/common/DocumentReindexer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java index bdd04cb86..6c84488e7 100644 --- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java +++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java @@ -35,7 +35,7 @@ public static void reindex(String indexName, Flux<Document> documentStream, Conn documentStream .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size - .map(DocumentReindexer::convertToBulkJson) // Assemble the bulk request body from the parts + .map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts .flatMap(bulkJson ->
sendBulkRequest(client, targetUrl, bulkJson)) // Send the request .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5))) .subscribe( @@ -52,7 +52,7 @@ private static String convertDocumentToBulkSection(Document document) { return action + "\n" + source; } - private static String convertToBulkJson(List<String> bulkSections) { + private static String convertToBulkRequestBody(List<String> bulkSections) { logger.info(bulkSections.size() + " documents in current bulk request"); StringBuilder builder = new StringBuilder(); for (String section : bulkSections) {