-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFS now uses reactor-netty for bulk indexing #607
Changes from 2 commits
4541617
c690be2
482b9b8
060fdbe
bd87ce5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,124 @@ | ||
package com.rfs.common; | ||
|
||
import java.time.Duration; | ||
import java.util.Base64; | ||
import java.util.List; | ||
|
||
import io.netty.buffer.Unpooled; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.lucene.document.Document; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
import reactor.netty.http.client.HttpClient; | ||
import reactor.util.retry.Retry; | ||
|
||
|
||
public class DocumentReindexer { | ||
private static final Logger logger = LogManager.getLogger(DocumentReindexer.class); | ||
private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen | ||
|
||
public static void reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question from above - why should this take a Flux in? What would be lost/what would the impact be if you took in a stream and adapted it within this method? |
||
String targetUrl = "/" + indexName + "/_bulk"; | ||
HttpClient client = HttpClient.create() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we split the http client into a separate class which may get reused by different operations There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer not to at this point, actually. While I would historically agree with you, I'm trying out a new approach on this project and been really happy with how it has worked out. Specifically - avoiding being too speculative about abstractions and letting the needs of the project shape what gets created. In this case, we only have one thing that needs this reactor-netty client, and I honestly don't know what interface I would provide if I were to carve it out because I don't know how another potential part of the code might use it. Avoiding speculation on past abstractions in this project's history has been one of the key things that has enabled me to make so much progress so fast. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know if you need a separate HttpClient interface yet, but I do think that it might help & in general, you'll want to look to the future and not think about the past. |
||
.host(targetConnection.hostName) | ||
.port(targetConnection.port) | ||
.headers(h -> { | ||
h.set("Content-Type", "application/json"); | ||
if (targetConnection.authType == ConnectionDetails.AuthType.BASIC) { | ||
String credentials = targetConnection.username + ":" + targetConnection.password; | ||
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes()); | ||
h.set("Authorization", "Basic " + encodedCredentials); | ||
} | ||
}); | ||
|
||
documentStream | ||
.map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation | ||
.buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size | ||
.map(DocumentReindexer::convertToBulkJson) // Assemble the bulk request body from the parts | ||
.flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request | ||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5))) | ||
.subscribe( | ||
response -> logger.info("Batch uploaded successfully"), | ||
error -> logger.error("Failed to upload batch", error) | ||
); | ||
} | ||
|
||
public static void reindex(String indexName, Document document, ConnectionDetails targetConnection) throws Exception { | ||
// Get the document details | ||
private static String convertDocumentToBulkSection(Document document) { | ||
String id = Uid.decodeId(document.getBinaryValue("_id").bytes); | ||
String source = document.getBinaryValue("_source").utf8ToString(); | ||
String action = "{\"index\": {\"_id\": \"" + id + "\"}}"; | ||
|
||
logger.info("Reindexing document - Index: " + indexName + ", Document ID: " + id); | ||
return action + "\n" + source; | ||
} | ||
|
||
// Assemble the request details | ||
String path = indexName + "/_doc/" + id; | ||
String body = source; | ||
private static String convertToBulkJson(List<String> bulkSections) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: the function name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, will do. |
||
logger.info(bulkSections.size() + " documents in current bulk request"); | ||
StringBuilder builder = new StringBuilder(); | ||
for (String section : bulkSections) { | ||
builder.append(section).append("\n"); | ||
} | ||
return builder.toString(); | ||
} | ||
|
||
// Send the request | ||
RestClient client = new RestClient(targetConnection); | ||
client.put(path, body, false); | ||
private static Mono<Void> sendBulkRequest(HttpClient client, String url, String bulkJson) { | ||
return client.post() | ||
.uri(url) | ||
.send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes()))) | ||
.responseSingle((res, content) -> | ||
content.asString() // Convert the response content to a string | ||
.map(body -> new BulkResponseDetails(res.status().code(), body)) // Map both status code and body into a response details object | ||
) | ||
.flatMap(responseDetails -> { | ||
// Something bad happened with our request, log it | ||
if (responseDetails.hasBadStatusCode()) { | ||
logger.error(responseDetails.getFailureMessage()); | ||
} | ||
// Some of the bulk operations failed, log it | ||
else if (responseDetails.hasFailedOperations()) { | ||
logger.error(responseDetails.getFailureMessage()); | ||
} | ||
return Mono.just(responseDetails); | ||
}) | ||
.doOnError(err -> { | ||
// We weren't even able to complete the request, log it | ||
logger.error("Bulk request failed", err); | ||
}) | ||
.then(); | ||
} | ||
|
||
public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception { | ||
// Send the request | ||
RestClient client = new RestClient(targetConnection); | ||
client.get("_refresh", false); | ||
} | ||
|
||
static class BulkResponseDetails { | ||
public final int statusCode; | ||
public final String body; | ||
|
||
BulkResponseDetails(int statusCode, String body) { | ||
this.statusCode = statusCode; | ||
this.body = body; | ||
} | ||
|
||
public boolean hasBadStatusCode() { | ||
return !(statusCode == 200 || statusCode == 201); | ||
} | ||
|
||
public boolean hasFailedOperations() { | ||
return body.contains("\"errors\":true"); | ||
} | ||
|
||
public String getFailureMessage() { | ||
String failureMessage; | ||
if (hasBadStatusCode()) { | ||
failureMessage = "Bulk request failed. Status code: " + statusCode + ", Response body: " + body; | ||
} else { | ||
failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body; | ||
} | ||
|
||
return failureMessage; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,7 @@ | ||
package com.rfs.common; | ||
|
||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
@@ -11,40 +10,57 @@ | |
import org.apache.lucene.index.IndexReader; | ||
import org.apache.lucene.store.FSDirectory; | ||
import org.apache.lucene.util.BytesRef; | ||
import reactor.core.publisher.Flux; | ||
|
||
public class LuceneDocumentsReader { | ||
private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class); | ||
|
||
public static List<Document> readDocuments(Path luceneFilesBasePath, String indexName, int shardId) throws Exception { | ||
public static Flux<Document> readDocuments(Path luceneFilesBasePath, String indexName, int shardId) { | ||
Path indexDirectoryPath = luceneFilesBasePath.resolve(indexName).resolve(String.valueOf(shardId)); | ||
|
||
List<Document> documents = new ArrayList<>(); | ||
return Flux.using( | ||
() -> DirectoryReader.open(FSDirectory.open(indexDirectoryPath)), | ||
reader -> { | ||
logger.info(reader.maxDoc() + " documents found in the current Lucene index"); | ||
|
||
try (FSDirectory directory = FSDirectory.open(indexDirectoryPath); | ||
IndexReader reader = DirectoryReader.open(directory)) { | ||
|
||
// Add all documents to our output that have the _source field set and filled in | ||
for (int i = 0; i < reader.maxDoc(); i++) { | ||
Document document = reader.document(i); | ||
BytesRef source_bytes = document.getBinaryValue("_source"); | ||
String id; | ||
// TODO Improve handling of missing document id (https://opensearch.atlassian.net/browse/MIGRATIONS-1649) | ||
return Flux.range(0, reader.maxDoc()) // Extract all the Documents in the IndexReader | ||
.map(i -> getDocument(reader, i)) | ||
.cast(Document.class); | ||
}, | ||
reader -> { // Close the IndexReader when done | ||
try { | ||
id = Uid.decodeId(reader.document(i).getBinaryValue("_id").bytes); | ||
} catch (Exception e) { | ||
logger.warn("Unable to parse Document id from Document"); | ||
id = "unknown-id"; | ||
} | ||
if (source_bytes == null || source_bytes.bytes.length == 0) { // Skip deleted documents | ||
logger.info("Document " + id + " is deleted or doesn't have the _source field enabled"); | ||
continue; | ||
reader.close(); | ||
} catch (IOException e) { | ||
logger.error("Failed to close IndexReader", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like it's probably a really bad exception. Why should the program keep running? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question; probably does make sense to kill the process at this point. I realized just now that Reactor was unhappy with the fact that a checked exception was being thrown but I totally could have thrown an unchecked exception here or something. |
||
} | ||
} | ||
).filter(doc -> doc != null); // Skip docs that failed to read | ||
} | ||
|
||
documents.add(document); | ||
logger.info("Document " + id + " read successfully"); | ||
private static Document getDocument(IndexReader reader, int docId) { | ||
try { | ||
Document document = reader.document(docId); | ||
BytesRef source_bytes = document.getBinaryValue("_source"); | ||
String id; | ||
try { | ||
id = Uid.decodeId(reader.document(docId).getBinaryValue("_id").bytes); | ||
} catch (Exception e) { | ||
StringBuilder errorMessage = new StringBuilder(); | ||
errorMessage.append("Unable to parse Document id from Document. The Document's Fields: "); | ||
document.getFields().forEach(f -> errorMessage.append(f.name()).append(", ")); | ||
logger.error(errorMessage.toString()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider PII for ERROR. I think that it's fair, but you should call it out... maybe PII possible loggers should have their own logger name convention so that operators could easily mask them out if necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Cool - will look into that for the future.
I think we need to have a larger discussion around stuff like PII concerns, because I suspect they will impact many aspects of the implementation if we're designing to address them up front. |
||
return null; // Skip documents with missing id | ||
} | ||
if (source_bytes == null || source_bytes.bytes.length == 0) { | ||
logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this be better suited for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I felt (and I guess still feel) that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which one is it - can you tell the difference? Could the reader of the log tell the difference? Is there something in the beginning of the log that would give the user a clue? If _source wasn't enabled, could this flood the logs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any chance that docId could have PII in it? The docId could be customer generated, right? Or are they only internal ids that are separately mapped to the customer-given ones? If they're customer driven, I'd push this to debug to promote the policy that no PII could be shown for INFO and above logs. This feels like it isn't a great spot to be in. I'm hoping that there's a way to show an identifier without risking divulging a customer value. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I am not currently aware of how to tell the difference. We have a task to look into this more (see: https://opensearch.atlassian.net/browse/MIGRATIONS-1629)
The Regarding PII - that's a larger discussion for the team to have. I'll book a timeslot to discuss as a reminder. |
||
return null; // Skip deleted documents or those without the _source field | ||
} | ||
} | ||
|
||
return documents; | ||
logger.debug("Document " + id + " read successfully"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. atDebug().setMessage(()->...).log() |
||
return document; | ||
} catch (Exception e) { | ||
logger.error("Failed to read document at Lucene index location " + docId, e); | ||
return null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would you be better off propagating the exception (or a new exception) upward? Catching Exception is probably going to eventually bite you too. I'd recommend making new checked exceptions, using Illegal*Exception from java's libraries, or using SneakyThrows. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe? I'm honestly unsure how to handle this particular exception situation, so I decided to just log and circle back on it later. I don't know what you'd do with an exception for this at a higher level. You probably wouldn't kill the entire process, right? Maybe you kill yourself if you see a large number of the same exception? Seems like something for a later iteration. |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,7 @@ public RestClient(ConnectionDetails connectionDetails) { | |
} | ||
|
||
public Response get(String path, boolean quietLogging) throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this be clearer if you passed an Slf4j.Level in here instead. Why would you want the caller to dictate the log level? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That makes sense; I'll think about that for future iterations. |
||
String urlString = connectionDetails.host + "/" + path; | ||
String urlString = connectionDetails.url + "/" + path; | ||
|
||
URL url = new URL(urlString); | ||
HttpURLConnection conn = (HttpURLConnection) url.openConnection(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the plan is to deprecate this class, use the @deprecate annotation for it (before There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure whether we want to deprecate this class or not in the long run. I would assume so, but honestly the only place we really need to use the greater abilities of the reactor-netty client is for reindexing; this is fine elsewhere. For that reason, I left this in place for the time being. |
||
|
@@ -76,7 +76,7 @@ public Response get(String path, boolean quietLogging) throws Exception { | |
} | ||
|
||
public Response put(String path, String body, boolean quietLogging) throws Exception { | ||
String urlString = connectionDetails.host + "/" + path; | ||
String urlString = connectionDetails.url + "/" + path; | ||
|
||
URL url = new URL(urlString); | ||
HttpURLConnection conn = (HttpURLConnection) url.openConnection(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use
java.net.URI
forThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, interesting - this makes sense. I was thinking about adding regex checking for the user inputs at beginning of too, but defense-in-depth is a good approach.