Skip to content

Commit

Permalink
[Improvement](sink) optimization for parallel result sink (apache#36305)
Browse files Browse the repository at this point in the history
## Proposed changes
optimization for parallel result sink apache#36053
  • Loading branch information
BiteTheDDDDt authored Jun 19, 2024
1 parent 23cd498 commit fdb5891
Show file tree
Hide file tree
Showing 34 changed files with 206 additions and 213 deletions.
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/result_file_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
if (p._is_top_sink) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout()));
state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(),
state->batch_size()));
// create writer
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type, state->fragment_instance_id(),
Expand Down Expand Up @@ -175,7 +176,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
// close sender, this is normal path end
if (_sender) {
_sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
RETURN_IF_ERROR(_sender->close(final_status));
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status));
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/result_file_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class ResultFileSinkOperatorX final : public DataSinkOperatorX<ResultFileSinkLoc

// Owned by the RuntimeState.
RowDescriptor _output_row_descriptor;
int _buf_size = 1024; // Allocated from _pool
int _buf_size = 4096; // Allocated from _pool
bool _is_top_sink = true;
std::string _header;
std::string _header_type;
Expand Down
13 changes: 7 additions & 6 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout()));
fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(),
state->batch_size()));
}
_sender->set_dependency(_dependency->shared_from_this());
_sender->set_dependency(fragment_instance_id, _dependency->shared_from_this());
return Status::OK();
}

Expand Down Expand Up @@ -122,7 +122,8 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {

if (state->query_options().enable_parallel_result_sink) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout()));
state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(),
state->batch_size()));
}
return Status::OK();
}
Expand All @@ -139,7 +140,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
}
RETURN_IF_ERROR(local_state._writer->write(*block));
RETURN_IF_ERROR(local_state._writer->write(state, *block));
if (_fetch_option.use_two_phase_fetch) {
// Block structure may be changed by calling _second_phase_fetch_data().
// So we should clear block in case of unmatched columns
Expand Down Expand Up @@ -185,7 +186,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_writer) {
_sender->update_return_rows(_writer->get_written_rows());
}
RETURN_IF_ERROR(_sender->close(final_status));
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status));
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct ResultFileOptions {
}
};

constexpr int RESULT_SINK_BUFFER_SIZE = 4096;
constexpr int RESULT_SINK_BUFFER_SIZE = 4096 * 8;

class ResultSinkLocalState final : public PipelineXSinkLocalState<BasicSharedState> {
ENABLE_FACTORY_CREATOR(ResultSinkLocalState);
Expand Down
22 changes: 7 additions & 15 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
size_t allocated_bytes = new_block.allocated_bytes();
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->add_mem_usage(channel_id, allocated_bytes);
local_state._shared_state->set_ready_to_read(channel_id);
}

Expand All @@ -220,25 +221,16 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
if (_running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
} else {
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
Expand Down
Loading

0 comments on commit fdb5891

Please sign in to comment.