Change text embedding processor to async mode for better isolation #27
Conversation
fbef895 to 9892a78 (force-push)
@zane-neo Please add a brief description to the PR and/or the issue this relates to.
@zane-neo can you please add some more details, like what the flow will be when the client sends a bulk request? Is anything changing? It's good that we are moving towards async; I just want to understand the flow.
@zane-neo please resolve the conflicts, and can we also check why the checks are failing? As of the last commit all the checks were passing. You might want to do some rebasing.
    return ingestDocument;
}

public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
Is this an override function?
Yes, this is an override function; I added the @Override annotation and also added proper Javadoc on this method.
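For context, the method being overridden is defined on OpenSearch's Processor interface; per the Processor.java link cited later in this thread, its default implementation delegates to the synchronous execute and routes the outcome through the handler, roughly:

// Default implementation on org.opensearch.ingest.Processor (paraphrased from
// the linked source): wrap the synchronous execute(IngestDocument) and hand
// either the result or the exception to the handler.
default void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
    final IngestDocument result;
    try {
        result = execute(ingestDocument);
    } catch (Exception e) {
        handler.accept(null, e);
        return;
    }
    handler.accept(result, null);
}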
} catch (Exception e) {
    handler.accept(null, e);
}
Which function is throwing the exception that we are catching here?
The underlying validateEmbeddingFieldsValue is throwing IllegalArgumentException.
If the exception is coming from that function only, put the try/catch around that function only and take the predict API call out of the try block.
Checked again: not only does the method validateEmbeddingFieldsValue throw an exception, but so does mlClient.predict, and listener.onFailure is not invoked when the exception arises. This exception could propagate to execute(IngestDocument, BiConsumer); if we don't catch it, it will propagate further to the calling methods, which could impact the pipeline execution, and that's what we don't want. Also, the parent method uses try/catch to catch all exceptions; please refer to: https://github.com/opensearch-project/OpenSearch/blob/8c9ca4e858e6333265080972cf57809dbc086208/server/src/main/java/org/opensearch/ingest/Processor.java#L64.
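Putting the pieces from this thread together, a minimal sketch of the guarded async override (helper names follow the hunks quoted in this review; setVectorFieldsToDocument is a hypothetical stand-in for whatever method writes the embeddings back into the document):

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
    try {
        // Both the validation and the client call below can throw synchronously
        // (e.g. IllegalArgumentException), so the whole flow is guarded.
        validateEmbeddingFieldsValue(ingestDocument);
        Map<String, Object> knnMap = buildMapWithKnnKeyAndOriginalValue(ingestDocument);
        mlCommonsClientAccessor.inferenceSentences(this.modelId, createInferenceList(knnMap), ActionListener.wrap(vectors -> {
            setVectorFieldsToDocument(ingestDocument, knnMap, vectors); // hypothetical helper
            handler.accept(ingestDocument, null);
        }, e -> handler.accept(null, e)));
    } catch (Exception e) {
        // Route the failure through the handler rather than letting it
        // propagate and disturb the rest of the ingest pipeline.
        handler.accept(null, e);
    }
}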
4cebd9b to 55447d9 (force-push)
Signed-off-by: Zan Niu <[email protected]>
5c58236 to b9c740f (force-push)
A few minor comments, but overall it looks good.
}

/**
 * When received a bulk indexing request, the pipeline will be executed in the <a href="https://github.com/opensearch-project/OpenSearch/blob/8fda187bb459757164cc80c91ca305d274ca2b53/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java#L226">doInternalExecute</a> method
minor: why is this commit 8fda187bb459757164cc80c91ca305d274ca2b53 in the link?
Removed this.
throw new RuntimeException("Text embedding processor failed with exception", e);
validateEmbeddingFieldsValue(ingestDocument);
Map<String, Object> knnMap = buildMapWithKnnKeyAndOriginalValue(ingestDocument);
mlCommonsClientAccessor.inferenceSentences(this.modelId, createInferenceList(knnMap), ActionListener.wrap(x -> {
nit: can we use the name "vectors" instead of "x"?
Sure, changed it to vectors.
 * @param ingestDocument
 * @param handler
 */
@Override
In the future, could we refactor this method to batch calls to the model so we can improve throughput?
@jmazanec15 what kind of batching are you suggesting here? Batching the writes, or batching the inference calls?
It's doable, but there are two things we need to consider.
- Effort: changing to batching needs more effort; we need to choose a proper threshold, either a batch size or a linger time, and decide whether to expose these settings to the user.
- Benefit: in a cluster, every pair of nodes keeps a TCP connection alive, so there's no connection overhead, and network time is low relative to CPU time. Based on the performance testing, the bottleneck is CPU rather than network IO, so switching to batching may not improve performance dramatically.
@zane-neo That makes sense. It might make sense to create a proof-of-concept test in the future to see what the benefit would be.
@navneet1v the idea would be to batch the inference calls. For the async action, instead of calling inference directly, submit the update to some kind of queue that creates one large request for multiple docs and calls the handlers once it completes.
I am not 100% convinced by the idea of batching, and the reason is that batching the data and making one call only helps if the number of threads at the ML node level is low and multiple requests are competing for those (API call) threads.
Secondly, to create the batch we have to put the data in a synchronized queue, which will further slow down the processing.
Third, batching creates the problem that a single slow input sentence can delay the processing of all the documents.
We can discuss this further, but before even doing the POC we should first do a deep dive on the ML-Commons code to see whether batching can really help.
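For concreteness, a hypothetical sketch of the queue-based batching jmazanec15 describes; every name here is invented for illustration and nothing like it exists in this PR:

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch: buffer per-document requests and flush them as one
// inference call once the batch fills up.
class BatchingInferenceQueue {

    record PendingDoc(String text, BiConsumer<List<Float>, Exception> handler) {}

    private final List<PendingDoc> pending = new ArrayList<>();
    private final int maxBatchSize;

    BatchingInferenceQueue(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    synchronized void submit(String text, BiConsumer<List<Float>, Exception> handler) {
        pending.add(new PendingDoc(text, handler));
        if (pending.size() >= maxBatchSize) {
            flush();
        }
        // A real implementation would also flush on a linger timeout so a
        // half-full batch cannot delay documents indefinitely.
    }

    private void flush() {
        List<PendingDoc> batch = new ArrayList<>(pending);
        pending.clear();
        List<String> texts = batch.stream().map(PendingDoc::text).toList();
        // One large request for the whole batch; each per-document handler
        // is completed from its slice of the shared response.
        inferenceSentencesBatch(texts, (vectors, e) -> {
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).handler().accept(e == null ? vectors.get(i) : null, e);
            }
        });
    }

    // Stand-in for an ML client call that embeds many sentences in one request.
    void inferenceSentencesBatch(List<String> texts, BiConsumer<List<List<Float>>, Exception> callback) {
        // ... call the model once with all texts ...
    }
}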
/**
 * When received a bulk indexing request, the pipeline will be executed in the <a href="https://github.com/opensearch-project/OpenSearch/blob/8fda187bb459757164cc80c91ca305d274ca2b53/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java#L226">doInternalExecute</a> method
 * Before the pipeline execution, the pipeline will be marked as resolved (means executed), and then this overriding method will be invoked when executing the text embedding processor.
 * After the inference completes, the handler will invoke the doInternalExecute method again to run actual write operation.
 * @param ingestDocument
 * @param handler
Improve the documentation to explain what this function does rather than what the whole pipeline does, and try to remove the commit IDs from the Javadoc; point to the right function using the Javadoc @link tag.
Sure, moved the pipeline-execution explanation into the method body.
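For illustration, the Javadoc could drop the pinned-commit URL and point at the overridden interface method with @link; a sketch, not the exact text that landed:

/**
 * Asynchronously enriches the ingest document with text embeddings and
 * reports the result (or failure) through the handler, so write threads
 * are never blocked waiting on inference.
 *
 * @param ingestDocument the document being enriched
 * @param handler invoked with the enriched document on success, or with the
 *                exception on failure; see
 *                {@link org.opensearch.ingest.Processor#execute(IngestDocument, java.util.function.BiConsumer)}
 */
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
    // ...
}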
Signed-off-by: Zan Niu <[email protected]>
Failure https://github.com/opensearch-project/neural-search/actions/runs/3328176376/jobs/5504257477 is unrelated to this PR; it appears to be related to an ml-commons permission issue. Approving.
* Change text embedding processor to async mode
Signed-off-by: Zan Niu <[email protected]>
* Address review comments
Signed-off-by: Zan Niu <[email protected]>
Signed-off-by: Zan Niu <[email protected]>
(cherry picked from commit d538ad1)
The backport to 2.x failed.
To backport manually, run these commands in your terminal:

# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-2.x 2.x
# Navigate to the new working tree
cd .worktrees/backport-2.x
# Create a new branch
git switch --create backport/backport-27-to-2.x
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 d538ad14679216eb91095143d4237ce58376b790
# Push it to GitHub
git push --set-upstream origin backport/backport-27-to-2.x
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-2.x

Then, create a pull request where the base branch is 2.x and the compare/head branch is backport/backport-27-to-2.x.
…pensearch-project#27) * Change text embedding processor to async mode Signed-off-by: Zan Niu <[email protected]> Signed-off-by: Navneet Verma <[email protected]>
…) (#46) Signed-off-by: Zan Niu <[email protected]> Signed-off-by: Navneet Verma <[email protected]>
Description
Changed the text embedding processor to async mode when handling user input.
Previously, since inference runs in async mode and only the execute(IngestDocument) method was overridden, TextEmbeddingProcessor needed a blocking approach to make sure document enrichment completed before indexing happened. The drawback is that the blocking happens in the write thread pool, which has only availableProcessors + 1 threads, so it could impact non-text-embedding indexing.
Changing this to async mode by overriding the execute(IngestDocument, BiConsumer) method means threads in the write thread pool are no longer blocked, which gives better isolation for non-text-embedding indexing.
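To make the drawback concrete, here is one plausible shape of the blocking approach described above (an illustrative sketch, not the exact previous code; inferenceList stands in for the prepared inputs, and the imports java.util.concurrent.CountDownLatch and java.util.concurrent.atomic.AtomicReference are assumed):

// Sketch of the previous blocking pattern: the synchronous
// execute(IngestDocument) parks the calling write thread until the async
// inference call finishes.
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<Exception> failure = new AtomicReference<>();
    mlCommonsClientAccessor.inferenceSentences(this.modelId, inferenceList, ActionListener.wrap(vectors -> {
        // ... write the vectors into the document ...
        latch.countDown();
    }, e -> {
        failure.set(e);
        latch.countDown();
    }));
    latch.await(); // occupies one of the (availableProcessors + 1) write threads
    if (failure.get() != null) {
        throw failure.get();
    }
    return ingestDocument;
}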
Issues Resolved
This is an enhancement with no associated issue.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.