Pipeline: use notify instead of polling for AggregateRestoreSourceOp
#9082
/cc @windtalker @gengliqi
@@ -66,7 +82,7 @@ void SharedSpilledBucketDataLoader::storeBucketData()
     RUNTIME_CHECK(status == SharedLoaderStatus::loading);

     // Although the status will always stay at `SharedLoaderStatus::loading`, but because the query has stopped, it doesn't matter.
-    if unlikely (exec_context.isCancelled())
+    if (checkCancelled())
Why do `SharedQuery` and `ResultQueue` need to be cancelled explicitly in `PipelineExecutorContext::cancel`, while this loader does not?
Because it will not be stuck forever. A task only enters wait_for_notify when the queue is empty, and an empty queue triggers an asynchronous load bucket event, which calls checkCancel. So when the query is cancelled, checkCancel is always reached.
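The argument above can be sketched with a small single-threaded model. This is only an illustration under stated assumptions: `ToyLoader`, `PopResult`, `tryPop`, and `finishLoad` are hypothetical names, not TiFlash's API, and the asynchronous load is simulated by calling `finishLoad` by hand.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model, not TiFlash code.
enum class PopResult { Ok, WaitForNotify, Cancelled };

struct ToyLoader
{
    bool cancelled = false;       // set when the query is cancelled
    bool cancel_observed = false; // set once the loader itself has seen the cancellation
    bool loading = false;         // a load has been requested and not finished yet
    std::deque<int> queue;        // loaded buckets ready for consumers
    std::deque<int> source;       // buckets that still have to be loaded
    std::vector<std::function<void()>> waiters; // tasks parked until notified

    // Consumer side: a caller is parked only when the queue is empty,
    // and parking always leaves a load request pending.
    PopResult tryPop(int & out, std::function<void()> on_notify)
    {
        if (cancel_observed)
            return PopResult::Cancelled;
        if (!queue.empty())
        {
            out = queue.front();
            queue.pop_front();
            return PopResult::Ok;
        }
        waiters.push_back(std::move(on_notify));
        loading = true;
        return PopResult::WaitForNotify;
    }

    // Completion step of the load: checks cancellation, stores data if
    // possible, then wakes every parked task.
    void finishLoad()
    {
        if (!loading)
            return;
        loading = false;
        if (cancelled)
        {
            cancel_observed = true;
        }
        else if (!source.empty())
        {
            queue.push_back(source.front());
            source.pop_front();
        }
        std::vector<std::function<void()>> woken;
        woken.swap(waiters);
        for (auto & wake : woken)
            wake();
    }
};

// Returns true when a task parked before cancellation is still woken
// and then observes the cancelled state.
bool cancelledWaiterIsWoken()
{
    ToyLoader loader;
    loader.source.push_back(1);
    bool woken = false;
    int value = 0;
    PopResult first = loader.tryPop(value, [&] { woken = true; });
    loader.cancelled = true;
    loader.finishLoad();
    PopResult second = loader.tryPop(value, [] {});
    return first == PopResult::WaitForNotify && woken && second == PopResult::Cancelled;
}
```

In this model no explicit cancel hook on the loader is needed, because every path into the waiting state also schedules the step that performs the cancellation check.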
If triggering the asynchronous load bucket event fails, will it still work?
Yes, `Event::finishImpl` is always called, regardless of failure or cancellation:
void LoadBucketEvent::finishImpl()
{
    assert(loader);
    loader->storeBucketData();
    loader.reset();
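A minimal sketch of the "finish step always runs" guarantee described above. The class names, `run`, and `executeImpl` are assumptions made for illustration; the real `Event` scheduling in TiFlash is more involved and is not reproduced here.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical base class: the finish step runs on every path.
class ToyEvent
{
public:
    virtual ~ToyEvent() = default;

    // Returns true only when the body ran and succeeded.
    bool run(bool cancelled)
    {
        bool ok = false;
        if (!cancelled)
        {
            try
            {
                executeImpl();
                ok = true;
            }
            catch (const std::exception &)
            {
                // A real scheduler would record the error on the query context.
            }
        }
        finishImpl(); // reached after success, failure, and cancellation
        return ok;
    }

protected:
    virtual void executeImpl() = 0;
    virtual void finishImpl() = 0;
};

// Hypothetical event whose body can be made to fail.
class ToyLoadBucketEvent : public ToyEvent
{
public:
    explicit ToyLoadBucketEvent(bool should_fail_) : should_fail(should_fail_) {}
    int finish_calls = 0;

protected:
    void executeImpl() override
    {
        if (should_fail)
            throw std::runtime_error("simulated load failure");
    }
    void finishImpl() override { ++finish_calls; }

private:
    bool should_fail;
};

// Counts how often the finish step ran for one event.
int finishCallsFor(bool should_fail, bool cancelled)
{
    ToyLoadBucketEvent event(should_fail);
    event.run(cancelled);
    return event.finish_calls;
}
```

With this shape, a cancellation check placed in the finish step is reached even when the load itself never ran or threw.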
OK. I want to confirm another, unrelated question here: why doesn't the `WorkQueue` in this PR need to be cancelled explicitly in `PipelineExecutorContext::cancel`?
When the query meets an error, is the storage layer not informed?
Yes, the storage layer is not explicitly canceled.
However, the query will not be stuck, because the storage layer keeps generating blocks, which triggers checkCancel and thus ends the query.
The storage layer generates a block => the pipeline task is notified and finishes because it finds the query is cancelled => `~UnorderedSourceOp()` is called, and inside `~UnorderedSourceOp()` it calls `task_pool->decreaseUnorderedInputStreamRefCount()`, which eventually stops the storage read?
> The storage layer generates a block => the pipeline task is notified and finishes because it finds the query is cancelled => `~UnorderedSourceOp()` is called, and inside `~UnorderedSourceOp()` it calls `task_pool->decreaseUnorderedInputStreamRefCount()`, which eventually stops the storage read?

Yes.
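The destructor-driven shutdown confirmed above can be sketched as a reference count that stops reading when the last consumer goes away. Everything here is a hypothetical stand-in (`ToyTaskPool`, `ToySourceOp`, and their members are illustrative names); it only models the idea that destroying the source operator releases a reference, not the actual storage read path.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical pool: keeps reading while at least one consumer holds a reference.
struct ToyTaskPool
{
    int ref_count = 0;
    bool reading = true;

    void increaseRefCount() { ++ref_count; }

    void decreaseRefCount()
    {
        if (--ref_count == 0)
            reading = false; // last consumer is gone, so stop producing blocks
    }
};

// Hypothetical source operator: holds one reference for its whole lifetime.
class ToySourceOp
{
public:
    explicit ToySourceOp(std::shared_ptr<ToyTaskPool> pool_)
        : pool(std::move(pool_))
    {
        pool->increaseRefCount();
    }

    // Released when the owning task finishes, including after cancellation.
    ~ToySourceOp() { pool->decreaseRefCount(); }

    ToySourceOp(const ToySourceOp &) = delete;
    ToySourceOp & operator=(const ToySourceOp &) = delete;

private:
    std::shared_ptr<ToyTaskPool> pool;
};
```

Under this model the pool needs no explicit cancel call: once every task that owns a source operator has finished, the count reaches zero and reading stops.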
LGTM
LGTM
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: gengliqi, windtalker
What problem does this PR solve?
Issue Number: ref #8869
Problem Summary:
What is changed and how it works?
Check List
Tests
Side effects
Documentation
Release note