[RFS] Refactored DocumentReindexer, HTTP client usage, added an OpenSearchClient #623
Conversation
Signed-off-by: Chris Helma <[email protected]>
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@ Coverage Diff @@
## main #623 +/- ##
============================================
- Coverage 75.93% 75.90% -0.04%
- Complexity 1496 1536 +40
============================================
Files 165 168 +3
Lines 6362 6395 +33
Branches 573 570 -3
============================================
+ Hits 4831 4854 +23
- Misses 1150 1167 +17
+ Partials 381 374 -7
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
logger.info("Shard reindexing completed"); | ||
int targetShardId = shardId; // Define in local context for the lambda |
Super minor style thing - you can make this explicitly final and keep essentially the same name, aside from some convention (like prepending the word 'final') to show that you're only doing it for the lambda.
Thanks - done.
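For illustration, a minimal sketch of the convention discussed above; the loop, executor, and variable names here are hypothetical, not the PR's code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FinalCaptureExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        int numShards = 4; // illustrative value
        for (int shardId = 0; shardId < numShards; shardId++) {
            // Explicitly final copy, named with the 'final' prefix to show it
            // exists only so the lambda below can capture it.
            final int finalShardId = shardId;
            executor.submit(() -> System.out.println("Reindexing shard " + finalShardId));
        }
        executor.shutdown();
    }
}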
h.set("Authorization", "Basic " + encodedCredentials); | ||
} | ||
}); | ||
public static Mono<Void> reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception { |
Returning something for every future operation makes a ton of sense. Take a look at my PR to overhaul our async promises. Returning these so that they can be chained, even if just for monitoring, is a good move.
I'm not familiar with the reactor client, but there may be a longer-term question about whether you want/need to embrace Reactor primitives vs. more universal ones (like CompletableFuture), which we have significant diagnostic support for at the moment. Longer term it's a straightforward, if borderline arduous, task (PR-614 switched out the netty future handling for CompletableFutures).
I'm not sure myself, yet. I think there is a limit to how far up the call stack this stuff will propagate; I've hit that limit in some local changes I have that aren't in this PR. Definitely something to keep an eye on, agree!
Took a look at the referenced PR; don't think the RFS behavior is complex enough to need that level of abstraction. I could be proven wrong, but it feels so far like we can get by with just using the Reactor primitives?
Having a chain of 2-3 dependencies may be complex enough if you want to make sure that things aren't wedging forever or unintentionally holding up other calls. I'm not sure where you're at, but if you have a web client, you'll have more than that. The reason to use something like TrackedFutures is that at > 1K activities per second, you aren't going to want to log enough detail to figure out what went wrong when one fails - that would be slow and expensive (in CPU/storage + user time). The approach the replayer now takes is to keep those graphs around, then render only the ones that are becoming a problem.
It would be good to check the impact of TrackedFutures before putting it into more parts of the solution (it hasn't gone into the proxy yet).
This comment thread came up because of the return type. If you're worried about how far up the call stack the Reactor types propagate, you could convert a Mono object to a CompletableFuture (like what is done in NettyFutureBinders).
I've actually found how high up the Reactor types go at this point; it's not as high as I feared and actually makes sense (to me, at least). You'll get a good look when I put out the next PR soon w/ the RfsWorker coded up to perform the Snapshot Phase.
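For reference, a minimal sketch of the Mono-to-CompletableFuture bridging suggested above, assuming Reactor 3.x; the pipeline contents are illustrative:

import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

public class MonoBridgeExample {
    public static void main(String[] args) {
        // A Reactor pipeline that would normally stay inside the RFS internals.
        Mono<String> reindexResult = Mono.just("reindex complete");

        // Bridge to CompletableFuture at the boundary so callers don't need
        // to depend on Reactor types.
        CompletableFuture<String> future = reindexResult.toFuture();
        future.thenAccept(System.out::println).join();

        // The reverse direction also exists if a future needs to re-enter a pipeline.
        Mono.fromFuture(CompletableFuture.completedFuture("ok")).block();
    }
}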
.flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request
.flatMap(bulkJson -> client.sendBulkRequest(indexName, bulkJson) // Send the request
.doOnSuccess(unused -> logger.debug("Batch succeeded"))
.doOnError(error -> logger.error("Batch failed", error))
You'll want to include some identifier so that you can determine the cause and impact of the failure.
In this case, the error message includes the response body of the failed POST. I think that should be sufficient?
Oh, I missed the error, sorry. If error is an exception, you could also do logger.atError(()->"Label").setCause(error).log()
I'll take another pass at all the logging stuff a bit later, if you don't mind; want to get the core functionality in place right now.
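As a minimal sketch of the identifier suggestion above (the index name and batch id here are hypothetical), a per-batch identifier can ride along in the log message while the throwable stays the final argument:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class BatchLoggingExample {
    private static final Logger logger = LogManager.getLogger(BatchLoggingExample.class);

    public static void main(String[] args) {
        String indexName = "my-index"; // illustrative identifier
        int batchId = 42;              // illustrative identifier
        Exception error = new RuntimeException("bulk POST returned 429");

        // Including identifiers on both the success and failure paths makes it
        // possible to trace which batch of which index was affected.
        logger.debug("Batch {} for index {} succeeded", batchId, indexName);
        logger.error("Batch {} for index {} failed", batchId, indexName, error);
    }
}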
documentStream
return documentStream
reactor does look like a pretty nice and powerful library. I'm wondering if we should think about using it for the replayer, especially as we get more serious about retries. I'm wondering if it supports scheduling.
Yeah, I think it does support scheduling. It's a pub-sub model, and you can write your own subscriber and pull from the publisher however you want. At least, that's my understanding from the docs.
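For illustration, a minimal sketch of Reactor's built-in scheduling and retry hooks that the exchange above refers to; the pipeline contents are hypothetical:

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class ReactorSchedulingExample {
    public static void main(String[] args) {
        Flux.just("doc1", "doc2", "doc3")
            // Shift downstream work onto a bounded worker pool.
            .publishOn(Schedulers.boundedElastic())
            .map(doc -> "sent " + doc) // stand-in for an outbound request
            // Built-in exponential backoff for transient failures.
            .retryWhen(Retry.backoff(3, Duration.ofMillis(200)))
            .doOnNext(System.out::println)
            .blockLast();
    }
}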
@@ -9,14 +9,14 @@ public class FileSystemSnapshotCreator extends SnapshotCreator {
private static final Logger logger = LogManager.getLogger(FileSystemSnapshotCreator.class);
private static final ObjectMapper mapper = new ObjectMapper();

private final ConnectionDetails connectionDetails;
private final OpenSearchClient client;
nit: what kind of client is this for - is it to upload source cluster data, source cluster metadata, or RFS metadata?
Quite literally, a client to interact w/ OpenSearch as a layer above the RestClient. So, yes. We may have another client layer abstraction above it (such as the OpenSearchCmsClient class I have in local changes that uses an OpenSearchClient to interact w/ the Coordinating Metadata Store).
That tells me what the class is. I was asking about what this particular instance was being used for within this class. I was trying to understand how this class worked.
Well, it's used to take a snapshot from the source cluster to the local filesystem. Should be self-evident from the class/method names? If not, open to suggestions for ways to make it more obvious.
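To illustrate the layering described above, here is a hypothetical sketch; the method names, signatures, and stubbed bodies are invented for illustration and are not the PR's actual API:

// Hypothetical layering sketch: raw HTTP at the bottom, OpenSearch-aware
// operations above it, and a metadata-store client on top.
class RestClient {
    String get(String path) { return "{}"; }              // raw HTTP GET (stubbed)
    String put(String path, String body) { return "{}"; } // raw HTTP PUT (stubbed)
}

class OpenSearchClient {
    private final RestClient rest = new RestClient();
    // Cluster-level operations expressed on top of the raw REST layer.
    String registerSnapshotRepo(String repoName, String settingsJson) {
        return rest.put("_snapshot/" + repoName, settingsJson);
    }
    String indexDocument(String index, String id, String docJson) {
        return rest.put(index + "/_doc/" + id, docJson);
    }
}

class OpenSearchCmsClient {
    private final OpenSearchClient client;
    OpenSearchCmsClient(OpenSearchClient client) { this.client = client; }
    // Uses the generic client to talk to the Coordinating Metadata Store.
    String recordMigrationStatus(String statusJson) {
        return client.indexDocument("rfs-cms", "status", statusJson);
    }
}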
// Report the results
int responseCode = conn.getResponseCode();
public Response get(String path, boolean quietLogging) {
where do you use the blocking calls? Given that it looks like everything is async, why not go all the way :)
We use the blocking calls where performance isn't important but ordering is - like whether to do a PUT based on the response from a GET.
Hmmm. I was expecting to see some of those be chained.
I'm not sure how the threading model works with reactor, but with a netty client, all IO for a given channel runs on one thread. Blocking on that thread can have profound, negative consequences (deadlock being one of them). It looks like that isn't the case here, but since netty is being used under the hood, I'm curious how its interacting with this level.
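For illustration, a minimal sketch of chaining a GET and a conditional PUT instead of blocking; the async client methods here are hypothetical stand-ins, not the PR's RestClient API:

import reactor.core.publisher.Mono;

public class ChainedGetPutExample {
    // Hypothetical async client methods that resolve to HTTP status codes.
    static Mono<Integer> getAsync(String path) { return Mono.just(404); }
    static Mono<Integer> putAsync(String path, String body) { return Mono.just(200); }

    public static void main(String[] args) {
        // Ordering is preserved by chaining: the PUT only fires after the GET
        // resolves, and only if the GET reported 404; no thread blocks waiting
        // on the network in the middle of the pipeline.
        Mono<Integer> result = getAsync("/_template/my-template")
            .flatMap(code -> code == 404
                ? putAsync("/_template/my-template", "{}")
                : Mono.just(code));

        // block() only at the outermost edge, for the demo.
        System.out.println("Final status: " + result.block());
    }
}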
byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.UTF_8));
String authHeaderValue = "Basic " + new String(encodedAuth);
conn.setRequestProperty("Authorization", authHeaderValue);
private void logResponse(Response response, boolean quietLogging) {
Should this be protected so that others can leverage the class with different policies on logging?
I ripped it out in my local changes, consider this function gone.
}

public boolean hasFailedOperations() {
return body.contains("\"errors\":true");
Leave a TODO to strengthen this so that you're looking in the right place, or a comment as to why it works? You might already be immune to matching within a document if the documents are already JSON encoded/escaped.
Left a comment; hope it's satisfactory.
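For illustration, a stricter check could parse the bulk response rather than substring-match it; this is a hypothetical sketch using Jackson, not the code in the PR:

import com.fasterxml.jackson.databind.ObjectMapper;

public class BulkErrorsCheckExample {
    private static final ObjectMapper mapper = new ObjectMapper();

    // Read the top-level "errors" flag from the bulk response so that an
    // "errors":true string inside a document body cannot cause a false positive.
    static boolean hasFailedOperations(String body) throws Exception {
        return mapper.readTree(body).path("errors").asBoolean(false);
    }

    public static void main(String[] args) throws Exception {
        String response = "{\"took\":5,\"errors\":true,\"items\":[]}";
        System.out.println(hasFailedOperations(response)); // prints true
    }
}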
client.put(objectPath, settings.toString(), false);
return true;
} else if (response.code == HttpURLConnection.HTTP_OK) {
logger.warn(objectPath + " already exists. Skipping creation.");
if this was an idempotent operation, should this be a warning log message?
Yeah, you're right, probably shouldn't be a WARN. Will update.
if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
client.put(objectPath, settings.toString(), false);
return true;
} else if (response.code == HttpURLConnection.HTTP_OK) {
Is this the right status code? It doesn't seem like it from the log line below
It should be the correct status code, as it's for the GET not the PUT.
Signed-off-by: Chris Helma <[email protected]>
/*
* Create a legacy template if it does not already exist; return true if created, false otherwise.
*/
public boolean createLegacyTemplateIdempotent(String templateName, ObjectNode settings){
Design nit-pick: separate use of idempotent and non-idempotent functionality with intermediate classes so the call looks like client.idempotent().createLegacyTemplate(...) vs client.stateful().createSnapshot(...). Along these lines, expose the client.idempotent() interface at certain levels of the project to be really sure [developers] aren't doing the wrong thing.
Good suggestion; we'll see how the code develops. This isn't the only thing in the class I'm not feeling confident in so I expect there to be continuing changes to it.
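For illustration, a hypothetical sketch of the facade split suggested above; the class and method names are invented and the bodies are stubbed:

// Sketch of splitting idempotent and stateful operations behind intermediate
// facades so call sites read as client.idempotent().createLegacyTemplate(...)
// vs client.stateful().createSnapshot(...).
class FacadeStyleClient {
    private final Idempotent idempotent = new Idempotent();
    private final Stateful stateful = new Stateful();

    Idempotent idempotent() { return idempotent; }
    Stateful stateful() { return stateful; }

    class Idempotent {
        boolean createLegacyTemplate(String templateName, String settingsJson) {
            // GET first, PUT only on 404; safe to call repeatedly. (stub)
            return true;
        }
    }

    class Stateful {
        void createSnapshot(String snapshotName) {
            // Mutates cluster state; callers must not assume it is repeatable. (stub)
        }
    }
}

Layers of the project that should only perform safe operations could then be handed the Idempotent type alone, matching the "expose client.idempotent() at certain levels" idea above.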
.baseUrl(connectionDetails.url)
.headers(h -> {
h.add("Content-Type", "application/json");
if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) {
Design Nitpick: We are going to need a better way to handle authentication in the future such as sigv4 and other methods. IMO this is the case for using some kind of preconstructed client for interacting with the cluster.
It would be good for the sake of our users to try to keep as much unity in our command line options as possible, but I haven't thought of a good way to do that. I'm wary of trying to mix and match different params into one object for a single Beust class.
Probably; I think @gregschohn has some tasks created to look into auth. Not required for the current iteration so I didn't invest the time in it.
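For illustration, one hypothetical shape for a pluggable auth layer; the interface and class names are invented, and a sigv4 signer would simply be another implementation rather than another branch in the header lambda:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.function.BiConsumer;

// Hypothetical strategy interface: each auth scheme knows how to decorate a request.
interface RequestSigner {
    void apply(BiConsumer<String, String> headerSetter);
}

class BasicAuthSigner implements RequestSigner {
    private final String encodedCredentials;

    BasicAuthSigner(String username, String password) {
        String auth = username + ":" + password;
        this.encodedCredentials = Base64.getEncoder()
            .encodeToString(auth.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void apply(BiConsumer<String, String> headerSetter) {
        headerSetter.accept("Authorization", "Basic " + encodedCredentials);
    }
}

class NoAuthSigner implements RequestSigner {
    @Override
    public void apply(BiConsumer<String, String> headerSetter) {
        // No headers to add.
    }
}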
@@ -36,6 +39,7 @@ public Flux<Document> readDocuments(Path luceneFilesBasePath, String indexName,
reader.close();
} catch (IOException e) {
logger.error("Failed to close IndexReader", e);
you can also write this as logger.atError().setCause(e).setMessage(()->"...").log().
From what I've seen when I've looked at this in the past, the multi-arg error function is a pretty loose convention that just happens to work with log4j2, but it isn't really an slf4j standard.
I'll do logging pass a bit later, want to get the core functionality in place first.
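For reference, a minimal sketch of the fluent form mentioned above, assuming slf4j 2.x; the exception and message here are illustrative:

import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FluentLoggingExample {
    private static final Logger logger = LoggerFactory.getLogger(FluentLoggingExample.class);

    public static void main(String[] args) {
        IOException e = new IOException("disk full");

        // Conventional multi-arg form: the trailing Throwable becomes the cause.
        logger.error("Failed to close IndexReader", e);

        // slf4j 2.x fluent form: the message supplier is only evaluated when
        // ERROR is enabled, and the cause is attached explicitly.
        logger.atError()
              .setCause(e)
              .setMessage(() -> "Failed to close IndexReader")
              .log();
    }
}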
Looks like a good starting point, happy to see this merged
Approving on @peternied's behalf since he was not on codeowners/maintainers list at the time he approved.