
[RFS] Refactored DocumentReindexer, HTTP client usage, added an OpenSearchClient #623

Merged: 4 commits, May 2, 2024
24 changes: 16 additions & 8 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -182,9 +182,10 @@ public static void main(String[] args) throws InterruptedException {
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to create the snapshot...");
OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
SnapshotCreator snapshotCreator = repo instanceof S3Repo
? new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region)
: new FileSystemSnapshotCreator(snapshotName, sourceConnection, snapshotLocalRepoDirPath.toString());
? new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region)
: new FileSystemSnapshotCreator(snapshotName, sourceClient, snapshotLocalRepoDirPath.toString());
snapshotCreator.registerRepo();
snapshotCreator.createSnapshot();
while (!snapshotCreator.isSnapshotFinished()) {
@@ -266,14 +267,15 @@ public static void main(String[] args) throws InterruptedException {
logger.info("==================================================================");
logger.info("Attempting to recreate the Global Metadata...");

OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
if (sourceVersion == ClusterVersion.ES_6_8) {
ObjectNode root = globalMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformGlobalMetadata(root);
GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, Collections.emptyList(), templateWhitelist);
GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, Collections.emptyList(), templateWhitelist);
} else if (sourceVersion == ClusterVersion.ES_7_10) {
ObjectNode root = globalMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformGlobalMetadata(root);
GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, componentTemplateWhitelist, templateWhitelist);
GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, componentTemplateWhitelist, templateWhitelist);
}
}

@@ -301,14 +303,15 @@ public static void main(String[] args) throws InterruptedException {
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to recreate the indices...");
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
for (IndexMetadata.Data indexMetadata : indexMetadatas) {
String reindexName = indexMetadata.getName() + indexSuffix;
logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target...");

ObjectNode root = indexMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformIndexMetadata(root);
IndexMetadataData_OS_2_11 indexMetadataOS211 = new IndexMetadataData_OS_2_11(transformedRoot, indexMetadata.getId(), reindexName);
IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetConnection);
IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetClient);
}
}

@@ -358,12 +361,17 @@ public static void main(String[] args) throws InterruptedException {

Flux<Document> documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId);
String targetIndex = indexMetadata.getName() + indexSuffix;
DocumentReindexer.reindex(targetIndex, documents, targetConnection);

logger.info("Shard reindexing completed");
final int finalShardId = shardId; // Define in local context for the lambda
DocumentReindexer.reindex(targetIndex, documents, targetConnection)
.doOnError(error -> logger.error("Error during reindexing: " + error))
.doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + finalShardId))
// Wait for the shard reindexing to complete before proceeding; fine in this demo script, but
// shouldn't be done quite this way in the real RFS Worker.
.block();
}
}
logger.info("Refreshing newly added documents");
logger.info("Refreshing target cluster to reflect newly added documents");
DocumentReindexer.refreshAllDocuments(targetConnection);
logger.info("Refresh complete");
}
93 changes: 13 additions & 80 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -1,47 +1,36 @@
package com.rfs.common;

import java.time.Duration;
import java.util.Base64;
import java.util.List;

import io.netty.buffer.Unpooled;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Document;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.util.retry.Retry;


public class DocumentReindexer {
private static final Logger logger = LogManager.getLogger(DocumentReindexer.class);
private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen

public static void reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
String targetUrl = "/" + indexName + "/_bulk";
HttpClient client = HttpClient.create()
.host(targetConnection.hostName)
.port(targetConnection.port)
.headers(h -> {
h.set("Content-Type", "application/json");
if (targetConnection.authType == ConnectionDetails.AuthType.BASIC) {
String credentials = targetConnection.username + ":" + targetConnection.password;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
h.set("Authorization", "Basic " + encodedCredentials);
}
});
public static Mono<Void> reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
Collaborator:

Returning something for every future operation makes a ton of sense. Take a look at my PR to overhaul our async promises; returning these so that they can be chained, even if just for monitoring, is a good move.
I'm not familiar with the Reactor client, but there may be a longer-term question about whether you want/need to embrace Reactor primitives vs. more universal ones (like CompletableFuture), which we have significant diagnostic support for at the moment. Longer term it's a straightforward, if borderline arduous, switch (PR-614 already swapped the Netty future machinery for CompletableFutures).

Member Author:

I'm not sure myself, yet. I think there is a limit to how far up the call stack this stuff will propagate; I've hit that limit in some local changes I have that aren't in this PR. Definitely something to keep an eye on, agree!

Took a look at the referenced PR; don't think the RFS behavior is complex enough to need that level of abstraction. I could be proven wrong, but it feels so far like we can get by with just using the Reactor primitives?

Collaborator:

Having a chain of 2-3 dependencies may be complex enough if you want to make sure that things aren't wedging forever or unintentionally holding up other calls. I'm not sure where you're at, but if you have a web client, you'll have more than that. The reason to use something like TrackedFutures is that at >1K activities per second you really don't want to log enough detail up front to diagnose any one failure; that would be slow and expensive (in CPU/storage plus user time). The approach the replayer now takes is to keep those graphs around and render only the ones that are becoming a problem.

It would be good to check the impact of TrackedFutures before putting it into more parts of the solution (it hasn't gone into the proxy yet).

This comment thread came up because of the return type. If you're worried about how high up the Reactor types are propagating, you could convert a Mono to a CompletableFuture (like what is done in NettyFutureBinders).

Member Author:

I've actually found how high up the Reactor types go at this point; it's not as high as I feared and actually makes sense (to me, at least). You'll get a good look when I put out the next PR soon w/ the RfsWorker coded up to perform the Snapshot Phase.
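
A minimal sketch of the Mono-to-CompletableFuture bridge mentioned above; the helper class and method names here are hypothetical, but Mono.toFuture() is part of Reactor's public API.

import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

public class MonoFutureBridgeSketch {
    // Hypothetical helper: expose a Reactor pipeline as a CompletableFuture at the call boundary.
    public static CompletableFuture<Void> asFuture(Mono<Void> reindexMono) {
        return reindexMono.toFuture(); // subscribes and completes the future when the Mono does
    }
}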

OpenSearchClient client = new OpenSearchClient(targetConnection);

documentStream
return documentStream
Collaborator:

Reactor does look like a pretty nice and powerful library. I'm wondering if we should think about using it for the replayer, especially as we get more serious about retries. I'm wondering if it supports scheduling.

Member Author:

Yeah, I think it does support scheduling. It's a pub-sub model, and you can write your own subscriber model and pull from the publisher however you want. At least, that's my understanding from the docs.
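
A small sketch of the scheduling support discussed above, assuming nothing beyond stock Reactor operators; the pipeline contents are illustrative only.

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ReactorSchedulingSketch {
    public static void main(String[] args) throws InterruptedException {
        Flux.range(1, 5)
            .delayElements(Duration.ofMillis(200))   // time-based pacing between elements
            .publishOn(Schedulers.boundedElastic())  // shift downstream work onto a worker pool
            .subscribe(i -> System.out.println("item " + i));
        Thread.sleep(2000); // keep the demo JVM alive long enough for the async pipeline to finish
    }
}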

.map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
.buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size
.doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request"))
.map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts
.flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request
.flatMap(bulkJson -> client.sendBulkRequest(indexName, bulkJson) // Send the request
.doOnSuccess(unused -> logger.debug("Batch succeeded"))
.doOnError(error -> logger.error("Batch failed", error))
Collaborator:

You'll want to include some identifier so that you can determine the cause and impact of the failure.

Member Author:

In this case, the error message includes the response body of the failed POST. I think that should be sufficient?

Collaborator:

Oh, I missed the error, sorry. If error is an exception, you could also do logger.atError(()->"Label").setCause(error).log().

Member Author:

I'll take another pass at all the logging stuff a bit later, if you don't mind; want to get the core functionality in place right now.
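
A short sketch of the fluent form against the Log4j2 Logger these classes already import, assuming a Log4j2 version with the LogBuilder API; note that the setCause/setMessage spelling quoted above is the SLF4J 2.x builder, while Log4j2's builder uses withThrowable.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FluentLoggingSketch {
    private static final Logger logger = LogManager.getLogger(FluentLoggingSketch.class);

    public static void logBatchFailure(Throwable error) {
        // Fluent builder: attaches the cause without relying on the multi-arg error(String, Throwable) overload
        logger.atError().withThrowable(error).log("Batch failed");
    }
}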

.onErrorResume(e -> Mono.empty()) // Prevent the error from stopping the entire stream
)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
.subscribe(
response -> logger.info("Batch uploaded successfully"),
error -> logger.error("Failed to upload batch", error)
);
.doOnComplete(() -> logger.debug("All batches processed"))
.then();
}

private static String convertDocumentToBulkSection(Document document) {
@@ -53,72 +42,16 @@ private static String convertDocumentToBulkSection(Document document) {
}

private static String convertToBulkRequestBody(List<String> bulkSections) {
logger.info(bulkSections.size() + " documents in current bulk request");
StringBuilder builder = new StringBuilder();
for (String section : bulkSections) {
builder.append(section).append("\n");
}
return builder.toString();
}

private static Mono<Void> sendBulkRequest(HttpClient client, String url, String bulkJson) {
return client.post()
.uri(url)
.send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes())))
.responseSingle((res, content) ->
content.asString() // Convert the response content to a string
.map(body -> new BulkResponseDetails(res.status().code(), body)) // Map both status code and body into a response details object
)
.flatMap(responseDetails -> {
// Something bad happened with our request, log it
if (responseDetails.hasBadStatusCode()) {
logger.error(responseDetails.getFailureMessage());
}
// Some of the bulk operations failed, log it
else if (responseDetails.hasFailedOperations()) {
logger.error(responseDetails.getFailureMessage());
}
return Mono.just(responseDetails);
})
.doOnError(err -> {
// We weren't even able to complete the request, log it
logger.error("Bulk request failed", err);
})
.then();
}

public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception {
// Send the request
RestClient client = new RestClient(targetConnection);
client.get("_refresh", false);
}

static class BulkResponseDetails {
public final int statusCode;
public final String body;

BulkResponseDetails(int statusCode, String body) {
this.statusCode = statusCode;
this.body = body;
}

public boolean hasBadStatusCode() {
return !(statusCode == 200 || statusCode == 201);
}

public boolean hasFailedOperations() {
return body.contains("\"errors\":true");
}

public String getFailureMessage() {
String failureMessage;
if (hasBadStatusCode()) {
failureMessage = "Bulk request failed. Status code: " + statusCode + ", Response body: " + body;
} else {
failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
}

return failureMessage;
}
OpenSearchClient client = new OpenSearchClient(targetConnection);
client.refresh();
}
}
@@ -9,14 +9,14 @@ public class FileSystemSnapshotCreator extends SnapshotCreator {
private static final Logger logger = LogManager.getLogger(FileSystemSnapshotCreator.class);
private static final ObjectMapper mapper = new ObjectMapper();

private final ConnectionDetails connectionDetails;
private final OpenSearchClient client;
Collaborator:

nit: what kind of client is this for? Is it to upload source cluster data, source cluster metadata, or RFS metadata?

Member Author:

Quite literally, a client to interact w/ OpenSearch as a layer above the RestClient. So, yes. We may have another client layer abstraction above it (such as the OpenSearchCmsClient class I have in local changes that uses an OpenSearchClient to interact w/ the Coordinating Metadata Store).

Collaborator:

That tells me what the class is. I was asking about what this particular instance was being used for within this class. I was trying to understand how this class worked.

Member Author:

Well, it's used to take a snapshot from the source cluster to the local filesystem. Should be self-evident from the class/method names? If not, open to suggestions for ways to make it more obvious.

private final String snapshotName;
private final String snapshotRepoDirectoryPath;

public FileSystemSnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String snapshotRepoDirectoryPath) {
super(snapshotName, connectionDetails);
public FileSystemSnapshotCreator(String snapshotName, OpenSearchClient client, String snapshotRepoDirectoryPath) {
super(snapshotName, client);
this.snapshotName = snapshotName;
this.connectionDetails = connectionDetails;
this.client = client;
this.snapshotRepoDirectoryPath = snapshotRepoDirectoryPath;
}

@@ -10,8 +10,11 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;

import lombok.Lombok;
import reactor.core.publisher.Flux;


public class LuceneDocumentsReader {
private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class);

@@ -36,6 +39,7 @@ public Flux<Document> readDocuments(Path luceneFilesBasePath, String indexName,
reader.close();
} catch (IOException e) {
logger.error("Failed to close IndexReader", e);
Collaborator:

You can also write this as logger.atError().setCause(e).setMessage(()->"...").log().
From my read, when I've looked at this in the past, the multi-arg error function is a pretty loose convention that just happens to work with Log4j2, but it isn't really an SLF4J standard.

Member Author:

I'll do a logging pass a bit later; want to get the core functionality in place first.

Lombok.sneakyThrow(e);
}
}
);
@@ -61,7 +65,7 @@ protected Document getDocument(IndexReader reader, int docId) {
}
if (source_bytes == null || source_bytes.bytes.length == 0) {
logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled");
return null; // Skip deleted documents or those without the _source field
return null; // Skip these too
}

logger.debug("Document " + id + " read successfully");
130 changes: 130 additions & 0 deletions RFS/src/main/java/com/rfs/common/OpenSearchClient.java
@@ -0,0 +1,130 @@
package com.rfs.common;

import java.net.HttpURLConnection;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.fasterxml.jackson.databind.node.ObjectNode;

import reactor.core.publisher.Mono;

public class OpenSearchClient {
private static final Logger logger = LogManager.getLogger(OpenSearchClient.class);

public static class BulkResponse extends RestClient.Response {
public BulkResponse(int responseCode, String responseBody, String responseMessage) {
super(responseCode, responseBody, responseMessage);
}

public boolean hasBadStatusCode() {
return !(code == HttpURLConnection.HTTP_OK || code == HttpURLConnection.HTTP_CREATED);
}

public boolean hasFailedOperations() {
// The OpenSearch Bulk API response body is JSON and contains a top-level "errors" field that indicates
// whether any of the individual operations in the bulk request failed. Rather than marshalling the entire
// response as JSON, just check for the string value.
return body.contains("\"errors\":true");
Collaborator:

Leave a TODO to strengthen this so that you're looking in the right place, or a comment as to why it works? You might already be immune to matching inside a document if the documents are already JSON encoded/escaped.

Member Author:

Left a comment; hope it's satisfactory.
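
One possible shape of the stricter check discussed above, assuming Jackson's ObjectMapper (already used elsewhere in RFS, e.g. FileSystemSnapshotCreator) is acceptable here; it consults only the top-level "errors" field rather than substring matching.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BulkErrorsCheckSketch {
    private static final ObjectMapper mapper = new ObjectMapper();

    // Hypothetical stricter check: an escaped "errors":true inside a document body
    // cannot trigger a false positive because only the parsed top-level flag is read.
    public static boolean hasFailedOperations(String responseBody) {
        try {
            JsonNode root = mapper.readTree(responseBody);
            return root.path("errors").asBoolean(false);
        } catch (Exception e) {
            return true; // unparseable response; assume the worst (assumption, not the PR's behavior)
        }
    }
}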

}

public String getFailureMessage() {
String failureMessage;
if (hasBadStatusCode()) {
failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body;
} else {
failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
}

return failureMessage;
}
}

public final ConnectionDetails connectionDetails;
private final RestClient client;

public OpenSearchClient(ConnectionDetails connectionDetails) {
this.connectionDetails = connectionDetails;
this.client = new RestClient(connectionDetails);
}

/*
* Create a legacy template if it does not already exist; return true if created, false otherwise.
*/
public boolean createLegacyTemplateIdempotent(String templateName, ObjectNode settings){
Member (@peternied, May 2, 2024):

Design Nit-pick: Separate use of idempotent and non-idempotent functionality with intermediate classes so the call looks like client.idempotent().createLegacyTemplate(...) vs client.stateful().createSnapshot(...)

Along these lines, expose the client.idempotent() interface at certain levels of the project to be really sure [developers] aren't doing the wrong thing.

Member Author:

Good suggestion; we'll see how the code develops. This isn't the only thing in the class I'm not feeling confident in, so I expect there to be continuing changes to it.
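
A rough sketch of the facade split the reviewer suggests, purely hypothetical and not part of this PR; it delegates to methods that do exist on the new OpenSearchClient.

import com.fasterxml.jackson.databind.node.ObjectNode;

public class FacadeSplitSketch {
    private final OpenSearchClient client;

    public FacadeSplitSketch(OpenSearchClient client) {
        this.client = client;
    }

    public Idempotent idempotent() { return new Idempotent(); }
    public Stateful stateful() { return new Stateful(); }

    public class Idempotent {
        // Safe to call repeatedly; delegates to the existing *Idempotent methods
        public boolean createLegacyTemplate(String name, ObjectNode settings) {
            return client.createLegacyTemplateIdempotent(name, settings);
        }
    }

    public class Stateful {
        // Each call mutates cluster state; callers must not retry blindly
        public RestClient.Response createSnapshot(String repo, String snapshot, ObjectNode settings) {
            return client.createSnapshot(repo, snapshot, settings);
        }
    }
}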

String targetPath = "_template/" + templateName;
return createObjectIdempotent(targetPath, settings);
}

/*
* Create a component template if it does not already exist; return true if created, false otherwise.
*/
public boolean createComponentTemplateIdempotent(String templateName, ObjectNode settings){
String targetPath = "_component_template/" + templateName;
return createObjectIdempotent(targetPath, settings);
}

/*
* Create an index template if it does not already exist; return true if created, false otherwise.
*/
public boolean createIndexTemplateIdempotent(String templateName, ObjectNode settings){
String targetPath = "_index_template/" + templateName;
return createObjectIdempotent(targetPath, settings);
}

/*
* Create an index if it does not already exist; return true if created, false otherwise.
*/
public boolean createIndexIdempotent(String indexName, ObjectNode settings){
String targetPath = indexName;
return createObjectIdempotent(targetPath, settings);
}

private boolean createObjectIdempotent(String objectPath, ObjectNode settings){
RestClient.Response response = client.get(objectPath, true);
if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
client.put(objectPath, settings.toString(), false);
return true;
} else if (response.code == HttpURLConnection.HTTP_OK) {
Collaborator:

Is this the right status code? It doesn't seem like it from the log line below

Member Author:

It should be the correct status code, as it's for the GET not the PUT.

logger.info(objectPath + " already exists. Skipping creation.");
} else {
logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation.");
}
return false;
}

public RestClient.Response registerSnapshotRepo(String repoName, ObjectNode settings){
String targetPath = "_snapshot/" + repoName;
return client.put(targetPath, settings.toString(), false);
}

public RestClient.Response createSnapshot(String repoName, String snapshotName, ObjectNode settings){
String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
return client.put(targetPath, settings.toString(), false);
}

public RestClient.Response getSnapshotStatus(String repoName, String snapshotName){
String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
return client.get(targetPath, false);
}

public Mono<BulkResponse> sendBulkRequest(String indexName, String body) {
String targetPath = indexName + "/_bulk";

return client.postAsync(targetPath, body)
.map(response -> new BulkResponse(response.code, response.body, response.message))
.flatMap(responseDetails -> {
if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) {
logger.error(responseDetails.getFailureMessage());
return Mono.error(new RuntimeException(responseDetails.getFailureMessage()));
}
return Mono.just(responseDetails);
});
}

public RestClient.Response refresh() {
String targetPath = "_refresh";

return client.get(targetPath, false);
}
}
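
A minimal usage sketch of the new client, assuming an existing ConnectionDetails (construction elided, as in ReindexFromSnapshot); the index name and settings payload are hypothetical.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class OpenSearchClientUsageSketch {
    public static void ensureIndex(ConnectionDetails targetConnection) throws Exception {
        OpenSearchClient client = new OpenSearchClient(targetConnection);
        ObjectNode settings = (ObjectNode) new ObjectMapper()
            .readTree("{\"settings\":{\"index\":{\"number_of_shards\":1}}}");
        boolean created = client.createIndexIdempotent("my-target-index", settings);
        // false means the index already existed, or its state could not be confirmed
        System.out.println("Index created: " + created);
    }
}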