Skip to content

Commit

Permalink
Merge branch 'master' into fix_max_compute_replay
Browse files Browse the repository at this point in the history
  • Loading branch information
wsjz authored Nov 19, 2023
2 parents f791e48 + 767b5a1 commit 37bc705
Show file tree
Hide file tree
Showing 1,396 changed files with 38,218 additions and 26,905 deletions.
15 changes: 13 additions & 2 deletions .clang-tidy
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
---
Checks: |
-*,
clang-diagnostic-*,
clang-analyzer-*,
-*,
bugprone-redundant-branch-condition,
modernize-*,
-modernize-use-trailing-return-type,
-modernize-use-nodiscard,
-modernize-avoid-c-arrays,
misc-redundant-expression,
misc-unused-*,
-misc-unused-parameters,
readability-*,
-readability-identifier-length,
-readability-implicit-bool-conversion,
-readability-function-cognitive-complexity,
-readability-magic-numbers,
-readability-else-after-return,
-readability-inconsistent-declaration-parameter-name,
-readability-isolate-declaration,
-readability-named-parameter,
portability-simd-intrinsics,
performance-type-promotion-in-math-fn,
performance-faster-string-find,
performance-inefficient-algorithm,
performance-move-const-arg
WarningsAsErrors: '*'
CheckOptions:
- key: readability-function-size.LineThreshold
value: '80'
- key: readability-function-cognitive-complexity.Threshold
value: '50'

66 changes: 66 additions & 0 deletions .github/workflows/auto-pr-reply.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
name: Auto Reply to PR

on:
pull_request:
types: [opened]

jobs:
comment:
runs-on: ubuntu-latest
steps:
- name: Comment on PR
uses: peter-evans/create-or-update-comment@v1
with:
issue-number: ${{ github.event.pull_request.number }}
body: |
Thank you for your contribution to Apache Doris. Here are some **TODOs** after submitting your PR:
1. Trigger CI Pipeline
If your PR is ready, please **reply with `run buildall`**.
This command will automatically trigger all CI Pipelines.
Currently on the `master branch`, the following admission tests are **`REQUIRED`**:
- `License Check`: Check License Header
- `Clang Formatter`: Check BE code format
- `CheckStyle`: Check FE code format
- `P0 Regression (Doris Regression)`: P0 regression test
- `P0 Regression PipelineX (Doris Regression)`: P0 (with pipelinex) regression test
- `P1 Regression (Doris Regression)`: P1 regression test
- `External Regression (Doris External Regression)`: External table regression test
- `FE UT (Doris FE UT)`: FE unit test
- `BE UT (Doris BE UT)`: BE unit test
- `Build Broker`: Broker build
- `Build Documents`: Document build
- `ShellCheck`: Check Shell script format
- `clickbench-new (clickbench)`: Clickbench performance test
- `Build Third Party Libraries (Linux)`: Third-party library build (Linux)
- `Build Third Party Libraries (macOS)`: Third-party library build (macOS)
- `COMPILE (DORIS_COMPILE)`: Full compilation
- `Need_2_Approval`: Require approval from at least two reviewers
The code can only be merged after all the above pipelines have passed.
Besides, there are some other pipelines, but they are **`NOT`** required, meaning their pass or fail does not affect PR merging.
You can also reply with the following keywords to trigger specific pipelines individually:
- `run compile`: COMPILE (DORIS_COMPILE)
- `run p0`: P0 Regression
- `run p1`: P1 Regression
- `run feut`: FE UT
- `run beut`: BE UT
- `run external`: External Regression
- `run clickbench`: clickbench-new
- `run pipelinex_p0`: P0 Regression PipelineX
- `run arm`: P0 Regression (ARM pipeline)
- `run tpch`: tpch-sf100
2. Wait for Review
Before merging the code, it requires **`approval from at least two reviewers`**, and one of them must be a Committer of Apache Doris.
3. Merge the PR
After all pipelines and reviews are passed, a Committer will manually merge the code.
36 changes: 18 additions & 18 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,11 @@ include (cmake/thirdparty.cmake)

find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin)

option(BUILD_JAVA OFF)
option(BUILD_CPP_TESTS OFF)
option(STOP_BUILD_ON_WARNING OFF)
option(BUILD_LIBHDFSPP OFF)
set(CMAKE_POLICY_DEFAULT_CMP0077 NEW)
set(BUILD_JAVA OFF)
set(BUILD_CPP_TESTS OFF)
set(STOP_BUILD_ON_WARNING OFF)
set(BUILD_LIBHDFSPP OFF)
SET(PROTOBUF_HOME "$ENV{DORIS_THIRDPARTY}/installed")
SET(SNAPPY_HOME "$ENV{DORIS_THIRDPARTY}/installed")
SET(LZ4_HOME "$ENV{DORIS_THIRDPARTY}/installed")
Expand Down
26 changes: 16 additions & 10 deletions be/src/agent/cgroup_cpu_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ Status CgroupCpuCtl::init() {
_doris_cgroup_cpu_path = config::doris_cgroup_cpu_path;
if (_doris_cgroup_cpu_path.empty()) {
LOG(INFO) << "doris cgroup cpu path is not specify, path=" << _doris_cgroup_cpu_path;
return Status::InternalError("doris cgroup cpu path {} is not specify.",
_doris_cgroup_cpu_path);
return Status::InternalError<false>("doris cgroup cpu path {} is not specify.",
_doris_cgroup_cpu_path);
}

if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) {
LOG(ERROR) << "doris cgroup cpu path not exists, path=" << _doris_cgroup_cpu_path;
return Status::InternalError("doris cgroup cpu path {} not exists.",
_doris_cgroup_cpu_path);
return Status::InternalError<false>("doris cgroup cpu path {} not exists.",
_doris_cgroup_cpu_path);
}

if (_doris_cgroup_cpu_path.back() != '/') {
Expand All @@ -41,6 +41,12 @@ Status CgroupCpuCtl::init() {
return Status::OK();
}

void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit) {
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
*cpu_shares = this->_cpu_shares;
*cpu_hard_limit = this->_cpu_hard_limit;
}

void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) {
if (!_init_succ) {
return;
Expand Down Expand Up @@ -72,14 +78,14 @@ Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::st
int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR);
if (fd == -1) {
LOG(ERROR) << "open path failed, path=" << file_path;
return Status::InternalError("open path failed, path={}", file_path);
return Status::InternalError<false>("open path failed, path={}", file_path);
}

auto str = fmt::format("{}\n", value);
int ret = write(fd, str.c_str(), str.size());
if (ret == -1) {
LOG(ERROR) << msg << " write sys file failed";
return Status::InternalError("{} write sys file failed", msg);
return Status::InternalError<false>("{} write sys file failed", msg);
}
LOG(INFO) << msg << " success";
return Status::OK();
Expand All @@ -94,8 +100,8 @@ Status CgroupV1CpuCtl::init() {
int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path;
return Status::InternalError("cgroup v1 mkdir query failed, path=",
_cgroup_v1_cpu_query_path);
return Status::InternalError<false>("cgroup v1 mkdir query failed, path=",
_cgroup_v1_cpu_query_path);
}
}

Expand All @@ -105,8 +111,8 @@ Status CgroupV1CpuCtl::init() {
int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path;
return Status::InternalError("cgroup v1 mkdir workload group failed, path=",
_cgroup_v1_cpu_tg_path);
return Status::InternalError<false>("cgroup v1 mkdir workload group failed, path=",
_cgroup_v1_cpu_tg_path);
}
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/agent/cgroup_cpu_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class CgroupCpuCtl {

void update_cpu_soft_limit(int cpu_shares);

// for log
void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit);

protected:
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);

Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/topic_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ class TopicListener {
public:
virtual ~TopicListener() {}

virtual void handle_topic_info(const TPublishTopicRequest& topic_request) = 0;
virtual void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) = 0;
};
} // namespace doris
7 changes: 5 additions & 2 deletions be/src/agent/topic_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
std::shared_lock lock(_listener_mtx);
LOG(INFO) << "begin handle topic info";
for (auto& listener_pair : _registered_listeners) {
listener_pair.second->handle_topic_info(topic_request);
LOG(INFO) << "handle topic " << listener_pair.first << " succ";
if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
listener_pair.second->handle_topic_info(
topic_request.topic_map.at(listener_pair.first));
LOG(INFO) << "handle topic " << listener_pair.first << " succ";
}
}
}
} // namespace doris
40 changes: 33 additions & 7 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,44 @@

namespace doris {

void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest& topic_request) {
void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
std::set<uint64_t> current_wg_ids;
for (const TopicInfo& topic_info : topic_request.topic_list) {
if (topic_info.topic_type != doris::TTopicInfoType::type::WORKLOAD_GROUP) {
for (const TopicInfo& topic_info : topic_info_list) {
if (!topic_info.__isset.workload_group_info) {
continue;
}

int wg_id = 0;
auto iter2 = topic_info.info_map.find("id");
std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(), wg_id);
// 1 parse topicinfo to group info
taskgroup::TaskGroupInfo task_group_info;
Status ret = taskgroup::TaskGroupInfo::parse_topic_info(topic_info.workload_group_info,
&task_group_info);
if (!ret.ok()) {
LOG(INFO) << "parse topic info failed, tg_id=" << task_group_info.id
<< ", reason:" << ret.to_string();
continue;
}
current_wg_ids.insert(task_group_info.id);

// 2 update task group
auto tg = _exec_env->task_group_manager()->get_or_create_task_group(task_group_info);

// 3 set cpu soft hard limit switch
_exec_env->task_group_manager()->_enable_cpu_hard_limit.store(
task_group_info.enable_cpu_hard_limit);

// 4 create and update task scheduler
Status ret2 =
_exec_env->task_group_manager()->upsert_task_scheduler(&task_group_info, _exec_env);
if (!ret2.ok()) {
LOG(WARNING) << "upsert task sche failed, tg_id=" << task_group_info.id
<< ", reason=" << ret2.to_string();
}

current_wg_ids.insert(wg_id);
LOG(INFO) << "update task group success, tg info=" << tg->debug_string()
<< ", enable_cpu_hard_limit="
<< _exec_env->task_group_manager()->enable_cpu_hard_limit()
<< ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit;
}

_exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/workload_group_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class WorkloadGroupListener : public TopicListener {
~WorkloadGroupListener() {}
WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {}

void handle_topic_info(const TPublishTopicRequest& topic_request) override;
void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) override;

private:
ExecEnv* _exec_env;
Expand Down
2 changes: 1 addition & 1 deletion be/src/apache-orc
2 changes: 1 addition & 1 deletion be/src/clucene
12 changes: 7 additions & 5 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ DEFINE_String(memory_mode, "moderate");
// defaults to bytes if no unit is given"
// must larger than 0. and if larger than physical memory size,
// it will be set to physical memory size.
DEFINE_String(mem_limit, "80%");
DEFINE_String(mem_limit, "90%");

// Soft memory limit as a fraction of hard memory limit.
DEFINE_Double(soft_mem_limit_frac, "0.9");
Expand Down Expand Up @@ -351,6 +351,9 @@ DEFINE_Int32(vertical_compaction_max_row_source_memory_mb, "200");
// In vertical compaction, max dest segment file size
DEFINE_mInt64(vertical_compaction_max_segment_size, "268435456");

// If enabled, segments will be flushed column by column
DEFINE_mBool(enable_vertical_segment_writer, "true");

// In ordered data compaction, min segment size for input rowset
DEFINE_mInt32(ordered_data_compaction_min_segment_size, "10485760");

Expand Down Expand Up @@ -880,7 +883,7 @@ DEFINE_Validator(file_cache_type, [](std::string_view config) -> bool {
DEFINE_Int32(s3_transfer_executor_pool_size, "2");

DEFINE_Bool(enable_time_lut, "true");
DEFINE_Bool(enable_simdjson_reader, "true");
DEFINE_mBool(enable_simdjson_reader, "true");

DEFINE_mBool(enable_query_like_bloom_filter, "true");
// number of s3 scanner thread pool size
Expand Down Expand Up @@ -1073,7 +1076,6 @@ DEFINE_Int16(bitmap_serialize_version, "1");
DEFINE_String(group_commit_replay_wal_dir, "./wal");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
DEFINE_Int32(group_commit_sync_wal_batch, "10");
DEFINE_Bool(wait_internal_group_commit_finish, "false");

// the count of thread to group commit insert
Expand All @@ -1088,8 +1090,8 @@ DEFINE_Bool(exit_on_exception, "false");
DEFINE_Bool(enable_flush_file_cache_async, "true");

// cgroup
DEFINE_String(doris_cgroup_cpu_path, "");
DEFINE_Bool(enable_cgroup_cpu_soft_limit, "false");
DEFINE_mString(doris_cgroup_cpu_path, "");
DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");

DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");

Expand Down
Loading

0 comments on commit 37bc705

Please sign in to comment.