Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: use notify instead of polling for ExchangeReceiver #9073

Merged
merged 40 commits into from
Sep 27, 2024
Merged
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
fa590d8
add notify_future for mpptunnel
SeaRise May 22, 2024
62d3add
add for writer
SeaRise May 22, 2024
e7d8c9b
for exchange sender
SeaRise May 22, 2024
2ffee76
remove writable
SeaRise May 22, 2024
f76b7bd
Merge branch 'master' into notify_for_sender
SeaRise May 22, 2024
563573b
try fmt
SeaRise May 22, 2024
e1e920c
refine ut
SeaRise May 22, 2024
2f258ae
for receiver
SeaRise May 22, 2024
f8e1dc6
u
SeaRise May 22, 2024
e2adf5e
fix
SeaRise May 22, 2024
c5655c6
format
Lloyd-Pottiger May 23, 2024
da81c3c
Update gtest_mpptunnel.cpp
SeaRise May 23, 2024
249cfa9
Merge branch 'master' into notify_for_sender
SeaRise May 23, 2024
2c12e6c
Merge branch 'master' into notify_for_receiver
SeaRise May 23, 2024
03dd7b6
fix comment
SeaRise May 27, 2024
d2814b8
Merge branch 'master' into notify_for_sender
SeaRise May 30, 2024
c0d784e
Merge branch 'master' into notify_for_receiver
SeaRise May 30, 2024
d9640df
Merge branch 'master' into notify_for_sender
SeaRise Jun 5, 2024
df23506
fix may hang issue
SeaRise Jun 5, 2024
0c43a11
some fix
SeaRise Jun 5, 2024
94d954b
fix ut
SeaRise Jun 5, 2024
dd1316f
refine err msg
SeaRise Jun 5, 2024
22ea753
Merge branch 'notify_for_sender' into notify_for_receiver
SeaRise Jun 5, 2024
902291b
add cancel
SeaRise Jun 5, 2024
4ddcf98
Merge branch 'master' into notify_for_sender
SeaRise Jun 6, 2024
95fdb7f
Merge branch 'master' into notify_for_receiver
SeaRise Jun 6, 2024
ee4ad65
Merge branch 'notify_for_sender' into notify_for_receiver
SeaRise Jun 6, 2024
7f878ce
u
SeaRise Jun 6, 2024
d4eac21
Merge branch 'master' into notify_for_receiver
SeaRise Aug 26, 2024
ec2838f
Merge branch 'master' into notify_for_receiver
SeaRise Sep 2, 2024
bd66511
Update ReceivedMessageQueue.cpp
SeaRise Sep 4, 2024
d13644d
Update DAGContext.h
SeaRise Sep 18, 2024
1f2423f
Update PipelineExecutorContext.cpp
SeaRise Sep 18, 2024
8da6615
Merge branch 'master' into notify_for_receiver
windtalker Sep 23, 2024
b38899f
Merge branch 'master' into notify_for_receiver
gengliqi Sep 27, 2024
c6033a8
update
gengliqi Sep 27, 2024
bb18d78
update
gengliqi Sep 27, 2024
c85801b
update name
gengliqi Sep 27, 2024
275134b
Merge branch 'master' into notify_for_receiver
gengliqi Sep 27, 2024
af512d3
fix
gengliqi Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
for exchange sender
SeaRise committed May 22, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit e7d8c9be22c02ab2127651f7530d4c67f94b355a
14 changes: 12 additions & 2 deletions dbms/src/Operators/ExchangeSenderSinkOp.cpp
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
// limitations under the License.

#include <Operators/ExchangeSenderSinkOp.h>
#include "Flash/Coprocessor/WaitResult.h"

namespace DB
{
@@ -41,12 +42,21 @@ OperatorStatus ExchangeSenderSinkOp::writeImpl(Block && block)

OperatorStatus ExchangeSenderSinkOp::prepareImpl()
{
return writer->isWritable() ? OperatorStatus::NEED_INPUT : OperatorStatus::WAITING;
return awaitImpl();
}

OperatorStatus ExchangeSenderSinkOp::awaitImpl()
{
return writer->isWritable() ? OperatorStatus::NEED_INPUT : OperatorStatus::WAITING;
auto res = writer->waitForWritable();
switch (res)
{
case WaitResult::Ready:
return OperatorStatus::NEED_INPUT;
case WaitResult::WaitForPolling:
return OperatorStatus::WAITING;
case WaitResult::WaitForNotify:
return OperatorStatus::WAIT_FOR_NOTIFY;
}
}

} // namespace DB