Merge branch 'master' into cloud_copy_into
sollhui authored Mar 25, 2024
2 parents d6f5496 + ca97b91 commit 85757cf
Showing 80 changed files with 6,653 additions and 316 deletions.
6 changes: 3 additions & 3 deletions be/src/common/config.cpp
@@ -603,7 +603,7 @@ DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");

// sync tablet_meta when modifying meta
DEFINE_mBool(sync_tablet_meta, "true");
DEFINE_mBool(sync_tablet_meta, "false");

// default thrift rpc timeout ms
DEFINE_mInt32(thrift_rpc_timeout_ms, "60000");
@@ -617,9 +617,9 @@ DEFINE_Bool(enable_metric_calculator, "true");
// max consumer num in one data consumer group, for routine load
DEFINE_mInt32(max_consumer_num_per_group, "3");

// the size of thread pool for routine load task.
// the max size of thread pool for routine load task.
// this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
DEFINE_Int32(routine_load_thread_pool_size, "10");
DEFINE_Int32(max_routine_load_thread_pool_size, "1024");

// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
// default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
4 changes: 2 additions & 2 deletions be/src/common/config.h
@@ -667,9 +667,9 @@ DECLARE_Bool(enable_metric_calculator);
// max consumer num in one data consumer group, for routine load
DECLARE_mInt32(max_consumer_num_per_group);

// the size of thread pool for routine load task.
// the max size of thread pool for routine load task.
// this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
DECLARE_Int32(routine_load_thread_pool_size);
DECLARE_Int32(max_routine_load_thread_pool_size);

// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
// default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
1 change: 1 addition & 0 deletions be/src/io/cache/block_file_cache.h
@@ -21,6 +21,7 @@

#include <memory>
#include <optional>
#include <thread>

#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
@@ -97,7 +97,7 @@ class DistinctStreamingAggOperatorX final
bool need_more_input_data(RuntimeState* state) const override;

DataDistribution required_data_distribution() const override {
if (_needs_finalize || !_probe_expr_ctxs.empty()) {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
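
The new guard is easier to read in isolation: a streaming pre-aggregation that does not need to finalize no longer forces a hash shuffle. Below is a hypothetical, self-contained restatement of the branch with the operator state reduced to three booleans; only the condition itself is taken from the diff.

#include <iostream>

// Hypothetical restatement of the required_data_distribution() branch.
// A streaming pre-agg that does not finalize can accept any input
// distribution, deferring the shuffle to the final aggregation.
bool needs_hash_shuffle(bool needs_finalize, bool has_probe_exprs,
                        bool is_streaming_preagg) {
    return needs_finalize || (has_probe_exprs && !is_streaming_preagg);
}

int main() {
    // Old condition was: needs_finalize || has_probe_exprs.
    std::cout << needs_hash_shuffle(false, true, true) << '\n';  // 0: new case, no shuffle
    std::cout << needs_hash_shuffle(false, true, false) << '\n'; // 1: shuffles as before
    std::cout << needs_hash_shuffle(true, true, true) << '\n';   // 1: finalize still shuffles
}
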
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/join_probe_operator.cpp
@@ -86,7 +86,12 @@ Status JoinProbeLocalState<SharedStateArg, Derived>::_build_output_block(
// In previous versions, the join node had a separate set of project structures,
// and you could see a 'todo' in the Thrift definition.
// Here, we have refactored it, but considering upgrade compatibility, we still need to retain the old code.
*output_block = *origin_block;
if (!output_block->mem_reuse()) {
vectorized::MutableBlock tmp(
vectorized::VectorizedUtils::create_columns_with_type_and_name(p.row_desc()));
output_block->swap(tmp.to_block());
}
output_block->swap(*origin_block);
return Status::OK();
}
SCOPED_TIMER(_build_output_block_timer);
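
This hunk, and the identical one in be/src/vec/exec/join/vjoin_node_base.cpp further down, replaces a deep copy (`*output_block = *origin_block`) with swaps. A toy sketch of the idea, assuming only that Block assignment copies column data while Block::swap exchanges it in O(1):

#include <vector>

// ToyBlock stands in for vectorized::Block; it is not the real class.
struct ToyBlock {
    std::vector<int> columns;                            // pretend column data
    bool mem_reuse() const { return !columns.empty(); }
    void swap(ToyBlock& other) { columns.swap(other.columns); }
};

void build_output(ToyBlock* origin_block, ToyBlock* output_block) {
    // Make sure output_block has a column layout to give away. In the real
    // code the temporary is built from the row descriptor, so after the
    // final swap origin_block holds empty but correctly typed columns and
    // remains safe for the caller to reuse.
    if (!output_block->mem_reuse()) {
        ToyBlock tmp{{0}};
        output_block->swap(tmp);
    }
    output_block->swap(*origin_block);  // move the data, copy nothing
}

int main() {
    ToyBlock origin{{1, 2, 3}};
    ToyBlock output;
    build_output(&origin, &output);     // output now owns {1, 2, 3}
}
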
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/union_source_operator.cpp
@@ -165,8 +165,10 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
Defer set_eos {[&]() {
//have executing const expr, queue have no data anymore, and child could be closed
*eos = (_child_size == 0 && !local_state._need_read_for_const_expr) ||
(_child_size > 0 && local_state._shared_state->data_queue.is_all_finish() &&
!_has_data(state));
// here should check `_has_data` first, or when `is_all_finish` is false,
// the data queue will have no chance to change the `_flag_queue_idx`.
(!_has_data(state) && _child_size > 0 &&
local_state._shared_state->data_queue.is_all_finish());
}};

SCOPED_TIMER(local_state.exec_time_counter());
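
The reordering matters because `&&` evaluates left to right and short-circuits: under the old order, whenever `is_all_finish()` returned false, `_has_data(state)` was never called, and (per the new comment) probing the queue is what gives it the chance to advance `_flag_queue_idx`. A minimal standalone illustration of the evaluation-order effect, with stand-in functions:

#include <iostream>

bool probe_called = false;

bool has_data() { probe_called = true; return false; }  // the side effect matters
bool is_all_finish() { return false; }

int main() {
    probe_called = false;
    bool eos = is_all_finish() && !has_data();  // old order: stops early
    std::cout << probe_called << '\n';          // 0: the queue was never probed

    probe_called = false;
    eos = !has_data() && is_all_finish();       // new order: probe first
    std::cout << probe_called << '\n';          // 1
    (void)eos;
}
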
1 change: 1 addition & 0 deletions be/src/runtime/exec_env_init.cpp
@@ -234,6 +234,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
RETURN_IF_ERROR(_routine_load_task_executor->init());
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
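
The extra call reflects two-phase construction: a C++ constructor cannot return a `Status`, so the fallible part of setup (building the thread pool, see the next file) moves into an `init()` that the call site checks. A simplified sketch of the pattern, with hypothetical stand-ins for `Status` and `RETURN_IF_ERROR`:

#include <iostream>
#include <string>

struct Status {                          // stand-in, not doris::Status
    std::string msg;
    bool ok() const { return msg.empty(); }
    static Status OK() { return {}; }
};

#define RETURN_IF_ERROR(stmt)            \
    do {                                 \
        Status _s = (stmt);              \
        if (!_s.ok()) return _s;         \
    } while (false)

struct Executor {
    Executor() = default;   // must not fail: no way to report an error here
    Status init() {         // fallible setup lives here instead
        return Status::OK();
    }
};

Status start() {
    static Executor executor;            // the real one is owned by ExecEnv
    RETURN_IF_ERROR(executor.init());    // mirrors the added call
    return Status::OK();
}

int main() { std::cout << start().ok() << '\n'; }
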
24 changes: 15 additions & 9 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -62,10 +62,7 @@ using namespace ErrorCode;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(routine_load_task_count, MetricUnit::NOUNIT);

RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env)
: _exec_env(exec_env),
_thread_pool(config::routine_load_thread_pool_size, config::routine_load_thread_pool_size,
"routine_load"),
_data_consumer_pool(config::routine_load_consumer_pool_size) {
: _exec_env(exec_env), _data_consumer_pool(config::routine_load_consumer_pool_size) {
REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
// std::lock_guard<std::mutex> l(_lock);
return _task_map.size();
@@ -79,10 +76,19 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
_task_map.clear();
}

Status RoutineLoadTaskExecutor::init() {
return ThreadPoolBuilder("routine_load")
.set_min_threads(0)
.set_max_threads(config::max_routine_load_thread_pool_size)
.set_max_queue_size(config::max_routine_load_thread_pool_size)
.build(&_thread_pool);
}

void RoutineLoadTaskExecutor::stop() {
DEREGISTER_HOOK_METRIC(routine_load_task_count);
_thread_pool.shutdown();
_thread_pool.join();
if (_thread_pool) {
_thread_pool->shutdown();
}
_data_consumer_pool.stop();
}

@@ -180,10 +186,10 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
return Status::OK();
}

if (_task_map.size() >= config::routine_load_thread_pool_size) {
if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
<< ", job id: " << task.job_id
<< ", queue size: " << _thread_pool.get_queue_size()
<< ", queue size: " << _thread_pool->get_queue_size()
<< ", current tasks num: " << _task_map.size();
return Status::TooManyTasks("{}_{}", UniqueId(task.id).to_string(),
BackendOptions::get_localhost());
@@ -259,7 +265,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
_task_map[ctx->id] = ctx;

// offer the task to thread pool
if (!_thread_pool.offer(std::bind<void>(
if (!_thread_pool->submit_func(std::bind<void>(
&RoutineLoadTaskExecutor::exec_task, this, ctx, &_data_consumer_pool,
[this](std::shared_ptr<StreamLoadContext> ctx) {
std::unique_lock<std::mutex> l(_lock);
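
The rework replaces a fixed pool (always 10 threads, even when idle) with one that holds no threads at rest and grows on demand up to max_routine_load_thread_pool_size. The analogue below shows that "at most N concurrent, zero when idle" behavior with a counting semaphore; it is purely didactic and says nothing about how util/threadpool.h is actually implemented:

#include <iostream>
#include <semaphore>
#include <thread>
#include <vector>

constexpr int kMaxTasks = 4;  // stands in for max_routine_load_thread_pool_size

std::counting_semaphore<kMaxTasks> slots(kMaxTasks);

void submit(std::vector<std::jthread>& tasks, int id) {
    slots.acquire();  // blocks while kMaxTasks tasks are already in flight
    tasks.emplace_back([id] {
        std::cout << "task " << id << " running\n";
        slots.release();  // frees a slot as soon as the task finishes
    });
}

int main() {
    std::vector<std::jthread> tasks;
    for (int i = 0; i < 8; ++i) submit(tasks, i);
}   // jthreads join on destruction; once idle, no worker threads remain
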
6 changes: 4 additions & 2 deletions be/src/runtime/routine_load/routine_load_task_executor.h
@@ -27,8 +27,8 @@
#include <vector>

#include "runtime/routine_load/data_consumer_pool.h"
#include "util/threadpool.h"
#include "util/uid_util.h"
#include "util/work_thread_pool.hpp"

namespace doris {

@@ -51,6 +51,8 @@ class RoutineLoadTaskExecutor {

~RoutineLoadTaskExecutor();

Status init();

void stop();

// submit a routine load task
@@ -81,7 +83,7 @@

private:
ExecEnv* _exec_env = nullptr;
PriorityThreadPool _thread_pool;
std::unique_ptr<ThreadPool> _thread_pool;
DataConsumerPool _data_consumer_pool;

std::mutex _lock;
13 changes: 13 additions & 0 deletions be/src/vec/core/types.h
@@ -422,6 +422,13 @@ std::string decimal_to_string(const T& value, UInt32 scale) {
return str;
}

template <typename T>
std::string decimal_to_string(const T& orig_value, UInt32 trunc_precision, UInt32 scale) {
T multiplier = decimal_scale_multiplier<T>(trunc_precision);
T value = orig_value % multiplier;
return decimal_to_string(value, scale);
}

template <typename T>
size_t decimal_to_string(const T& value, char* dst, UInt32 scale, const T& scale_multiplier) {
if (UNLIKELY(value == std::numeric_limits<T>::min())) {
@@ -621,6 +628,12 @@ struct Decimal {

std::string to_string(UInt32 scale) const { return decimal_to_string(value, scale); }

// truncate to specified precision and scale,
// used by runtime filter only for now.
std::string to_string(UInt32 precision, UInt32 scale) const {
return decimal_to_string(value, precision, scale);
}

/**
* Got the string representation of a decimal.
* @param dst Store the result, should be pre-allocated.
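
The new overload truncates by reducing the value modulo 10^trunc_precision, keeping only the low trunc_precision decimal digits before formatting as usual. A standalone illustration of the same arithmetic on a 64-bit value (the real template works over each decimal's underlying integer type):

#include <cstdint>
#include <cstdio>

// 10^n for small n; mirrors decimal_scale_multiplier<T>(trunc_precision).
static int64_t pow10(int n) {
    int64_t r = 1;
    while (n-- > 0) r *= 10;
    return r;
}

int main() {
    int64_t raw = 999999999999999999LL;    // 18 nines, too wide for decimal(15, 3)
    int64_t truncated = raw % pow10(15);   // keep the low 15 digits -> 15 nines
    std::printf("%lld.%03lld\n",
                (long long)(truncated / pow10(3)),   // integer part
                (long long)(truncated % pow10(3)));  // 3 fractional digits
    // prints 999999999999.999, which fits decimal(15, 3)
}
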
6 changes: 5 additions & 1 deletion be/src/vec/exec/join/vjoin_node_base.cpp
@@ -184,7 +184,11 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo
// In previous versions, the join node had a separate set of project structures,
// and you could see a 'todo' in the Thrift definition.
// Here, we have refactored it, but considering upgrade compatibility, we still need to retain the old code.
*output_block = *origin_block;
if (!output_block->mem_reuse()) {
MutableBlock tmp(VectorizedUtils::create_columns_with_type_and_name(row_desc()));
output_block->swap(tmp.to_block());
}
output_block->swap(*origin_block);
return Status::OK();
}
auto is_mem_reuse = output_block->mem_reuse();
15 changes: 11 additions & 4 deletions be/src/vec/exprs/vexpr.h
@@ -374,28 +374,35 @@ Status create_texpr_literal_node(const void* data, TExprNode* node, int precisio
const auto* origin_value = reinterpret_cast<const vectorized::Decimal<int32_t>*>(data);
(*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
TDecimalLiteral decimal_literal;
decimal_literal.__set_value(origin_value->to_string(scale));
decimal_literal.__set_value(origin_value->to_string(precision, scale));
(*node).__set_decimal_literal(decimal_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL32, precision, scale));
} else if constexpr (T == TYPE_DECIMAL64) {
const auto* origin_value = reinterpret_cast<const vectorized::Decimal<int64_t>*>(data);
(*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
TDecimalLiteral decimal_literal;
decimal_literal.__set_value(origin_value->to_string(scale));
decimal_literal.__set_value(origin_value->to_string(precision, scale));
(*node).__set_decimal_literal(decimal_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL64, precision, scale));
} else if constexpr (T == TYPE_DECIMAL128I) {
const auto* origin_value = reinterpret_cast<const vectorized::Decimal<int128_t>*>(data);
(*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
TDecimalLiteral decimal_literal;
decimal_literal.__set_value(origin_value->to_string(scale));
// e.g. For a decimal(26,6) column, the initial value of the _min of the MinMax RF
// on the RF producer side is an int128 value with 38 digits of 9, and this is the
// final min value of the MinMax RF if the fragment instance has no data.
// Need to truncate the value to the right precision and scale here, to avoid
// error when casting string back to decimal later.
// TODO: this is a temporary solution, the best solution is to produce the
// right min max value at the producer side.
decimal_literal.__set_value(origin_value->to_string(precision, scale));
(*node).__set_decimal_literal(decimal_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL128I, precision, scale));
} else if constexpr (T == TYPE_DECIMAL256) {
const auto* origin_value = reinterpret_cast<const vectorized::Decimal<wide::Int256>*>(data);
(*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
TDecimalLiteral decimal_literal;
decimal_literal.__set_value(origin_value->to_string(scale));
decimal_literal.__set_value(origin_value->to_string(precision, scale));
(*node).__set_decimal_literal(decimal_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL256, precision, scale));
} else if constexpr (T == TYPE_FLOAT) {
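
To make the comment's example concrete: for a decimal(26,6) column the producer-side sentinel is 10^38 - 1 (38 nines). Truncation computes (10^38 - 1) mod 10^26 = 10^26 - 1, which formats at scale 6 as 99999999999999999999.999999 (20 integer digits plus 6 fractional digits), so the literal now survives the later cast from string back to decimal(26,6).
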
6 changes: 3 additions & 3 deletions be/src/vec/runtime/vparquet_transformer.cpp
@@ -200,9 +200,9 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
const std::vector<TParquetSchema>& parquet_schemas,
const TParquetCompressionType::type& compression_type,
const bool& parquet_disable_dictionary,
const TParquetVersion::type& parquet_version,
TParquetCompressionType::type compression_type,
bool parquet_disable_dictionary,
TParquetVersion::type parquet_version,
bool output_object_data)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_parquet_schemas(parquet_schemas),
12 changes: 6 additions & 6 deletions be/src/vec/runtime/vparquet_transformer.h
@@ -92,9 +92,9 @@
VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
const std::vector<TParquetSchema>& parquet_schemas,
const TParquetCompressionType::type& compression_type,
const bool& parquet_disable_dictionary,
const TParquetVersion::type& parquet_version, bool output_object_data);
TParquetCompressionType::type compression_type,
bool parquet_disable_dictionary, TParquetVersion::type parquet_version,
bool output_object_data);

~VParquetTransformer() override = default;

@@ -118,9 +118,9 @@ class VParquetTransformer final : public VFileFormatTransformer {
std::shared_ptr<arrow::Schema> _arrow_schema;

const std::vector<TParquetSchema>& _parquet_schemas;
const TParquetCompressionType::type& _compression_type;
const bool& _parquet_disable_dictionary;
const TParquetVersion::type& _parquet_version;
const TParquetCompressionType::type _compression_type;
const bool _parquet_disable_dictionary;
const TParquetVersion::type _parquet_version;
};

} // namespace doris::vectorized
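
Both the constructor parameters and the members (see the two hunks above) change from const references to values. The diff does not state the motivation, but a const reference member bound to a constructor parameter is a classic dangling-reference hazard when the caller passes a temporary; by-value members make the transformer own its copies. A hypothetical sketch of the hazard:

#include <iostream>

enum class Compression { kSnappy, kZstd };

struct BadWriter {
    const Compression& compression;  // binds to whatever the caller passed
    explicit BadWriter(const Compression& c) : compression(c) {}
};

struct GoodWriter {
    const Compression compression;   // owns a copy, like the patched transformer
    explicit GoodWriter(Compression c) : compression(c) {}
};

int main() {
    // The temporary passed here dies at the end of the statement, leaving
    // bad.compression dangling; reading it later is undefined behavior.
    BadWriter bad{Compression::kZstd};
    GoodWriter good{Compression::kZstd};  // safe: the value was copied
    std::cout << static_cast<int>(good.compression) << '\n';
    (void)bad;
}
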
36 changes: 20 additions & 16 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -49,8 +49,10 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)

io::FSPropertiesRef fs_properties(_write_info.file_type);
fs_properties.properties = &_hadoop_conf;
_fs = DORIS_TRY(FileFactory::create_fs(fs_properties,
{.path = _write_info.write_path + '/' + _file_name}));
io::FileDescription file_description = {
.path = fmt::format("{}/{}", _write_info.write_path, _file_name)};
_fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer));

switch (_file_format_type) {
case TFileFormatType::FORMAT_PARQUET: {
@@ -75,17 +77,18 @@
}
}
std::vector<TParquetSchema> parquet_schemas;
parquet_schemas.reserve(_columns.size());
for (int i = 0; i < _columns.size(); i++) {
VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
TParquetSchema parquet_schema;
parquet_schema.schema_column_name = _columns[i].name;
parquet_schemas.emplace_back(std::move(parquet_schema));
}
_vfile_writer.reset(new VParquetTransformer(
state, _file_writer_impl.get(), _vec_output_expr_ctxs, parquet_schemas,
_file_format_transformer.reset(new VParquetTransformer(
state, _file_writer.get(), _vec_output_expr_ctxs, parquet_schemas,
parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
false));
return _vfile_writer->open();
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
orc::CompressionKind orc_compression_type;
@@ -122,10 +125,10 @@
}
}

_vfile_writer.reset(new VOrcTransformer(state, _file_writer_impl.get(),
_vec_output_expr_ctxs, std::move(root_schema),
false, orc_compression_type));
return _vfile_writer->open();
_file_format_transformer.reset(
new VOrcTransformer(state, _file_writer.get(), _vec_output_expr_ctxs,
std::move(root_schema), false, orc_compression_type));
return _file_format_transformer->open();
}
default: {
return Status::InternalError("Unsupported file format type {}",
@@ -135,13 +138,14 @@
}

Status VHivePartitionWriter::close(const Status& status) {
if (_vfile_writer != nullptr) {
Status st = _vfile_writer->close();
if (_file_format_transformer != nullptr) {
Status st = _file_format_transformer->close();
if (!st.ok()) {
LOG(WARNING) << fmt::format("_vfile_writer close failed, reason: {}", st.to_string());
LOG(WARNING) << fmt::format("_file_format_transformer close failed, reason: {}",
st.to_string());
}
}
if (!status.ok()) {
if (!status.ok() && _fs != nullptr) {
auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
Status st = _fs->delete_file(path);
if (!st.ok()) {
@@ -155,7 +159,7 @@
Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) {
Block output_block;
RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block));
RETURN_IF_ERROR(_vfile_writer->write(output_block));
RETURN_IF_ERROR(_file_format_transformer->write(output_block));
_row_count += output_block.rows();
_input_size_in_bytes += output_block.bytes();
return Status::OK();
@@ -244,7 +248,7 @@ Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Blo
return status;
}
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
_vec_output_expr_ctxs, input_block, output_block));
_vec_output_expr_ctxs, input_block, output_block, true));
materialize_block_inplace(*output_block);

if (filter == nullptr) {
@@ -278,4 +282,4 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
}

} // namespace vectorized
} // namespace doris
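
Note the shape of close(const Status&) after this change: every cleanup step is individually guarded, since if open() failed early, _file_format_transformer or _fs may never have been created. The sketch below restates that guarded, best-effort cleanup; Transformer and FileSystem are hypothetical stand-ins, not doris classes:

#include <iostream>
#include <string>

struct Transformer { bool close() { return true; } };
struct FileSystem  { bool delete_file(const std::string&) { return true; } };

void close_writer(bool status_ok, Transformer* transformer, FileSystem* fs,
                  const std::string& path) {
    // Best-effort close: log and keep going rather than abort cleanup.
    if (transformer != nullptr && !transformer->close()) {
        std::cerr << "transformer close failed\n";
    }
    // On failure, try to remove the partially written file, but only if
    // the filesystem handle was ever created.
    if (!status_ok && fs != nullptr && !fs->delete_file(path)) {
        std::cerr << "delete of partial file failed\n";
    }
}

int main() {
    close_writer(/*status_ok=*/false, nullptr, nullptr, "/tmp/part-0.parquet");
}
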