RFS now uses reactor-netty for bulk indexing #607
Conversation
Signed-off-by: Chris Helma <[email protected]>
Signed-off-by: Chris Helma <[email protected]>
protocol = null;
} else {
    // Parse the URL to get the protocol, host name, and port
    String[] urlParts = url.split("://");
Can we use java.net.URI for this?
if (url == null) {
hostName = null;
port = -1;
protocol = null;
} else {
try {
URI uri = new URI(url);
hostName = uri.getHost();
port = uri.getPort();
protocol = uri.getScheme();
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid URL format", e);
}
}
Oh, interesting - this makes sense. I was thinking about adding regex checking for the user inputs at the beginning, too, but defense-in-depth is a good approach.
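As a quick illustration of the java.net.URI suggestion, here's a minimal, self-contained sketch of what it returns for the pieces ConnectionDetails needs (the URL strings here are made-up examples):

```java
import java.net.URI;
import java.net.URISyntaxException;

public class UriParseExample {
    public static void main(String[] args) throws URISyntaxException {
        // Hypothetical target URL, just for illustration
        URI uri = new URI("http://localhost:9200");
        System.out.println(uri.getScheme()); // http
        System.out.println(uri.getHost());   // localhost
        System.out.println(uri.getPort());   // 9200

        // When the port is omitted, getPort() returns -1, which matches
        // the sentinel already used in the null-URL branch above
        System.out.println(new URI("http://localhost").getPort()); // -1
    }
}
```

A malformed URL raises URISyntaxException, which the snippet above wraps in IllegalArgumentException.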
public static void reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
    String targetUrl = "/" + indexName + "/_bulk";
    HttpClient client = HttpClient.create()
Can we split the http client into a separate class which may get reused by different operations
I'd prefer not to at this point, actually. While I would historically agree with you, I'm trying out a new approach on this project and been really happy with how it has worked out. Specifically - avoiding being too speculative about abstractions and letting the needs of the project shape what gets created. In this case, we only have one thing that needs this reactor-netty client, and I honestly don't know what interface I would provide if I were to carve it out because I don't know how another potential part of the code might use it. Avoiding speculation on past abstractions in this project's history has been one of the key things that has enabled me to make so much progress so fast.
I don't know if you need a separate HttpClient interface yet, but I do think that it might help; in general, you'll want to look to the future and not think about the past.
From my view, you've got some leaky abstractions, with a couple of other needless Flux contaminations within your codebase.
Once those leak in, it becomes harder to test your code too (and more tests help us write application code faster). If you want to write test code fast, keep it as generic as you can, with the cleanest interfaces that you can strive for. Simpler pieces -> smoother integrations -> faster delivery of quality solutions.
// Assemble the request details
String path = indexName + "/_doc/" + id;
String body = source;
private static String convertToBulkJson(List<String> bulkSections) {
nit: the function name convertToBulkJson was a bit confusing since it's just delimited JSONs; maybe convertToBulkBody or convertToDelimitedJsons.
Sure, will do.
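For reference, a minimal sketch of what the renamed helper might look like (the standalone class and the document payloads are hypothetical; the _bulk endpoint expects newline-delimited JSON terminated by a final newline):

```java
import java.util.List;

public class BulkBodyExample {
    // Hypothetical rename of convertToBulkJson: joins pre-serialized JSON
    // sections with newlines, since the OpenSearch/Elasticsearch _bulk
    // endpoint expects newline-delimited JSON with a trailing newline
    public static String convertToBulkBody(List<String> bulkSections) {
        return String.join("\n", bulkSections) + "\n";
    }

    public static void main(String[] args) {
        String body = convertToBulkBody(List.of(
            "{\"index\":{\"_id\":\"1\"}}",
            "{\"field\":\"value\"}"
        ));
        System.out.print(body);
    }
}
```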
return null; // Skip documents with missing id
}
if (source_bytes == null || source_bytes.bytes.length == 0) {
    logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled");
Would this be better suited for info, if this is expected for deleted documents?
I felt (and I guess still feel) that warn is probably the right level. It's something whose occurrence we should highlight without it being an error, per se.
Which one is it - can you tell the difference? Could the reader of the log tell the difference? Is there something in the beginning of the log that would give the user a clue?
If _source wasn't enabled, could this flood the logs?
Is there any chance that docId could have PII in it? The docId could be customer generated, right? Or are they only internal ids that are separately mapped to the customer-given ones?
If they're customer driven, I'd push this to debug to promote the policy that no PII could be shown for INFO and above logs. This feels like it isn't a great spot to be in. I'm hoping that there's a way to show an identifier without risking divulging a customer value.
Which one is it - can you tell the difference?

I am not currently aware of how to tell the difference. We have a task to look into this more (see: https://opensearch.atlassian.net/browse/MIGRATIONS-1629)

Is there any chance that docId could have PII in it?

The docId is an integer value used by Lucene to tell which Lucene Document in the Lucene Index is being referred to. The _id field of the Lucene Document is a user-set alphanumeric string, and so can contain whatever the user wants it to.
Regarding PII - that's a larger discussion for the team to have. I'll book a timeslot to discuss as a reminder.
Signed-off-by: Chris Helma <[email protected]>
Signed-off-by: Chris Helma <[email protected]>
Codecov Report: All modified and coverable lines are covered by tests ✅
Additional details and impacted files:
@@ Coverage Diff @@
## main #607 +/- ##
============================================
+ Coverage 75.91% 75.93% +0.02%
- Complexity 1491 1496 +5
============================================
Files 162 165 +3
Lines 6348 6362 +14
Branches 572 573 +1
============================================
+ Hits 4819 4831 +12
+ Misses 1152 1149 -3
- Partials 377 382 +5
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry.
Signed-off-by: Chris Helma <[email protected]>
return Flux.range(0, reader.maxDoc()) // Extract all the Documents in the IndexReader
    .handle((i, sink) -> {
        Document doc = getDocument(reader, i);
        if (doc != null) { // Skip malformed docs
You should at least log when doc == null (or whatever malformed documents that you might be skipping).
We log that in getDocument()
continue;
reader.close();
} catch (IOException e) {
    logger.error("Failed to close IndexReader", e);
This seems like it's probably a really bad exception. Why should the program keep running?
This seems like a spot where throw Lombok.sneakyThrow(e) would be a better option.
Good question; probably does make sense to kill the process at this point. I realized just now that Reactor was unhappy with the fact that a checked exception was being thrown but I totally could have thrown an unchecked exception here or something.
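As a stdlib-only alternative to Lombok.sneakyThrow, the checked IOException could be wrapped in UncheckedIOException so the failure still propagates out of the Reactor callback. (The Reader interface and closeOrFail name below are hypothetical stand-ins for the real IndexReader handling.)

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class CloseExample {
    // Hypothetical stand-in for Lucene's IndexReader
    interface Reader {
        void close() throws IOException;
    }

    // Instead of logging and continuing, rethrow the checked IOException
    // as an unchecked one so the failure propagates (Reactor rejects
    // checked exceptions thrown from within its operators)
    static void closeOrFail(Reader reader) {
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to close IndexReader", e);
        }
    }

    public static void main(String[] args) {
        try {
            closeOrFail(() -> { throw new IOException("disk error"); });
        } catch (UncheckedIOException e) {
            System.out.println("propagated: " + e.getCause().getMessage());
        }
    }
}
```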
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Unable to parse Document id from Document. The Document's Fields: ");
document.getFields().forEach(f -> errorMessage.append(f.name()).append(", "));
logger.error(errorMessage.toString());
logger.atError().setCause(e).setMessage(()->...).log() will do two more things for you: 1) get the exception and its backtrace into the logs, and 2) use the fluent style, where everything within '...' will only be evaluated when you're logging at that level. It can make your log statements tighter (all one statement rather than four, as they are here) and much more efficient, since work can often be elided. Even if you stay at warn/error, I'd like to routinely filter the repo for usages of immediate logging, because its performance hit can be the single greatest impact on a program.
Consider PII for ERROR. I think that it's fair, but you should call it out... maybe PII possible loggers should have their own logger name convention so that operators could easily mask them out if necessary.
logger.atError().setCause(e).setMessage(()->...).log()
Cool - will look into that for the future.
Consider PII for ERROR
I think we need to have a larger discussion around stuff like PII concerns, because I suspect they will impact many aspects of the implementation if we're designing to address them up front.
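The fluent-logging point boils down to deferred message construction. This toy sketch (not the actual Log4j2/SLF4J API; the Level enum and log method are invented for illustration) shows why supplier-based messages can be cheaper:

```java
import java.util.function.Supplier;

public class LazyLogExample {
    enum Level { DEBUG, INFO, ERROR }

    // Invented minimal logger: the configured threshold decides whether
    // the message Supplier is ever invoked
    static final Level THRESHOLD = Level.ERROR;

    static void log(Level level, Supplier<String> message) {
        // The Supplier only runs when the level clears the threshold, so
        // expensive string building is skipped for disabled levels
        if (level.ordinal() >= THRESHOLD.ordinal()) {
            System.out.println(message.get());
        }
    }

    public static void main(String[] args) {
        int[] evaluations = {0};
        log(Level.DEBUG, () -> {
            evaluations[0]++; // never runs: DEBUG is below the threshold
            return "expensive debug message";
        });
        System.out.println(evaluations[0]); // 0 — no work done for the disabled level
    }
}
```

The real fluent APIs add setCause(...) for the backtrace, but the laziness shown here is the performance argument.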
public class LuceneDocumentsReader {
    private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class);

public static List<Document> readDocuments(Path luceneFilesBasePath, String indexName, int shardId) throws Exception {
public Flux<Document> readDocuments(Path luceneFilesBasePath, String indexName, int shardId) {
Why does your LuceneDocumentReader now take a hard dependency on your HTTP client library?
It might be better to make this a collection or stream & then adapt later so that you can switch client implementations out.
Maybe, but it seems like this is how the Reactor framework wants to be used. I can see both the LuceneDocumentsReader and DocumentReindexer classes being implementation specific. So far it's paid off for me in this project not to speculate on stuff like this until there's a specific need.
public class DocumentReindexer {
    private static final Logger logger = LogManager.getLogger(DocumentReindexer.class);
    private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen

    public static void reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
Question from above - why should this take a Flux in? What would be lost/what would the impact be if you took in a stream and adapted it within this method?
@@ -34,7 +34,7 @@ public RestClient(ConnectionDetails connectionDetails) {
}

public Response get(String path, boolean quietLogging) throws Exception {
    String urlString = connectionDetails.host + "/" + path;
    String urlString = connectionDetails.url + "/" + path;

    URL url = new URL(urlString);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
If the plan is to deprecate this class, use the @Deprecated annotation for it (before class RestClient) so that we know that the plan is to rally all of the code around one HTTP client solution. As it is, it's pretty confusing with 2 different clients within one codebase/PR.
I'm not sure whether we want to deprecate this class or not in the long run. I would assume so, but honestly the only place we really need to use the greater abilities of the reactor-netty client is for reindexing; this is fine elsewhere. For that reason, I left this in place for the time being.
doc5.add(new StringField("_id", new BytesRef(encodeUtf8Id("id5")), Field.Store.YES));

// Set up our mock reader
IndexReader mockReader = mock(IndexReader.class);
Let's sync-up on mockito. I wonder if this could have been clearer and tighter without mockito.
* Checkpoint: improved ConnectionDetails; unit tested it
* RFS now uses reactor-netty and bulk indexing
* Fixes per PR; unit tested LuceneDocumentsReader
* Updated a unit test name
* Updated a method name per PR feedback

Signed-off-by: Chris Helma <[email protected]>
Description
_id
Issues Resolved
Testing
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.