[Transform] decouple task and indexer #48567
Pinging @elastic/ml-core (:ml/Transform)
Mostly minor things.
One overall question: why aren't `indexerState` and `indexerPosition` part of the context? Both the parent task and the internal indexer refer to them.
"task encountered more than " + transformTask.getNumFailureRetries() + " failures; latest failure: " + e.getMessage();
if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > context.getNumFailureRetries()) {
String failureMessage = isIrrecoverableFailure(e)
? "task encountered irrecoverable failure: " + e.getMessage()
If this is how our new formatting is (putting `?` on the next line), I don't like it.
e);
if (handleCircuitBreakingException(e)) {
return;
if (shouldStopAtCheckpoint) {
Having this at the end of `doSaveState` does not make sense. It means that if `shouldStopAtCheckpoint` is true, we will stop at the end of a `doSaveState` call. But `doSaveState` can be called once we have processed 50 pages, so it is no indication of whether a checkpoint has actually finished.
In short, how do you know this is the `doSaveState` after a checkpoint has finished?
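To make the concern concrete, here is a minimal sketch. It is not the actual `TransformIndexer` code: the class, the method `pagesProcessed`, and the constant are hypothetical names, and the only assumption carried over from the comment above is that state gets saved periodically (every 50 pages) while a checkpoint is still in progress.

```java
// Hypothetical model, not the actual TransformIndexer code.
class SaveStateStopSketch {

    // Assumption taken from the review comment: state is saved every 50 pages.
    static final int PAGES_PER_SAVE = 50;

    /**
     * Simulates one checkpoint of totalPages pages while a stop-at-checkpoint
     * request is pending, and returns how many pages were processed before
     * the indexer stopped.
     */
    static int pagesProcessed(int totalPages, boolean stopCheckInSaveState) {
        boolean shouldStopAtCheckpoint = true; // a stop was requested earlier
        for (int page = 1; page <= totalPages; page++) {
            // A periodic save happens in the middle of the checkpoint.
            boolean periodicSave = page % PAGES_PER_SAVE == 0 && page < totalPages;
            if (periodicSave && stopCheckInSaveState && shouldStopAtCheckpoint) {
                return page; // stops mid-checkpoint, which is the reported problem
            }
        }
        // Only here has the checkpoint actually finished.
        return totalPages;
    }

    public static void main(String[] args) {
        System.out.println("check inside save state: " + pagesProcessed(120, true));
        System.out.println("check after checkpoint:  " + pagesProcessed(120, false));
    }
}
```

In this model, checking the flag inside the periodic save stops the run before the checkpoint is complete, whereas checking it only once the checkpoint has finished lets all pages through.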
I am not aware that I changed the logic; maybe this is an unclean merge? Will check.
The private `doSaveState` on master: https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java#L413
There are no checks for `shouldStopAtCheckpoint` there. It might well be a weird merge.
I had a look at #48591 again and compared the code paths. It should work again.
The merge problem was that I pulled from master after onStart/onFinish had already been moved into the TransformIndexer class. Git produced this weird line; in fact, it produced it twice.
I moved `shouldStopAtCheckpoint` into the context object. I am not fully satisfied with how it gets set when a transform is loaded, but decided to keep this as a todo for later. This PR is already very complex.
// This indicates an early exit since no changes were found.
// So, don't treat this like a checkpoint being completed, as no work was done.
if (hasSourceChanged == false) {
listener.onResponse(null);
Unless there is another `onFinish` that I missed, it seems we are no longer stopping if `shouldStopAtCheckpoint` is true here. That is a problem in the following scenario:
- The indexer is kicked off to check for changes (state changes from STARTED to INDEXING).
- While the state is INDEXING, somebody calls `_stop?wait_for_checkpoint=true`.
- Stop sets `wait_for_checkpoint` to true and does not call stop (the indexer state is still INDEXING).
- `onFinish` is called with no changes and the indexer changes to STARTED.
- The transform never stops, even though `wait_for_checkpoint` is true.
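The scenario above can be replayed with a small state-machine sketch. This is a simplified, hypothetical model (the classes `Context` and `Indexer`, the method names, and the `checkFlagOnEarlyExit` switch are all assumptions made for illustration, not the Elasticsearch implementation). It only shows why the early-exit path also needs to consult the flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model; not the actual Elasticsearch classes.
class StopAtCheckpointSketch {

    enum State { STARTED, INDEXING, STOPPED }

    // Stand-in for the shared context object discussed in this PR.
    static final class Context {
        final AtomicReference<State> state = new AtomicReference<>(State.STARTED);
        final AtomicBoolean shouldStopAtCheckpoint = new AtomicBoolean(false);
    }

    static final class Indexer {
        private final Context context;
        private final boolean checkFlagOnEarlyExit;

        Indexer(Context context, boolean checkFlagOnEarlyExit) {
            this.context = context;
            this.checkFlagOnEarlyExit = checkFlagOnEarlyExit;
        }

        // Step 1: the indexer is kicked off, STARTED -> INDEXING.
        void trigger() {
            context.state.compareAndSet(State.STARTED, State.INDEXING);
        }

        // Steps 2 and 3: while indexing, the stop request only records the wish to stop.
        void stopWaitForCheckpoint() {
            if (context.state.get() == State.INDEXING) {
                context.shouldStopAtCheckpoint.set(true);
            } else {
                context.state.set(State.STOPPED);
            }
        }

        // Step 4: the run finishes, possibly without having found any changes.
        void onFinish(boolean hasSourceChanged) {
            boolean earlyExit = hasSourceChanged == false;
            if (earlyExit && checkFlagOnEarlyExit == false) {
                // Early exit that ignores the flag: the behavior flagged above.
                context.state.set(State.STARTED);
                return;
            }
            boolean stop = context.shouldStopAtCheckpoint.getAndSet(false);
            context.state.set(stop ? State.STOPPED : State.STARTED);
        }
    }

    // Replays the scenario and returns the final indexer state.
    static State runScenario(boolean checkFlagOnEarlyExit) {
        Context context = new Context();
        Indexer indexer = new Indexer(context, checkFlagOnEarlyExit);
        indexer.trigger();
        indexer.stopWaitForCheckpoint();
        indexer.onFinish(false);
        return context.state.get();
    }

    public static void main(String[] args) {
        System.out.println("flag ignored on early exit: " + runScenario(false));
        System.out.println("flag checked on early exit: " + runScenario(true));
    }
}
```

In this model the first run ends in STARTED (step 5, the transform never stops), while the second ends in STOPPED. How the real code restores the check is not shown here.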
Good spot, the merge broke it.
Both are members of [...]. That said, there might be more to move around. It is probably not perfect, but in my opinion the structure is now cleaner and less confusing. There is no cyclic dependency anymore.
Co-Authored-By: Benjamin Trent <[email protected]>
run elasticsearch-ci/packaging-sample-matrix
Decouple TransformTask and ClientTransformIndexer. Interaction between the two classes is now moved into a context class that holds shared information.
relates #45369
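As a rough illustration of the decoupling described above, the following sketch shows a task and an indexer that know only a shared context, not each other. All names here (`SharedContext`, `Listener`, `markAsFailed`, and so on) are hypothetical and chosen for the example; only `getNumFailureRetries` and the failure message wording echo the diff excerpt in this conversation, and the real context class may look quite different.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; names and signatures are illustrative only.
class ContextDecouplingSketch {

    // Callback implemented by the task, so the indexer never references the task type.
    interface Listener {
        void fail(String reason);
    }

    // Holds the information both sides need.
    static final class SharedContext {
        private final Listener listener;
        private final int numFailureRetries;
        private final AtomicInteger failureCount = new AtomicInteger();

        SharedContext(Listener listener, int numFailureRetries) {
            this.listener = listener;
            this.numFailureRetries = numFailureRetries;
        }

        int getNumFailureRetries() {
            return numFailureRetries;
        }

        int incrementAndGetFailureCount() {
            return failureCount.incrementAndGet();
        }

        void markAsFailed(String reason) {
            listener.fail(reason);
        }
    }

    // The indexer depends on the context only.
    static final class Indexer {
        private final SharedContext context;

        Indexer(SharedContext context) {
            this.context = context;
        }

        void handleFailure(Exception e) {
            if (context.incrementAndGetFailureCount() > context.getNumFailureRetries()) {
                context.markAsFailed("task encountered more than " + context.getNumFailureRetries()
                    + " failures; latest failure: " + e.getMessage());
            }
        }
    }

    // The task reacts to failures through the callback; it holds no indexer reference here.
    static final class Task implements Listener {
        private volatile String failureReason;

        @Override
        public void fail(String reason) {
            failureReason = reason;
        }

        String getFailureReason() {
            return failureReason;
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        Indexer indexer = new Indexer(new SharedContext(task, 2));
        for (int i = 0; i < 3; i++) {
            indexer.handleFailure(new RuntimeException("boom"));
        }
        System.out.println(task.getFailureReason());
    }
}
```

Routing the failure through the context is one way to avoid a cyclic dependency between the two classes: the indexer reports to the context, and the task observes the context through a narrow callback.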