Merge branch 'branch-2.1' into sync_unique_case
cjj2010 authored Jul 11, 2024
2 parents 4ca8e8e + e66ffc1 commit 07cd25e
Showing 629 changed files with 26,946 additions and 14,345 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -60,6 +60,8 @@ docker/thirdparties/docker-compose/hive/scripts/paimon1
 fe_plugins/output
 fe_plugins/**/.factorypath
 
+docker/thirdparties/docker-compose/hive/scripts/data/*/*/data
+
 fs_brokers/apache_hdfs_broker/src/main/resources/
 fs_brokers/apache_hdfs_broker/src/main/thrift/
 
7 changes: 1 addition & 6 deletions .licenserc.yaml
@@ -77,12 +77,7 @@ header:
     - "docs/package-lock.json"
     - "regression-test/script/README"
     - "regression-test/suites/load_p0/stream_load/data"
-    - "docker/thirdparties/docker-compose/hive/scripts/README"
-    - "docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql"
-    - "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_orc.hql"
-    - "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_parquet.hql"
-    - "docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/"
-    - "docker/thirdparties/docker-compose/hive/scripts/data/**"
+    - "docker/thirdparties/docker-compose/hive/scripts/**"
     - "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
     - "conf/mysql_ssl_default_certificate/*"
     - "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
1 change: 0 additions & 1 deletion be/cmake/thirdparty.cmake
@@ -78,7 +78,6 @@ add_thirdparty(crypto)
 add_thirdparty(openssl LIBNAME "lib/libssl.a")
 add_thirdparty(leveldb)
 add_thirdparty(jemalloc LIBNAME "lib/libjemalloc_doris.a")
-add_thirdparty(jemalloc_arrow LIBNAME "lib/libjemalloc_arrow.a")
 
 if (WITH_MYSQL)
     add_thirdparty(mysql LIBNAME "lib/libmysqlclient.a")
7 changes: 4 additions & 3 deletions be/src/agent/cgroup_cpu_ctl.cpp
@@ -130,7 +130,7 @@ Status CgroupV1CpuCtl::init() {
         return Status::InternalError<false>("invalid cgroup path, not find cpu quota file");
     }
 
-    if (_tg_id == -1) {
+    if (_wg_id == -1) {
         // means current cgroup cpu ctl is just used to clear dir,
         // it does not contains workload group.
         // todo(wb) rethinking whether need to refactor cgroup_cpu_ctl
@@ -140,7 +140,7 @@ Status CgroupV1CpuCtl::init() {
     }
 
     // workload group path
-    _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id);
+    _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_wg_id);
     if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
         int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
         if (ret != 0) {
@@ -186,7 +186,8 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
     return Status::OK();
 #else
     int tid = static_cast<int>(syscall(SYS_gettid));
-    std::string msg = "add thread " + std::to_string(tid) + " to group";
+    std::string msg =
+            "add thread " + std::to_string(tid) + " to group" + " " + std::to_string(_wg_id);
     std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
     return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
 #endif
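For readers unfamiliar with cgroup v1, the rename above (_tg_id to _wg_id) touches paths like <doris_cgroup_cpu_path>/query/<wg_id> and that directory's tasks file. A minimal standalone sketch of the underlying mechanism, assuming that layout; the helper name and simplified error handling are ours, not Doris code:

// Sketch: attach the calling thread to a cgroup v1 "tasks" file, assuming a
// layout of <cgroup_home>/query/<wg_id>/tasks as used by CgroupV1CpuCtl above.
#include <sys/syscall.h>
#include <unistd.h>

#include <fstream>
#include <string>

bool add_current_thread_to_cgroup(const std::string& cgroup_home, uint64_t wg_id) {
    int tid = static_cast<int>(syscall(SYS_gettid));
    std::string task_file = cgroup_home + "/query/" + std::to_string(wg_id) + "/tasks";
    std::ofstream out(task_file, std::ios::app);
    if (!out.is_open()) {
        return false; // group dir not created yet, or no permission
    }
    out << tid << '\n'; // writing a tid to "tasks" moves that thread into the group
    return out.good();
}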
4 changes: 2 additions & 2 deletions be/src/agent/cgroup_cpu_ctl.h
@@ -35,7 +35,7 @@ class CgroupCpuCtl {
 public:
     virtual ~CgroupCpuCtl() = default;
     CgroupCpuCtl() = default;
-    CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }
+    CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; }
 
     virtual Status init();
 
@@ -63,7 +63,7 @@ class CgroupCpuCtl {
     int _cpu_hard_limit = 0;
     std::shared_mutex _lock_mutex;
     bool _init_succ = false;
-    uint64_t _tg_id = -1; // workload group id
+    uint64_t _wg_id = -1; // workload group id
     uint64_t _cpu_shares = 0;
 };
14 changes: 6 additions & 8 deletions be/src/agent/workload_group_listener.cpp
@@ -43,31 +43,29 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
             current_wg_ids.insert(workload_group_info.id);
         }
         if (!ret.ok()) {
-            LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id="
+            LOG(INFO) << "[topic_publish_wg]parse topic info failed, wg_id="
                       << workload_group_info.id << ", reason:" << ret.to_string();
             continue;
         }
 
         // 2 update workload group
-        auto tg =
+        auto wg =
                 _exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info);
 
         // 3 set cpu soft hard limit switch
         _exec_env->workload_group_mgr()->_enable_cpu_hard_limit.store(
                 workload_group_info.enable_cpu_hard_limit);
 
         // 4 create and update task scheduler
-        tg->upsert_task_scheduler(&workload_group_info, _exec_env);
+        wg->upsert_task_scheduler(&workload_group_info, _exec_env);
 
-        LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info="
-                  << tg->debug_string() << ", enable_cpu_hard_limit="
+        LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info="
+                  << wg->debug_string() << ", enable_cpu_hard_limit="
                   << (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
                   << ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
                   << ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
                   << ", enable_cgroup_cpu_soft_limit="
                   << (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
                   << ", cgroup home path=" << config::doris_cgroup_cpu_path
-                  << ", list size=" << list_size;
+                  << ", list size=" << list_size << ", thread info=" << wg->thread_debug_info();
     }
 
     // NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list
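The listener change is mostly a tg-to-wg rename plus extra thread info in the log line, but the surrounding flow is worth keeping in mind. A compressed sketch of the handler's shape (parse, upsert, drop stale groups), with stand-in types rather than the real WorkloadGroupListener API:

// Sketch of the topic-publish flow: every group FE reports is upserted, and
// groups FE no longer reports are removed afterward. Illustrative types only.
#include <set>
#include <vector>

struct WorkloadGroupInfo { uint64_t id; };

struct Manager {
    void upsert(const WorkloadGroupInfo& info) { /* create or update group + scheduler */ }
    void delete_groups_not_in(const std::set<uint64_t>& keep) { /* drop stale groups */ }
};

void handle_topic_info(Manager& mgr, const std::vector<WorkloadGroupInfo>& infos) {
    std::set<uint64_t> current_ids;
    for (const auto& info : infos) {
        current_ids.insert(info.id); // track every id FE reported this round
        mgr.upsert(info);            // steps 2 and 4 above: group, then scheduler
    }
    mgr.delete_groups_not_in(current_ids);
}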
9 changes: 5 additions & 4 deletions be/src/common/config.cpp
@@ -312,7 +312,7 @@ DEFINE_mInt32(garbage_sweep_batch_size, "100");
 DEFINE_mInt32(snapshot_expire_time_sec, "172800");
 // It is only a recommended value. When the disk space is insufficient,
 // the file storage period under trash dose not have to comply with this parameter.
-DEFINE_mInt32(trash_file_expire_time_sec, "259200");
+DEFINE_mInt32(trash_file_expire_time_sec, "86400");
 // minimum file descriptor number
 // modify them upon necessity
 DEFINE_Int32(min_file_descriptor_number, "60000");
@@ -427,6 +427,8 @@ DEFINE_Validator(compaction_task_num_per_disk,
                  [](const int config) -> bool { return config >= 2; });
 DEFINE_Validator(compaction_task_num_per_fast_disk,
                  [](const int config) -> bool { return config >= 2; });
+DEFINE_Validator(low_priority_compaction_task_num_per_disk,
+                 [](const int config) -> bool { return config >= 2; });
 
 // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
 DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
@@ -448,8 +450,8 @@ DEFINE_mInt64(pick_rowset_to_compact_interval_sec, "86400");
 
 // Compaction priority schedule
 DEFINE_mBool(enable_compaction_priority_scheduling, "true");
-DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "1");
-DEFINE_mDouble(low_priority_tablet_version_num_ratio, "0.7");
+DEFINE_mInt32(low_priority_compaction_task_num_per_disk, "2");
+DEFINE_mInt32(low_priority_compaction_score_threshold, "200");
 
 // Thread count to do tablet meta checkpoint, -1 means use the data directories count.
 DEFINE_Int32(max_meta_checkpoint_threads, "-1");
@@ -1145,7 +1147,6 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
 
 // cgroup
 DEFINE_mString(doris_cgroup_cpu_path, "");
-DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
 
 DEFINE_mBool(enable_workload_group_memory_gc, "true");
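DEFINE_Validator ties a predicate to a config knob at definition time; the new validator above enforces the raised floor of 2 for low_priority_compaction_task_num_per_disk. A self-contained sketch of the pattern as a toy registry, not the Doris macro machinery:

// Sketch: a name -> predicate registry in the spirit of DEFINE_Validator.
#include <functional>
#include <iostream>
#include <map>
#include <string>

static std::map<std::string, std::function<bool(int)>>& validators() {
    static std::map<std::string, std::function<bool(int)>> m;
    return m;
}

static bool register_validator(const std::string& name, std::function<bool(int)> fn) {
    validators()[name] = std::move(fn);
    return true;
}

int main() {
    // e.g. low_priority_compaction_task_num_per_disk must be >= 2
    register_validator("low_priority_compaction_task_num_per_disk",
                       [](int v) { return v >= 2; });
    std::cout << validators()["low_priority_compaction_task_num_per_disk"](2) << "\n"; // 1
    std::cout << validators()["low_priority_compaction_task_num_per_disk"](1) << "\n"; // 0
    return 0;
}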
3 changes: 1 addition & 2 deletions be/src/common/config.h
@@ -495,7 +495,7 @@ DECLARE_mInt64(pick_rowset_to_compact_interval_sec);
 // Compaction priority schedule
 DECLARE_mBool(enable_compaction_priority_scheduling);
 DECLARE_mInt32(low_priority_compaction_task_num_per_disk);
-DECLARE_mDouble(low_priority_tablet_version_num_ratio);
+DECLARE_mInt32(low_priority_compaction_score_threshold);
 
 // Thread count to do tablet meta checkpoint, -1 means use the data directories count.
 DECLARE_Int32(max_meta_checkpoint_threads);
@@ -1219,7 +1219,6 @@ DECLARE_mBool(exit_on_exception);
 
 // cgroup
 DECLARE_mString(doris_cgroup_cpu_path);
-DECLARE_mBool(enable_cgroup_cpu_soft_limit);
 
 DECLARE_mBool(enable_workload_group_memory_gc);
7 changes: 7 additions & 0 deletions be/src/exec/es/es_scroll_parser.cpp
@@ -40,6 +40,7 @@
 #include "runtime/decimalv2_value.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
+#include "runtime/jsonb_value.h"
 #include "runtime/primitive_type.h"
 #include "runtime/types.h"
 #include "util/binary_cast.hpp"
@@ -799,6 +800,12 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
         col_ptr->insert(array);
         break;
     }
+    case TYPE_JSONB: {
+        JsonBinaryValue binary_val(json_value_to_string(col));
+        vectorized::JsonbField json(binary_val.value(), binary_val.size());
+        col_ptr->insert(json);
+        break;
+    }
     default: {
         LOG(ERROR) << "Unsupported data type: " << type_to_string(type);
         DCHECK(false);
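The new TYPE_JSONB case follows the same recipe as the other scalar cases in fill_columns: take the raw JSON text from the Elasticsearch hit, encode it into a binary JSONB value, and append it to the output column. A toy version of that flow, with stand-in types rather than JsonBinaryValue/JsonbField:

// Sketch of the text -> binary-JSONB -> column pipeline. The encoder here is
// a placeholder; a real one validates and binary-encodes the JSON document.
#include <string>
#include <vector>

struct JsonbBuffer {              // stand-in for JsonBinaryValue
    std::string bytes;            // binary-encoded JSON
};

struct Column {                   // stand-in for the vectorized output column
    std::vector<std::string> rows;
    void insert(const JsonbBuffer& v) { rows.push_back(v.bytes); }
};

JsonbBuffer encode_jsonb(const std::string& json_text) {
    return JsonbBuffer {json_text}; // placeholder: pass text through unchanged
}

void fill_jsonb_cell(Column& col, const std::string& es_json_value) {
    col.insert(encode_jsonb(es_json_value));
}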
17 changes: 13 additions & 4 deletions be/src/exprs/runtime_filter.cpp
@@ -1033,25 +1033,34 @@ Status IRuntimeFilter::publish(bool publish_local) {
 class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
                                                   DummyBrpcCallback<PSendFilterSizeResponse>> {
     std::shared_ptr<pipeline::Dependency> _dependency;
+    IRuntimeFilter* _filter;
     using Base =
             AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
     ENABLE_FACTORY_CREATOR(SyncSizeClosure);
 
     void _process_if_rpc_failed() override {
         ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
+        LOG(WARNING) << "sync filter size meet rpc error, filter=" << _filter->debug_string();
         Base::_process_if_rpc_failed();
     }
 
     void _process_if_meet_error_status(const Status& status) override {
         ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
-        Base::_process_if_meet_error_status(status);
+        if (status.is<ErrorCode::END_OF_FILE>()) {
+            // rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case.
+            _filter->set_ignored();
+        } else {
+            LOG(WARNING) << "sync filter size meet error status, filter="
+                         << _filter->debug_string();
+            Base::_process_if_meet_error_status(status);
+        }
     }
 
 public:
     SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
                     std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
-                    std::shared_ptr<pipeline::Dependency> dependency)
-            : Base(req, callback), _dependency(std::move(dependency)) {}
+                    std::shared_ptr<pipeline::Dependency> dependency, IRuntimeFilter* filter)
+            : Base(req, callback), _dependency(std::move(dependency)), _filter(filter) {}
 };
 
 Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) {
@@ -1094,7 +1103,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
 
     auto request = std::make_shared<PSendFilterSizeRequest>();
     auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
-    auto closure = SyncSizeClosure::create_unique(request, callback, _dependency);
+    auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, this);
     auto* pquery_id = request->mutable_query_id();
     pquery_id->set_hi(_state->query_id.hi());
     pquery_id->set_lo(_state->query_id.lo());
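The behavioral change here is in _process_if_meet_error_status: an END_OF_FILE status from the merge backend is expected (the merger can finish before send_filter_size arrives) and now just marks the filter as ignored, while every other status is still treated as a real error. A distilled sketch of that policy, with illustrative types rather than the Doris classes:

// Sketch: expected-EOF downgrades the filter to "ignored"; anything else is a
// genuine failure that gets logged and propagated.
enum class Code { OK, END_OF_FILE, INTERNAL };

struct Status {
    Code code = Code::OK;
    bool ok() const { return code == Code::OK; }
    bool is_eof() const { return code == Code::END_OF_FILE; }
};

struct Filter {
    bool ignored = false;
    void set_ignored() { ignored = true; }
};

void on_rpc_status(const Status& s, Filter& f) {
    if (s.ok()) return;
    if (s.is_eof()) {
        f.set_ignored(); // merger already finished; dropping the filter is harmless
    } else {
        // log and propagate: a real RPC/merge failure
    }
}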
9 changes: 9 additions & 0 deletions be/src/exprs/runtime_filter_slots.h
@@ -71,6 +71,9 @@ class VRuntimeFilterSlots {
     // process ignore duplicate IN_FILTER
     std::unordered_set<int> has_in_filter;
     for (auto* filter : _runtime_filters) {
+        if (filter->get_ignored()) {
+            continue;
+        }
         if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
             continue;
         }
@@ -83,6 +86,9 @@ class VRuntimeFilterSlots {
 
     // process ignore filter when it has IN_FILTER on same expr, and init bloom filter size
     for (auto* filter : _runtime_filters) {
+        if (filter->get_ignored()) {
+            continue;
+        }
         if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
             !has_in_filter.contains(filter->expr_order())) {
             continue;
@@ -95,6 +101,9 @@ class VRuntimeFilterSlots {
     Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
         // process IN_OR_BLOOM_FILTER's real type
         for (auto* filter : _runtime_filters) {
+            if (filter->get_ignored()) {
+                continue;
+            }
             if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
                 get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) {
                 RETURN_IF_ERROR(filter->change_to_bloom_filter());
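These three guards all enforce the same invariant: a filter marked ignored (for example via the END_OF_FILE path above) must not participate in IN-filter dedup or bloom-filter sizing. A distilled sketch of the dedup rule the first two loops implement, with stand-in types (prune is our name):

// Sketch: keep at most one IN filter per build expression; other filter types
// on an expression that already has an IN filter get ignored. Ignored filters
// are skipped everywhere, matching the new guards above.
#include <unordered_set>
#include <vector>

enum class FType { IN, BLOOM };

struct RFilter {
    int expr_order;  // which build expression this filter belongs to
    FType type;
    bool ignored = false;
};

void prune(std::vector<RFilter>& filters) {
    std::unordered_set<int> has_in;
    for (auto& f : filters) {
        if (f.ignored) continue;                 // new: skip already-ignored filters
        if (f.type == FType::IN) has_in.insert(f.expr_order);
    }
    for (auto& f : filters) {
        if (f.ignored || f.type == FType::IN) continue;
        if (has_in.contains(f.expr_order)) f.ignored = true;
    }
}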
21 changes: 9 additions & 12 deletions be/src/http/action/download_action.cpp
@@ -17,9 +17,7 @@
 
 #include "http/action/download_action.h"
 
-#include <algorithm>
 #include <memory>
-#include <sstream>
 #include <string>
 #include <utility>
 
@@ -34,10 +32,11 @@
 
 namespace doris {
 namespace {
-static const std::string FILE_PARAMETER = "file";
-static const std::string TOKEN_PARAMETER = "token";
-static const std::string CHANNEL_PARAMETER = "channel";
-static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+const std::string FILE_PARAMETER = "file";
+const std::string TOKEN_PARAMETER = "token";
+const std::string CHANNEL_PARAMETER = "channel";
+const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+const std::string ACQUIRE_MD5_PARAMETER = "acquire_md5";
 } // namespace
 
 DownloadAction::DownloadAction(ExecEnv* exec_env,
@@ -47,7 +46,7 @@ DownloadAction::DownloadAction(ExecEnv* exec_env,
           _download_type(NORMAL),
           _num_workers(num_workers),
           _rate_limit_group(std::move(rate_limit_group)) {
-    for (auto& dir : allow_dirs) {
+    for (const auto& dir : allow_dirs) {
         std::string p;
         Status st = io::global_local_filesystem()->canonicalize(dir, &p);
         if (!st.ok()) {
@@ -116,11 +115,9 @@ void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_par
     } else {
         const auto& channel = req->param(CHANNEL_PARAMETER);
         bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
-        if (ingest_binlog) {
-            do_file_response(file_param, req, _rate_limit_group.get());
-        } else {
-            do_file_response(file_param, req);
-        }
+        bool is_acquire_md5 = !req->param(ACQUIRE_MD5_PARAMETER).empty();
+        auto* rate_limit_group = ingest_binlog ? _rate_limit_group.get() : nullptr;
+        do_file_response(file_param, req, rate_limit_group, is_acquire_md5);
     }
 }
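acquire_md5 is a new opt-in request parameter: when present, do_file_response is asked to compute and return the file's MD5 alongside the payload so the downloader can verify integrity. A sketch of the digest side, assuming OpenSSL's classic MD5 API; the helper and its use are ours, not the Doris implementation:

// Sketch: stream a file through OpenSSL MD5 and return the hex digest, e.g.
// for an integrity header on the download response.
#include <openssl/md5.h>

#include <fstream>
#include <iomanip>
#include <sstream>
#include <string>

std::string file_md5_hex(const std::string& path) {
    std::ifstream in(path, std::ios::binary);
    MD5_CTX ctx;
    MD5_Init(&ctx);
    char buf[4096];
    while (in.read(buf, sizeof(buf)) || in.gcount() > 0) {
        MD5_Update(&ctx, buf, static_cast<size_t>(in.gcount()));
    }
    unsigned char digest[MD5_DIGEST_LENGTH];
    MD5_Final(digest, &ctx);
    std::ostringstream hex;
    for (unsigned char b : digest) {
        hex << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(b);
    }
    return hex.str();
}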
9 changes: 7 additions & 2 deletions be/src/http/action/download_binlog_action.cpp
@@ -48,6 +48,7 @@ const std::string kBinlogVersionParameter = "binlog_version";
 const std::string kRowsetIdParameter = "rowset_id";
 const std::string kSegmentIndexParameter = "segment_index";
 const std::string kSegmentIndexIdParameter = "segment_index_id";
+const std::string kAcquireMD5Parameter = "acquire_md5";
 
 // get http param, if no value throw exception
 const auto& get_http_param(HttpRequest* req, const std::string& param_name) {
@@ -102,12 +103,14 @@ void handle_get_binlog_info(HttpRequest* req) {
 void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) {
     // Step 1: get download file path
     std::string segment_file_path;
+    bool is_acquire_md5 = false;
     try {
         const auto& tablet_id = get_http_param(req, kTabletIdParameter);
         auto tablet = get_tablet(tablet_id);
         const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
         const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
         segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
+        is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
     } catch (const std::exception& e) {
         HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
         LOG(WARNING) << "get download file path failed, error: " << e.what();
@@ -128,14 +131,15 @@ void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rat
         LOG(WARNING) << "file not exist, file path: " << segment_file_path;
         return;
     }
-    do_file_response(segment_file_path, req, rate_limit_group);
+    do_file_response(segment_file_path, req, rate_limit_group, is_acquire_md5);
 }
 
 /// handle get segment index file, need tablet_id, rowset_id, segment_index && segment_index_id
 void handle_get_segment_index_file(HttpRequest* req,
                                    bufferevent_rate_limit_group* rate_limit_group) {
     // Step 1: get download file path
     std::string segment_index_file_path;
+    bool is_acquire_md5 = false;
     try {
         const auto& tablet_id = get_http_param(req, kTabletIdParameter);
         auto tablet = get_tablet(tablet_id);
@@ -144,6 +148,7 @@ void handle_get_segment_index_file(HttpRequest* req,
         const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
         segment_index_file_path =
                 tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
+        is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
     } catch (const std::exception& e) {
         HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
         LOG(WARNING) << "get download file path failed, error: " << e.what();
@@ -164,7 +169,7 @@ void handle_get_segment_index_file(HttpRequest* req,
         LOG(WARNING) << "file not exist, file path: " << segment_index_file_path;
         return;
     }
-    do_file_response(segment_index_file_path, req, rate_limit_group);
+    do_file_response(segment_index_file_path, req, rate_limit_group, is_acquire_md5);
 }
 
 void handle_get_rowset_meta(HttpRequest* req) {