RFS now uses reactor-netty for bulk indexing (opensearch-project#607)
* Checkpoint: improved ConnectionDetails; unit tested it

Signed-off-by: Chris Helma <[email protected]>

* RFS now uses reactor-netty and bulk indexing

Signed-off-by: Chris Helma <[email protected]>

* Fixes per PR; unit tested LuceneDocumentsReader

Signed-off-by: Chris Helma <[email protected]>

* Updated a unit test name

Signed-off-by: Chris Helma <[email protected]>

* Updated a method name per PR feedback

Signed-off-by: Chris Helma <[email protected]>

---------

Signed-off-by: Chris Helma <[email protected]>
chelma authored and AndreKurait committed Apr 25, 2024
1 parent ca5c157 commit 9cdf46c
Showing 10 changed files with 376 additions and 52 deletions.
15 changes: 15 additions & 0 deletions RFS/build.gradle
@@ -37,8 +37,14 @@ dependencies {
implementation 'org.apache.lucene:lucene-core:8.11.3'
implementation 'org.apache.lucene:lucene-analyzers-common:8.11.3'
implementation 'org.apache.lucene:lucene-backward-codecs:8.11.3'

implementation platform('io.projectreactor:reactor-bom:2023.0.5')
implementation 'io.projectreactor.netty:reactor-netty-core'
implementation 'io.projectreactor.netty:reactor-netty-http'
implementation 'software.amazon.awssdk:s3:2.25.16'

testImplementation 'io.projectreactor:reactor-test:3.6.5'
testImplementation 'org.apache.logging.log4j:log4j-core:2.23.1'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.10.2'
testImplementation 'org.mockito:mockito-core:5.11.0'
@@ -120,4 +126,13 @@ tasks.getByName('composeUp')

test {
useJUnitPlatform()

testLogging {
exceptionFormat = 'full'
events "failed"
showExceptions true
showCauses true
showStackTraces true
showStandardStreams = true
}
}
16 changes: 6 additions & 10 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -14,6 +14,7 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import reactor.core.publisher.Flux;

import com.rfs.common.*;
import com.rfs.transformers.*;
@@ -125,7 +126,7 @@ public static void main(String[] args) throws InterruptedException {

Logging.setLevel(logLevel);

ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

// Sanity checks
@@ -355,18 +356,13 @@ public static void main(String[] args) throws InterruptedException {
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ===");

List<Document> documents = LuceneDocumentsReader.readDocuments(luceneDirPath, indexMetadata.getName(), shardId);
logger.info("Documents read successfully");
Flux<Document> documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId);
String targetIndex = indexMetadata.getName() + indexSuffix;
DocumentReindexer.reindex(targetIndex, documents, targetConnection);

for (Document document : documents) {
String targetIndex = indexMetadata.getName() + indexSuffix;
DocumentReindexer.reindex(targetIndex, document, targetConnection);
}
logger.info("Shard reindexing completed");
}
}

logger.info("Documents reindexed successfully");

logger.info("Refreshing newly added documents");
DocumentReindexer.refreshAllDocuments(targetConnection);
logger.info("Refresh complete");
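Taken together, the new per-shard flow replaces the old document-at-a-time loop with a single reactive pipeline. A condensed sketch of the call pattern, with a hypothetical base path and index name standing in for the values ReindexFromSnapshot computes:

    Flux<Document> documents = new LuceneDocumentsReader()
            .readDocuments(Path.of("/tmp/lucene"), "my-index", 0); // hypothetical path, index, shard
    DocumentReindexer.reindex("my-index_reindexed", documents, targetConnection);

    // Once every shard has been submitted, a _refresh makes the documents searchable:
    DocumentReindexer.refreshAllDocuments(targetConnection);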
41 changes: 38 additions & 3 deletions RFS/src/main/java/com/rfs/common/ConnectionDetails.java
@@ -1,5 +1,8 @@
package com.rfs.common;

import java.net.URI;
import java.net.URISyntaxException;

/**
* Stores the connection details (assuming basic auth) for an Elasticsearch/OpenSearch cluster
*/
@@ -9,13 +12,21 @@ public static enum AuthType {
NONE
}

public final String host;
public static enum Protocol {
HTTP,
HTTPS
}

public final String url;
public final Protocol protocol;
public final String hostName;
public final int port;
public final String username;
public final String password;
public final AuthType authType;

public ConnectionDetails(String host, String username, String password) {
this.host = host; // http://localhost:9200
public ConnectionDetails(String url, String username, String password) {
this.url = url; // http://localhost:9200

// If the username is provided, the password must be as well, and vice versa
if ((username == null && password != null) || (username != null && password == null)) {
@@ -28,5 +39,29 @@ public ConnectionDetails(String host, String username, String password) {

this.username = username;
this.password = password;

if (url == null) {
hostName = null;
port = -1;
protocol = null;
} else {
URI uri;
try {
uri = new URI(url);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid URL format", e);
}

hostName = uri.getHost();
port = uri.getPort(); // Default port can be set here if -1

if (uri.getScheme().equals("http")) {
protocol = Protocol.HTTP;
} else if (uri.getScheme().equals("https")) {
protocol = Protocol.HTTPS;
} else {
throw new IllegalArgumentException("Invalid protocol");
}
}
}
}
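For illustration, a short sketch of what the new constructor derives from a URL; the host, port, and credentials here are hypothetical:

    ConnectionDetails details = new ConnectionDetails("https://localhost:9200", "admin", "admin");
    // details.protocol == ConnectionDetails.Protocol.HTTPS
    // details.hostName == "localhost"
    // details.port     == 9200
    // details.authType == ConnectionDetails.AuthType.BASIC (username and password both present)

A null URL leaves hostName null, port -1, and protocol null; any scheme other than http or https throws IllegalArgumentException.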
110 changes: 101 additions & 9 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -1,32 +1,124 @@
package com.rfs.common;

import java.time.Duration;
import java.util.Base64;
import java.util.List;

import io.netty.buffer.Unpooled;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Document;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.util.retry.Retry;


public class DocumentReindexer {
private static final Logger logger = LogManager.getLogger(DocumentReindexer.class);
private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen

public static void reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
String targetUrl = "/" + indexName + "/_bulk";
HttpClient client = HttpClient.create()
.host(targetConnection.hostName)
.port(targetConnection.port)
.headers(h -> {
h.set("Content-Type", "application/json");
if (targetConnection.authType == ConnectionDetails.AuthType.BASIC) {
String credentials = targetConnection.username + ":" + targetConnection.password;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
h.set("Authorization", "Basic " + encodedCredentials);
}
});

documentStream
.map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
.buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size
.map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts
.flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
.subscribe(
response -> logger.info("Batch uploaded successfully"),
error -> logger.error("Failed to upload batch", error)
);
}

public static void reindex(String indexName, Document document, ConnectionDetails targetConnection) throws Exception {
// Get the document details
private static String convertDocumentToBulkSection(Document document) {
String id = Uid.decodeId(document.getBinaryValue("_id").bytes);
String source = document.getBinaryValue("_source").utf8ToString();
String action = "{\"index\": {\"_id\": \"" + id + "\"}}";

logger.info("Reindexing document - Index: " + indexName + ", Document ID: " + id);
return action + "\n" + source;
}

// Assemble the request details
String path = indexName + "/_doc/" + id;
String body = source;
private static String convertToBulkRequestBody(List<String> bulkSections) {
logger.info(bulkSections.size() + " documents in current bulk request");
StringBuilder builder = new StringBuilder();
for (String section : bulkSections) {
builder.append(section).append("\n");
}
return builder.toString();
}

// Send the request
RestClient client = new RestClient(targetConnection);
client.put(path, body, false);
private static Mono<Void> sendBulkRequest(HttpClient client, String url, String bulkJson) {
return client.post()
.uri(url)
.send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes())))
.responseSingle((res, content) ->
content.asString() // Convert the response content to a string
.map(body -> new BulkResponseDetails(res.status().code(), body)) // Map both status code and body into a response details object
)
.flatMap(responseDetails -> {
// Something bad happened with our request, log it
if (responseDetails.hasBadStatusCode()) {
logger.error(responseDetails.getFailureMessage());
}
// Some of the bulk operations failed, log it
else if (responseDetails.hasFailedOperations()) {
logger.error(responseDetails.getFailureMessage());
}
return Mono.just(responseDetails);
})
.doOnError(err -> {
// We weren't even able to complete the request, log it
logger.error("Bulk request failed", err);
})
.then();
}

public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception {
// Send the request
RestClient client = new RestClient(targetConnection);
client.get("_refresh", false);
}

static class BulkResponseDetails {
public final int statusCode;
public final String body;

BulkResponseDetails(int statusCode, String body) {
this.statusCode = statusCode;
this.body = body;
}

public boolean hasBadStatusCode() {
return !(statusCode == 200 || statusCode == 201);
}

public boolean hasFailedOperations() {
return body.contains("\"errors\":true");
}

public String getFailureMessage() {
String failureMessage;
if (hasBadStatusCode()) {
failureMessage = "Bulk request failed. Status code: " + statusCode + ", Response body: " + body;
} else {
failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
}

return failureMessage;
}
}
}
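For reference, a sketch of the NDJSON body a single batch produces; the two documents and their ids are invented for illustration. Each document contributes an action line plus its _source, each terminated by a newline, and the assembled body is POSTed to /<indexName>/_bulk:

    String bulkBody =
        "{\"index\": {\"_id\": \"1\"}}\n" +
        "{\"title\": \"first document\"}\n" +
        "{\"index\": {\"_id\": \"2\"}}\n" +
        "{\"title\": \"second document\"}\n";

On failure the pipeline retries up to 3 times with exponential backoff, starting at 1 second and capped at 5 seconds, via the Retry.backoff operator shown above.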
74 changes: 49 additions & 25 deletions RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
@@ -1,8 +1,7 @@
package com.rfs.common;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -11,40 +10,65 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import reactor.core.publisher.Flux;

public class LuceneDocumentsReader {
private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class);

public static List<Document> readDocuments(Path luceneFilesBasePath, String indexName, int shardId) throws Exception {
public Flux<Document> readDocuments(Path luceneFilesBasePath, String indexName, int shardId) {
Path indexDirectoryPath = luceneFilesBasePath.resolve(indexName).resolve(String.valueOf(shardId));

List<Document> documents = new ArrayList<>();
return Flux.using(
() -> openIndexReader(indexDirectoryPath),
reader -> {
logger.info(reader.maxDoc() + " documents found in the current Lucene index");

try (FSDirectory directory = FSDirectory.open(indexDirectoryPath);
IndexReader reader = DirectoryReader.open(directory)) {

// Add all documents to our output that have the _source field set and filled in
for (int i = 0; i < reader.maxDoc(); i++) {
Document document = reader.document(i);
BytesRef source_bytes = document.getBinaryValue("_source");
String id;
// TODO Improve handling of missing document id (https://opensearch.atlassian.net/browse/MIGRATIONS-1649)
return Flux.range(0, reader.maxDoc()) // Extract all the Documents in the IndexReader
.handle((i, sink) -> {
Document doc = getDocument(reader, i);
if (doc != null) { // Skip malformed docs
sink.next(doc);
}
}).cast(Document.class);
},
reader -> { // Close the IndexReader when done
try {
id = Uid.decodeId(reader.document(i).getBinaryValue("_id").bytes);
} catch (Exception e) {
logger.warn("Unable to parse Document id from Document");
id = "unknown-id";
}
if (source_bytes == null || source_bytes.bytes.length == 0) { // Skip deleted documents
logger.info("Document " + id + " is deleted or doesn't have the _source field enabled");
continue;
reader.close();
} catch (IOException e) {
logger.error("Failed to close IndexReader", e);
}
}
);
}

documents.add(document);
logger.info("Document " + id + " read successfully");
protected IndexReader openIndexReader(Path indexDirectoryPath) throws IOException {
return DirectoryReader.open(FSDirectory.open(indexDirectoryPath));
}

protected Document getDocument(IndexReader reader, int docId) {
try {
Document document = reader.document(docId);
BytesRef source_bytes = document.getBinaryValue("_source");
String id;
try {
id = Uid.decodeId(document.getBinaryValue("_id").bytes);
} catch (Exception e) {
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Unable to parse Document id from Document. The Document's Fields: ");
document.getFields().forEach(f -> errorMessage.append(f.name()).append(", "));
logger.error(errorMessage.toString());
return null; // Skip documents with missing id
}
if (source_bytes == null || source_bytes.bytes.length == 0) {
logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled");
return null; // Skip deleted documents or those without the _source field
}
}

return documents;
logger.debug("Document " + id + " read successfully");
return document;
} catch (Exception e) {
logger.error("Failed to read document at Lucene index location " + docId, e);
return null;
}
}
}
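A minimal consumption sketch, assuming the <basePath>/<indexName>/<shardId> directory layout that readDocuments resolves; the path and index name are hypothetical:

    Flux<Document> documents = new LuceneDocumentsReader()
            .readDocuments(Path.of("/tmp/lucene"), "my-index", 0);
    long docCount = documents.count().block(); // Flux.using opens the IndexReader on subscribe
                                               // and closes it when the stream terminates

Because the reader is opened lazily and closed by the Flux.using cleanup callback, nothing touches the shard directory until the stream is actually subscribed.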
4 changes: 2 additions & 2 deletions RFS/src/main/java/com/rfs/common/RestClient.java
@@ -34,7 +34,7 @@ public RestClient(ConnectionDetails connectionDetails) {
}

public Response get(String path, boolean quietLogging) throws Exception {
String urlString = connectionDetails.host + "/" + path;
String urlString = connectionDetails.url + "/" + path;

URL url = new URL(urlString);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -76,7 +76,7 @@ public Response get(String path, boolean quietLogging) throws Exception {
}

public Response put(String path, String body, boolean quietLogging) throws Exception {
String urlString = connectionDetails.host + "/" + path;
String urlString = connectionDetails.url + "/" + path;

URL url = new URL(urlString);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
6 changes: 3 additions & 3 deletions RFS/src/main/java/com/rfs/common/Uid.java
@@ -10,9 +10,9 @@
* See: https://github.com/elastic/elasticsearch/blob/6.8/server/src/main/java/org/elasticsearch/index/mapper/Uid.java#L32
*/
public class Uid {
private static final int UTF8 = 0xff;
private static final int NUMERIC = 0xfe;
private static final int BASE64_ESCAPE = 0xfd;
public static final int UTF8 = 0xff;
public static final int NUMERIC = 0xfe;
public static final int BASE64_ESCAPE = 0xfd;

private static String decodeNumericId(byte[] idBytes, int offset, int len) {
assert Byte.toUnsignedInt(idBytes[offset]) == NUMERIC;
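With the flag bytes now public, code outside Uid (unit tests, for instance) can build or inspect encoded ids. A hedged sketch, assuming the ES 6.8 encoding scheme the class mirrors; the id value is invented:

    byte[] raw = "my-doc-id".getBytes(StandardCharsets.UTF_8);
    byte[] encoded = new byte[raw.length + 1];
    encoded[0] = (byte) Uid.UTF8;             // first byte marks the encoding scheme
    System.arraycopy(raw, 0, encoded, 1, raw.length);
    String decoded = Uid.decodeId(encoded);   // "my-doc-id"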