Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei committed Dec 17, 2024
1 parent 5411659 commit b08ec78
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,14 @@ Status Channel::open(RuntimeState* state) {
auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
_fragment_instance_id, _dest_node_id, &_local_recvr);
if (!st.ok()) {
// Recvr not found. Maybe downstream task is finished already.
LOG(INFO) << "Recvr is not found : " << st.to_string();
// If could not find local receiver, then it means the channel is EOF.
// Maybe downstream task is finished already.
if (_receiver_status.ok()) {
_receiver_status = Status::EndOfFile("local data stream receiver is deconstructed");
}
LOG(INFO) << "Query: " << print_id(state->query_id())
<< " recvr is not found, maybe downstream task is finished. error st is: "
<< st.to_string();
}
}
_be_number = state->be_number();
Expand Down Expand Up @@ -197,12 +203,15 @@ Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) {
// _local_recvr depdend on pipeline::ExchangeLocalState* _parent to do some memory counter settings
// but it only owns a raw pointer, so that the ExchangeLocalState object may be deconstructed.
// Lock the fragment context to ensure the runtime state and other objects are not deconstructed
auto ctx_lock = _local_recvr->task_exec_ctx();
if (receiver_status.ok() && ctx_lock == nullptr) {
TaskExecutionContextSPtr ctx_lock = nullptr;
if (receiver_status.ok()) {
ctx_lock = _local_recvr->task_exec_ctx();
// Do not return internal error, because when query finished, the downstream node
// may finish before upstream node. And the object maybe deconstructed. If return error
// then the upstream node may report error status to FE, the query is failed.
receiver_status = Status::EndOfFile("local data stream receiver is deconstructed");
if (ctx_lock == nullptr) {
receiver_status = Status::EndOfFile("local data stream receiver is deconstructed");
}
}
if (receiver_status.ok()) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
Expand Down

0 comments on commit b08ec78

Please sign in to comment.