[Feature](sink) support parallel result sink #36053

Merged · 10 commits · Jun 13, 2024

12 changes: 6 additions & 6 deletions be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -124,8 +124,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
std::mt19937 g(rd());
shuffle(_channels.begin(), _channels.end(), g);

for (int i = 0; i < _channels.size(); ++i) {
RETURN_IF_ERROR(_channels[i]->init_stub(state));
for (auto& _channel : _channels) {
RETURN_IF_ERROR(_channel->init_stub(state));
}
}
_writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get());
@@ -139,9 +139,9 @@ Status ResultFileSinkLocalState::open(RuntimeState* state) {
auto& p = _parent->cast<ResultFileSinkOperatorX>();
if (!p._is_top_sink) {
int local_size = 0;
for (int i = 0; i < _channels.size(); ++i) {
RETURN_IF_ERROR(_channels[i]->open(state));
if (_channels[i]->is_local()) {
for (auto& _channel : _channels) {
RETURN_IF_ERROR(_channel->open(state));
if (_channel->is_local()) {
local_size++;
}
}
@@ -175,7 +175,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
// close sender, this is normal path end
if (_sender) {
_sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
static_cast<void>(_sender->close(final_status));
RETURN_IF_ERROR(_sender->close(final_status));
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/result_file_sink_operator.h
@@ -17,8 +17,6 @@

#pragma once

#include <stdint.h>

#include "operator.h"
#include "vec/sink/writer/vfile_result_writer.h"

@@ -39,7 +37,7 @@ class ResultFileSinkLocalState final
using Base = AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
ENABLE_FACTORY_CREATOR(ResultFileSinkLocalState);
ResultFileSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
~ResultFileSinkLocalState();
~ResultFileSinkLocalState() override;

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
20 changes: 13 additions & 7 deletions be/src/pipeline/exec/result_sink_operator.cpp
@@ -18,6 +18,7 @@
#include "result_sink_operator.h"

#include <memory>
#include <utility>

#include "common/object_pool.h"
#include "exec/rowid_fetcher.h"
@@ -44,10 +45,13 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);

// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout()));
if (state->query_options().enable_parallel_result_sink) {
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout()));
}
_sender->set_dependency(_dependency->shared_from_this());
return Status::OK();
}
@@ -104,9 +108,6 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r
}

Status ResultSinkOperatorX::prepare(RuntimeState* state) {
auto fragment_instance_id = state->fragment_instance_id();
auto title = fmt::format("VDataBufferSender (dst_fragment_instance_id={:x}-{:x})",
fragment_instance_id.hi, fragment_instance_id.lo);
// prepare output_expr
// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
@@ -118,6 +119,11 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
}
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));

if (state->query_options().enable_parallel_result_sink) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout()));
}
return Status::OK();
}
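
Read together with the init() hunk above, the new rule in this file is: when enable_parallel_result_sink is on, prepare() creates one query-level sender keyed by query_id, and every ResultSinkLocalState::init() reuses it; otherwise each instance keeps creating its own sender keyed by fragment_instance_id, as before. The sketch below restates that selection logic with simplified stand-in types; it is an illustration, not the real Doris ResultBufferMgr/BufferControlBlock interface.

// Sketch only: ResultBufferMgr and BufferControlBlock here are simplified stand-ins.
#include <map>
#include <memory>
#include <string>

struct BufferControlBlock {};

struct ResultBufferMgr {
    // One control block per key: the parallel path keys by query id,
    // the old path keys by fragment instance id.
    std::shared_ptr<BufferControlBlock> create_sender(const std::string& key) {
        auto& slot = _blocks[key];
        if (!slot) {
            slot = std::make_shared<BufferControlBlock>();
        }
        return slot;
    }
    std::map<std::string, std::shared_ptr<BufferControlBlock>> _blocks;
};

// prepare() stores the query-level sender on the operator; init() of each
// local state then either reuses it or falls back to a per-instance sender.
std::shared_ptr<BufferControlBlock> pick_sender(
        bool parallel_result_sink, ResultBufferMgr& mgr,
        const std::shared_ptr<BufferControlBlock>& op_level_sender,
        const std::string& fragment_instance_id) {
    if (parallel_result_sink) {
        return op_level_sender;                      // shared by all sink instances
    }
    return mgr.create_sender(fragment_instance_id);  // private sender per instance
}

A consequence visible later in this diff: with the sender keyed by query_id, the frontend fetches results per backend host rather than from a single instance, which is why the Coordinator change below keeps a list of ResultReceiver objects (one per distinct host) instead of a single receiver.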

4 changes: 2 additions & 2 deletions be/src/pipeline/exec/result_sink_operator.h
@@ -17,8 +17,6 @@

#pragma once

#include <stdint.h>

#include "operator.h"
#include "runtime/buffer_control_block.h"
#include "runtime/result_writer.h"
@@ -159,6 +157,8 @@ class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState>

// for fetch data by rowids
TFetchOption _fetch_option;

std::shared_ptr<BufferControlBlock> _sender = nullptr;
};

} // namespace pipeline
23 changes: 15 additions & 8 deletions be/src/runtime/buffer_control_block.cpp
@@ -138,7 +138,7 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
}
_buffer_rows += num_rows;
} else {
auto ctx = _waiting_rpc.front();
auto* ctx = _waiting_rpc.front();
_waiting_rpc.pop_front();
ctx->on_data(result, _packet_num);
_packet_num++;
@@ -252,6 +252,11 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*

Status BufferControlBlock::close(Status exec_status) {
std::unique_lock<std::mutex> l(_lock);
close_cnt++;
if (close_cnt < _result_sink_dependencys.size()) {
return Status::OK();
}

_is_close = true;
_status = exec_status;

@@ -286,16 +291,18 @@ void BufferControlBlock::cancel() {

void BufferControlBlock::set_dependency(
std::shared_ptr<pipeline::Dependency> result_sink_dependency) {
_result_sink_dependency = result_sink_dependency;
_result_sink_dependencys.push_back(result_sink_dependency);
}

void BufferControlBlock::_update_dependency() {
if (_result_sink_dependency &&
(_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled)) {
_result_sink_dependency->set_ready();
} else if (_result_sink_dependency &&
(!_batch_queue_empty && _buffer_rows < _buffer_limit && !_is_cancelled)) {
_result_sink_dependency->block();
if (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled) {
for (auto dependency : _result_sink_dependencys) {
dependency->set_ready();
}
} else if (!_batch_queue_empty && _buffer_rows < _buffer_limit && !_is_cancelled) {
for (auto dependency : _result_sink_dependencys) {
dependency->block();
}
}
}
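
Because several parallel sinks now share one BufferControlBlock, its lifecycle changes in two ways shown above: close() only takes effect once every registered sink has closed (close_cnt compared against the number of registered dependencies), and readiness updates fan out to all registered sink dependencies instead of a single one. A stripped-down sketch of that contract, with illustrative types and simplified locking rather than the real class:

// Illustration of the shared-buffer contract; Dependency is a stand-in type.
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

struct Dependency {
    void set_ready() {}
    void block() {}
};

class SharedResultBuffer {
public:
    // Each parallel result sink registers its own dependency.
    void set_dependency(std::shared_ptr<Dependency> dep) {
        std::lock_guard<std::mutex> l(_lock);
        _dependencies.push_back(std::move(dep));
    }

    // Only the last sink to close actually finalizes the buffer; earlier
    // calls just bump the counter and return.
    bool close() {
        std::lock_guard<std::mutex> l(_lock);
        ++_close_cnt;
        if (_close_cnt < _dependencies.size()) {
            return false;
        }
        _is_close = true;
        return true;
    }

    // Readiness decisions are broadcast to every registered sink.
    void update_dependencies(bool can_accept_more) {
        std::lock_guard<std::mutex> l(_lock);
        for (auto& dep : _dependencies) {
            if (can_accept_more) {
                dep->set_ready();
            } else {
                dep->block();
            }
        }
    }

private:
    std::mutex _lock;
    std::vector<std::shared_ptr<Dependency>> _dependencies;
    size_t _close_cnt = 0;
    bool _is_close = false;
};

In the non-parallel path only one dependency is registered, so the first close() still finalizes immediately, matching the old behavior.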

3 changes: 2 additions & 1 deletion be/src/runtime/buffer_control_block.h
@@ -131,7 +131,8 @@ class BufferControlBlock {
// only used for FE using return rows to check limit
std::unique_ptr<QueryStatistics> _query_statistics;
std::atomic_bool _batch_queue_empty = false;
std::shared_ptr<pipeline::Dependency> _result_sink_dependency;
std::vector<std::shared_ptr<pipeline::Dependency>> _result_sink_dependencys;
size_t close_cnt = 0;
};

} // namespace doris
26 changes: 12 additions & 14 deletions be/src/runtime/result_buffer_mgr.cpp
@@ -96,13 +96,13 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size

std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TUniqueId& query_id) {
std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);
auto iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
return iter->second;
}

return std::shared_ptr<BufferControlBlock>();
return {};
}

void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
@@ -128,8 +128,7 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c
tid.__set_lo(finst_id.lo());
std::shared_ptr<BufferControlBlock> cb = find_control_block(tid);
if (cb == nullptr) {
LOG(WARNING) << "no result for this query, id=" << print_id(tid);
ctx->on_failure(Status::InternalError("no result for this query"));
ctx->on_failure(Status::InternalError("no result for this query, tid={}", print_id(tid)));
return;
}
cb->get_batch(ctx);
@@ -139,8 +138,7 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
std::shared_ptr<arrow::RecordBatch>* result) {
std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
if (cb == nullptr) {
LOG(WARNING) << "no result for this query, id=" << print_id(finst_id);
return Status::InternalError("no result for this query");
return Status::InternalError("no result for this query, finst_id={}", print_id(finst_id));
}
RETURN_IF_ERROR(cb->get_arrow_batch(result));
return Status::OK();
@@ -149,7 +147,7 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
void ResultBufferMgr::cancel(const TUniqueId& query_id) {
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);
auto iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
iter->second->cancel();
@@ -169,7 +167,7 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id) {

void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) {
std::lock_guard<std::mutex> l(_timeout_lock);
TimeoutMap::iterator iter = _timeout_map.find(cancel_time);
auto iter = _timeout_map.find(cancel_time);

if (_timeout_map.end() == iter) {
_timeout_map.insert(
@@ -189,20 +187,20 @@ void ResultBufferMgr::cancel_thread() {
time_t now_time = time(nullptr);
{
std::lock_guard<std::mutex> l(_timeout_lock);
TimeoutMap::iterator end = _timeout_map.upper_bound(now_time + 1);
auto end = _timeout_map.upper_bound(now_time + 1);

for (TimeoutMap::iterator iter = _timeout_map.begin(); iter != end; ++iter) {
for (int i = 0; i < iter->second.size(); ++i) {
query_to_cancel.push_back(iter->second[i]);
for (auto iter = _timeout_map.begin(); iter != end; ++iter) {
for (const auto& id : iter->second) {
query_to_cancel.push_back(id);
}
}

_timeout_map.erase(_timeout_map.begin(), end);
}

// cancel query
for (int i = 0; i < query_to_cancel.size(); ++i) {
cancel(query_to_cancel[i]);
for (const auto& id : query_to_cancel) {
cancel(id);
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
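
For context, the loop this hunk modernizes is the result-buffer timeout sweep: cancel_at_time() buckets query ids by their deadline, and cancel_thread() periodically drains every bucket that has come due and cancels those queries. The sketch below restates the mechanism in a single-threaded form; TimeoutSweeper is a hypothetical name, std::string stands in for TUniqueId, and the real class guards _timeout_map with _timeout_lock and runs the sweep from a background thread once per second.

// Simplified sketch of the timeout sweep, not the real ResultBufferMgr.
#include <ctime>
#include <map>
#include <string>
#include <vector>

class TimeoutSweeper {
public:
    // Schedule a query to be cancelled once `when` has passed.
    void cancel_at_time(time_t when, const std::string& query_id) {
        _timeout_map[when].push_back(query_id);
    }

    // One pass: collect every id in a bucket at or before the cutoff
    // (now + 1, matching the code above), erase those buckets, and return
    // the ids so the caller can cancel them.
    std::vector<std::string> sweep(time_t now) {
        std::vector<std::string> to_cancel;
        auto end = _timeout_map.upper_bound(now + 1);
        for (auto iter = _timeout_map.begin(); iter != end; ++iter) {
            for (const auto& id : iter->second) {
                to_cancel.push_back(id);
            }
        }
        _timeout_map.erase(_timeout_map.begin(), end);
        return to_cancel;
    }

private:
    std::map<time_t, std::vector<std::string>> _timeout_map;
};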

4 changes: 0 additions & 4 deletions be/src/runtime/runtime_state.h
@@ -367,10 +367,6 @@ class RuntimeState {
return _query_options.return_object_data_as_binary;
}

bool enable_exchange_node_parallel_merge() const {
return _query_options.enable_enable_exchange_node_parallel_merge;
}

segment_v2::CompressionTypePB fragement_transmission_compression_type() const {
if (_query_options.__isset.fragment_transmission_compression_codec) {
if (_query_options.fragment_transmission_compression_codec == "lz4") {
@@ -154,7 +154,12 @@ public Void visitPhysicalIcebergTableSink(

@Override
public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
if (context.getSessionVariable().enableParallelResultSink()
&& !context.getStatementContext().isShortCircuitQuery()) {
addRequestPropertyToChildren(PhysicalProperties.ANY);
} else {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
}
return null;
}

@@ -248,10 +248,6 @@ public boolean hasColocatePlanNode() {
return hasColocatePlanNode;
}

public void setDataPartition(DataPartition dataPartition) {
this.dataPartition = dataPartition;
}

/**
* Finalize plan tree and create stream sink, if needed.
*/
41 changes: 30 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -215,7 +215,7 @@ public class Coordinator implements CoordInterface {

private final Map<Pair<Integer, Long>, PipelineExecContext> pipelineExecContexts = new HashMap<>();
private final List<PipelineExecContext> needCheckPipelineExecContexts = Lists.newArrayList();
private ResultReceiver receiver;
private List<ResultReceiver> receivers = Lists.newArrayList();
protected final List<ScanNode> scanNodes;
private int scanRangeNum = 0;
// number of instances of this query, equals to
@@ -682,13 +682,27 @@ private void execInternal() throws Exception {
DataSink topDataSink = topParams.fragment.getSink();
this.timeoutDeadline = System.currentTimeMillis() + queryOptions.getExecutionTimeout() * 1000L;
if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) {
Boolean enableParallelResultSink = queryOptions.isEnableParallelResultSink()
&& topDataSink instanceof ResultSink;
TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId,
addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
Set<TNetworkAddress> addrs = new HashSet<>();
for (FInstanceExecParam param : topParams.instanceExecParams) {
if (addrs.contains(param.host)) {
continue;
}
addrs.add(param.host);
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
}

if (!context.isReturnResultFromLocal()) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
if (enableParallelResultSink) {
context.setFinstId(queryId);
} else {
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
}
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
@@ -1084,13 +1098,13 @@ private void updateStatus(Status status) {

@Override
public RowBatch getNext() throws Exception {
if (receiver == null) {
if (receivers.isEmpty()) {
throw new UserException("There is no receiver.");
}

RowBatch resultBatch;
Status status = new Status();
resultBatch = receiver.getNext(status);
resultBatch = receivers.get(receivers.size() - 1).getNext(status);
if (!status.ok()) {
LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
DebugUtil.printId(queryId), status.getErrorMsg());
@@ -1129,7 +1143,12 @@ public RowBatch getNext() throws Exception {
}

if (resultBatch.isEos()) {
this.returnedAllResults = true;
receivers.remove(receivers.size() - 1);
if (receivers.isEmpty()) {
returnedAllResults = true;
} else {
resultBatch.setEos(false);
}

// if this query is a block query do not cancel.
Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
@@ -1250,7 +1269,7 @@ private void cancelLatch() {
}

private void cancelInternal(Status cancelReason) {
if (null != receiver) {
for (ResultReceiver receiver : receivers) {
receiver.cancel(cancelReason);
}
if (null != pointExec) {
@@ -1814,9 +1833,9 @@ private void computeFragmentHosts() throws Exception {
leftMostNode.getNumInstances());
boolean forceToLocalShuffle = context != null
&& context.getSessionVariable().isForceToLocalShuffle();
boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream()
.allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context,
addressToBackendID.size())) && useNereids);
boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream().allMatch(
scanNode -> scanNode.ignoreStorageDataDistribution(context, addressToBackendID.size()))
&& useNereids);
if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
|| ignoreStorageDataDistribution)) {
expectedInstanceNum = Math.max(expectedInstanceNum, 1);