[RFS] Refactored DocumentReindexer, HTTP client usage, added an OpenSearchClient #623
com.rfs.common.DocumentReindexer:

```diff
@@ -1,47 +1,36 @@
 package com.rfs.common;

 import java.time.Duration;
 import java.util.Base64;
 import java.util.List;

 import io.netty.buffer.Unpooled;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.document.Document;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClient;
 import reactor.util.retry.Retry;

 public class DocumentReindexer {
     private static final Logger logger = LogManager.getLogger(DocumentReindexer.class);
     private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen

-    public static void reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
-        String targetUrl = "/" + indexName + "/_bulk";
-        HttpClient client = HttpClient.create()
-            .host(targetConnection.hostName)
-            .port(targetConnection.port)
-            .headers(h -> {
-                h.set("Content-Type", "application/json");
-                if (targetConnection.authType == ConnectionDetails.AuthType.BASIC) {
-                    String credentials = targetConnection.username + ":" + targetConnection.password;
-                    String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
-                    h.set("Authorization", "Basic " + encodedCredentials);
-                }
-            });
+    public static Mono<Void> reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
+        OpenSearchClient client = new OpenSearchClient(targetConnection);

-        documentStream
+        return documentStream
```
> **Reviewer:** Reactor does look like a pretty nice and powerful library. I'm wondering if we should think about using it for the replayer, especially as we get more serious about retries. I'm wondering if it supports scheduling.
>
> **Author:** Yeah, I think it does support scheduling. It's a pub-sub model, and you can write your own subscriber and pull from the publisher however you want. At least, that's my understanding from the docs.
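As a rough illustration of the scheduling question above: Reactor pipelines can schedule emissions and hop work between thread pools via `Schedulers`. A minimal sketch, not taken from this PR (element counts and delays are illustrative):

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ReactorSchedulingSketch {
    public static void main(String[] args) {
        Flux.range(1, 3)
            .delayElements(Duration.ofMillis(100))      // schedule emissions on Reactor's timer
            .publishOn(Schedulers.boundedElastic())     // downstream work hops onto another pool
            .map(i -> "item " + i)
            .doOnNext(System.out::println)
            .blockLast();                               // block only so the demo can observe output
    }
}
```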
```diff
             .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
             .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size
+            .doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request"))
             .map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts
-            .flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request
+            .flatMap(bulkJson -> client.sendBulkRequest(indexName, bulkJson) // Send the request
+                .doOnSuccess(unused -> logger.debug("Batch succeeded"))
+                .doOnError(error -> logger.error("Batch failed", error))
```
> **Reviewer:** You'll want to include some identifier so that you can determine the cause and impact of the failure.
>
> **Author:** In this case, the error message includes the response body of the failed POST. I think that should be sufficient?
>
> **Reviewer:** Oh, I missed the error, sorry. If error is an exception, you could also do …
>
> **Author:** I'll take another pass at all the logging stuff a bit later, if you don't mind; I want to get the core functionality in place right now.
```diff
+                .onErrorResume(e -> Mono.empty()) // Prevent the error from stopping the entire stream
+            )
             .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
-            .subscribe(
-                response -> logger.info("Batch uploaded successfully"),
-                error -> logger.error("Failed to upload batch", error)
-            );
+            .doOnComplete(() -> logger.debug("All batches processed"))
+            .then();
     }
```
```diff
     private static String convertDocumentToBulkSection(Document document) {
@@ -53,72 +42,16 @@ private static String convertDocumentToBulkSection(Document document) {
     }

     private static String convertToBulkRequestBody(List<String> bulkSections) {
-        logger.info(bulkSections.size() + " documents in current bulk request");
         StringBuilder builder = new StringBuilder();
         for (String section : bulkSections) {
             builder.append(section).append("\n");
         }
         return builder.toString();
     }
```
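For reference, the body these two helpers assemble is the Bulk API's NDJSON format: an action-metadata line followed by the document source, one pair per document, with a trailing newline. A self-contained sketch of the assembly (the document contents are illustrative, not from the PR):

```java
import java.util.List;

public class BulkBodySketch {
    public static void main(String[] args) {
        // Each "bulk section" is an action line plus the document source,
        // mirroring what convertDocumentToBulkSection produces.
        List<String> sections = List.of(
            "{\"index\":{\"_id\":\"1\"}}\n{\"title\":\"doc one\"}",
            "{\"index\":{\"_id\":\"2\"}}\n{\"title\":\"doc two\"}"
        );
        StringBuilder body = new StringBuilder();
        for (String section : sections) {
            body.append(section).append("\n"); // the Bulk API requires a trailing newline
        }
        System.out.print(body);
    }
}
```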
```diff
-    private static Mono<Void> sendBulkRequest(HttpClient client, String url, String bulkJson) {
-        return client.post()
-            .uri(url)
-            .send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes())))
-            .responseSingle((res, content) ->
-                content.asString() // Convert the response content to a string
-                    .map(body -> new BulkResponseDetails(res.status().code(), body)) // Map both status code and body into a response details object
-            )
-            .flatMap(responseDetails -> {
-                // Something bad happened with our request, log it
-                if (responseDetails.hasBadStatusCode()) {
-                    logger.error(responseDetails.getFailureMessage());
-                }
-                // Some of the bulk operations failed, log it
-                else if (responseDetails.hasFailedOperations()) {
-                    logger.error(responseDetails.getFailureMessage());
-                }
-                return Mono.just(responseDetails);
-            })
-            .doOnError(err -> {
-                // We weren't even able to complete the request, log it
-                logger.error("Bulk request failed", err);
-            })
-            .then();
-    }

     public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception {
         // Send the request
-        RestClient client = new RestClient(targetConnection);
-        client.get("_refresh", false);
-    }
-
-    static class BulkResponseDetails {
-        public final int statusCode;
-        public final String body;
-
-        BulkResponseDetails(int statusCode, String body) {
-            this.statusCode = statusCode;
-            this.body = body;
-        }
-
-        public boolean hasBadStatusCode() {
-            return !(statusCode == 200 || statusCode == 201);
-        }
-
-        public boolean hasFailedOperations() {
-            return body.contains("\"errors\":true");
-        }
-
-        public String getFailureMessage() {
-            String failureMessage;
-            if (hasBadStatusCode()) {
-                failureMessage = "Bulk request failed. Status code: " + statusCode + ", Response body: " + body;
-            } else {
-                failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
-            }
-
-            return failureMessage;
-        }
+        OpenSearchClient client = new OpenSearchClient(targetConnection);
+        client.refresh();
     }
 }
```
FileSystemSnapshotCreator:

```diff
@@ -9,14 +9,14 @@ public class FileSystemSnapshotCreator extends SnapshotCreator {
     private static final Logger logger = LogManager.getLogger(FileSystemSnapshotCreator.class);
     private static final ObjectMapper mapper = new ObjectMapper();

-    private final ConnectionDetails connectionDetails;
+    private final OpenSearchClient client;
```
> **Reviewer:** nit: what kind of client is this for — is it to upload source cluster data, source cluster metadata, or RFS metadata?
>
> **Author:** Quite literally, a client to interact with OpenSearch as a layer above the RestClient. So, yes. We may have another client-layer abstraction above it (such as the OpenSearchCmsClient class I have in local changes that uses an OpenSearchClient to interact with the Coordinating Metadata Store).
>
> **Reviewer:** That tells me what the class is. I was asking what this particular instance is used for within this class; I was trying to understand how this class works.
>
> **Author:** Well, it's used to take a snapshot of the source cluster to the local filesystem. That should be self-evident from the class/method names? If not, I'm open to suggestions for ways to make it more obvious.
```diff
     private final String snapshotName;
     private final String snapshotRepoDirectoryPath;

-    public FileSystemSnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String snapshotRepoDirectoryPath) {
-        super(snapshotName, connectionDetails);
+    public FileSystemSnapshotCreator(String snapshotName, OpenSearchClient client, String snapshotRepoDirectoryPath) {
+        super(snapshotName, client);
         this.snapshotName = snapshotName;
-        this.connectionDetails = connectionDetails;
+        this.client = client;
         this.snapshotRepoDirectoryPath = snapshotRepoDirectoryPath;
     }
```
LuceneDocumentsReader:

```diff
@@ -10,8 +10,11 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.BytesRef;

+import lombok.Lombok;
 import reactor.core.publisher.Flux;

 public class LuceneDocumentsReader {
     private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class);

@@ -36,6 +39,7 @@ public Flux<Document> readDocuments(Path luceneFilesBasePath, String indexName,
                 reader.close();
             } catch (IOException e) {
                 logger.error("Failed to close IndexReader", e);
```
> **Reviewer:** You can also write this as `logger.atError().setCause(e).setMessage(() -> "...").log()`.
>
> **Author:** I'll do a logging pass a bit later; I want to get the core functionality in place first.
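A side note on the suggestion above: the `setCause`/`setMessage` spelling comes from the SLF4J 2.x fluent API, while Log4j2 (which this class imports) offers the equivalent through its `LogBuilder`. A minimal sketch, assuming Log4j2 2.13+:

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FluentLoggingSketch {
    private static final Logger logger = LogManager.getLogger(FluentLoggingSketch.class);

    static void close(AutoCloseable reader) {
        try {
            reader.close();
        } catch (Exception e) {
            // Builder-style logging: the event is only assembled if ERROR is enabled
            logger.atError().withThrowable(e).log("Failed to close IndexReader");
        }
    }
}
```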
```diff
+                Lombok.sneakyThrow(e);
             }
         }
     );

@@ -61,7 +65,7 @@ protected Document getDocument(IndexReader reader, int docId) {
         }
         if (source_bytes == null || source_bytes.bytes.length == 0) {
             logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled");
-            return null; // Skip deleted documents or those without the _source field
+            return null; // Skip these too
         }

         logger.debug("Document " + id + " read successfully");
```
New file (@@ -0,0 +1,130 @@), com.rfs.common.OpenSearchClient:

```java
package com.rfs.common;

import java.net.HttpURLConnection;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.fasterxml.jackson.databind.node.ObjectNode;

import reactor.core.publisher.Mono;

public class OpenSearchClient {
    private static final Logger logger = LogManager.getLogger(OpenSearchClient.class);

    public static class BulkResponse extends RestClient.Response {
        public BulkResponse(int responseCode, String responseBody, String responseMessage) {
            super(responseCode, responseBody, responseMessage);
        }

        public boolean hasBadStatusCode() {
            return !(code == HttpURLConnection.HTTP_OK || code == HttpURLConnection.HTTP_CREATED);
        }

        public boolean hasFailedOperations() {
            // The OpenSearch Bulk API response body is JSON and contains a top-level "errors" field that indicates
            // whether any of the individual operations in the bulk request failed. Rather than marshalling the entire
            // response as JSON, just check for the string value.
            return body.contains("\"errors\":true");
```
> **Reviewer:** Leave a TODO to strengthen this so that you're looking in the right place, or a comment as to why it works? You might already be immune to matches within a document if the documents are already JSON encoded/escaped.
>
> **Author:** Left a comment; hope it's satisfactory.
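If that check ever needs strengthening, one option the TODO could point at is parsing the top-level field rather than substring-matching the raw body; a hypothetical sketch using Jackson, which is already on the classpath:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BulkErrorsCheckSketch {
    private static final ObjectMapper mapper = new ObjectMapper();

    static boolean hasFailedOperations(String body) {
        try {
            JsonNode root = mapper.readTree(body);
            // path() returns a missing node (asBoolean(false) == false) if "errors" is absent
            return root.path("errors").asBoolean(false);
        } catch (Exception e) {
            return true; // treat an unparseable body as a failure
        }
    }
}
```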
```java
        }

        public String getFailureMessage() {
            String failureMessage;
            if (hasBadStatusCode()) {
                failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body;
            } else {
                failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
            }

            return failureMessage;
        }
    }

    public final ConnectionDetails connectionDetails;
    private final RestClient client;

    public OpenSearchClient(ConnectionDetails connectionDetails) {
        this.connectionDetails = connectionDetails;
        this.client = new RestClient(connectionDetails);
    }

    /*
     * Create a legacy template if it does not already exist; return true if created, false otherwise.
     */
    public boolean createLegacyTemplateIdempotent(String templateName, ObjectNode settings){
```
> **Reviewer:** Design nit-pick: separate use of idempotent and non-idempotent functionality with intermediate classes so the call looks like […]. Along these lines, expose the […].
>
> **Author:** Good suggestion; we'll see how the code develops. This isn't the only thing in the class I'm not feeling confident about, so I expect continuing changes to it.
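One way to read the (partially elided) suggestion is to hang the idempotent operations off a nested accessor so call sites read fluently. A hypothetical sketch — all names here are invented, not from the PR:

```java
import com.fasterxml.jackson.databind.node.ObjectNode;

public class GroupedClientSketch {
    public final Indexes indexes = new Indexes();

    /** Groups index operations; only idempotent variants are exposed here. */
    public class Indexes {
        public boolean createIdempotent(String indexName, ObjectNode settings) {
            // Would delegate to the GET-then-PUT existence check,
            // as createObjectIdempotent() does in the class above.
            return createObjectIdempotent(indexName, settings);
        }
    }

    private boolean createObjectIdempotent(String objectPath, ObjectNode settings) {
        return false; // placeholder for the real REST logic
    }

    // Call sites would then read: client.indexes.createIdempotent("my-index", settings);
}
```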
```java
        String targetPath = "_template/" + templateName;
        return createObjectIdempotent(targetPath, settings);
    }

    /*
     * Create a component template if it does not already exist; return true if created, false otherwise.
     */
    public boolean createComponentTemplateIdempotent(String templateName, ObjectNode settings){
        String targetPath = "_component_template/" + templateName;
        return createObjectIdempotent(targetPath, settings);
    }

    /*
     * Create an index template if it does not already exist; return true if created, false otherwise.
     */
    public boolean createIndexTemplateIdempotent(String templateName, ObjectNode settings){
        String targetPath = "_index_template/" + templateName;
        return createObjectIdempotent(targetPath, settings);
    }

    /*
     * Create an index if it does not already exist; return true if created, false otherwise.
     */
    public boolean createIndexIdempotent(String indexName, ObjectNode settings){
        String targetPath = indexName;
        return createObjectIdempotent(targetPath, settings);
    }

    private boolean createObjectIdempotent(String objectPath, ObjectNode settings){
        RestClient.Response response = client.get(objectPath, true);
        if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
            client.put(objectPath, settings.toString(), false);
            return true;
        } else if (response.code == HttpURLConnection.HTTP_OK) {
```
> **Reviewer:** Is this the right status code? It doesn't seem like it from the log line below.
>
> **Author:** It should be the correct status code, as it's for the GET, not the PUT.
```java
            logger.info(objectPath + " already exists. Skipping creation.");
        } else {
            logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation.");
        }
        return false;
    }

    public RestClient.Response registerSnapshotRepo(String repoName, ObjectNode settings){
        String targetPath = "_snapshot/" + repoName;
        return client.put(targetPath, settings.toString(), false);
    }

    public RestClient.Response createSnapshot(String repoName, String snapshotName, ObjectNode settings){
        String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
        return client.put(targetPath, settings.toString(), false);
    }

    public RestClient.Response getSnapshotStatus(String repoName, String snapshotName){
        String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
        return client.get(targetPath, false);
    }
```
```java
    public Mono<BulkResponse> sendBulkRequest(String indexName, String body) {
        String targetPath = indexName + "/_bulk";

        return client.postAsync(targetPath, body)
            .map(response -> new BulkResponse(response.code, response.body, response.message))
            .flatMap(responseDetails -> {
                if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) {
                    logger.error(responseDetails.getFailureMessage());
                    return Mono.error(new RuntimeException(responseDetails.getFailureMessage()));
                }
                return Mono.just(responseDetails);
            });
    }

    public RestClient.Response refresh() {
        String targetPath = "_refresh";

        return client.get(targetPath, false);
    }
}
```
> **Reviewer:** Returning something for every future operation makes a ton of sense. Take a look at my PR to overhaul our async promises; returning these so that they can be chained, even if just for monitoring, is a good move.
>
> I'm not familiar with the Reactor client, but there may be a longer-term question about whether you want or need to embrace Reactor primitives vs. more universal ones (like CompletableFuture), which we have significant diagnostic support for at the moment. Longer term, it's a straightforward if borderline arduous task (PR-614 switched out the Netty future stuff for CompletableFutures).
> **Author:** I'm not sure myself yet. I think there is a limit to how far up the call stack this stuff will propagate; I've hit that limit in some local changes I have that aren't in this PR. Definitely something to keep an eye on, agreed!
>
> I took a look at the referenced PR; I don't think the RFS behavior is complex enough to need that level of abstraction. I could be proven wrong, but so far it feels like we can get by with just the Reactor primitives.
> **Reviewer:** Having a chain of 2-3 dependencies may be complex enough if you want to make sure that things aren't wedging forever or unintentionally holding up other calls. I'm not sure where you're at, but if you have a web client, you'll have more than that. The reason to use something like TrackedFutures is that at more than 1K activities per second, you can't afford to log enough detail to diagnose any one failure; that would be slow and expensive (in CPU, storage, and user time). The approach the replayer now takes is to keep those dependency graphs around, then render only the ones that are becoming a problem.
>
> It would be good to check the impact of TrackedFutures before putting it into more parts of the solution (it hasn't gone into the proxy yet).
>
> This comment thread came up because of the return type. If you're worried about how high up the stack your Reactor types go, you could convert a Mono to a CompletableFuture (like what is done in NettyFutureBinders).
> **Author:** I've actually found how high up the Reactor types go at this point; it's not as high as I feared, and it actually makes sense (to me, at least). You'll get a good look when I put out the next PR soon, with the RfsWorker coded up to perform the Snapshot phase.