Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/fix_multi_threaded_testing' into…
Browse files Browse the repository at this point in the history
… fix_multi_threaded_testing

# Conflicts:
#	regression-test/suites/schema_change_p0/test_unique_schema_key_change_modify.groovy
  • Loading branch information
cjj2010 committed Aug 28, 2024
2 parents 570b0e4 + fe71310 commit f296380
Show file tree
Hide file tree
Showing 277 changed files with 10,531 additions and 3,279 deletions.
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,5 @@ header:
- "pytest/qe"
- "pytest/sys/data"
- "pytest/deploy/*.conf"
- "tools/jeprof"
comment: on-failure
19 changes: 10 additions & 9 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1009,13 +1009,6 @@ void report_task_callback(const TMasterInfo& master_info) {
}

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) {
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests at the same time
// and can not be processed.
if (config::report_random_wait) {
random_sleep(5);
}

TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.disks = true;
Expand Down Expand Up @@ -1081,8 +1074,16 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;

uint64_t report_version = s_report_version;
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
uint64_t report_version;
for (int i = 0; i < 5; i++) {
request.tablets.clear();
report_version = s_report_version;
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
if (report_version == s_report_version) {
break;
}
}

if (report_version < s_report_version) {
// TODO llj This can only reduce the possibility for report error, but can't avoid it.
// If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
Expand Down
10 changes: 7 additions & 3 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,13 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}
}

RETURN_IF_ERROR(sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet,
_base_tablet, _base_tablet_schema,
_new_tablet_schema));
st = sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet, _base_tablet,
_base_tablet_schema, _new_tablet_schema);
if (!st.ok()) {
return Status::InternalError(
"failed to process schema change on rowset, version=[{}-{}], status={}",
rs_reader->version().first, rs_reader->version().second, st.to_string());
}

RowsetSharedPtr new_rowset;
st = rowset_writer->build(new_rowset);
Expand Down
43 changes: 9 additions & 34 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "config.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "util/cpu_info.h"
Expand Down Expand Up @@ -124,8 +125,8 @@ DEFINE_Int64(max_sys_mem_available_low_water_mark_bytes, "6871947673");
DEFINE_Int64(memtable_limiter_reserved_memory_bytes, "838860800");

// The size of the memory that gc wants to release each time, as a percentage of the mem limit.
DEFINE_mString(process_minor_gc_size, "10%");
DEFINE_mString(process_full_gc_size, "20%");
DEFINE_mString(process_minor_gc_size, "5%");
DEFINE_mString(process_full_gc_size, "10%");

// If true, when the process does not exceed the soft mem limit, the query memory will not be limited;
// when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently
Expand All @@ -139,6 +140,8 @@ DEFINE_mBool(enable_stacktrace, "true");

DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");

DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");

DEFINE_mBool(enable_memory_orphan_check, "false");

// The maximum time a thread waits for full GC. Currently only query will wait for full gc.
Expand Down Expand Up @@ -197,8 +200,6 @@ DEFINE_Int32(release_snapshot_worker_count, "5");
DEFINE_mBool(report_random_wait, "true");
// the interval time(seconds) for agent report tasks signature to FE
DEFINE_mInt32(report_task_interval_seconds, "10");
// the interval time(seconds) for refresh storage policy from FE
DEFINE_mInt32(storage_refresh_storage_policy_task_interval_seconds, "5");
// the interval time(seconds) for agent report disk state to FE
DEFINE_mInt32(report_disk_state_interval_seconds, "60");
// the interval time(seconds) for agent report olap table to FE
Expand Down Expand Up @@ -230,14 +231,8 @@ DEFINE_String(log_buffer_level, "");
// number of threads available to serve backend execution requests
DEFINE_Int32(be_service_threads, "64");

// interval between profile reports; in seconds
DEFINE_mInt32(status_report_interval, "5");
// The pipeline task has a high concurrency, therefore reducing its report frequency
DEFINE_mInt32(pipeline_status_report_interval, "10");
// if true, each disk will have a separate thread pool for scanner
DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, "true");
// the timeout of a work thread to wait the blocking priority queue to get a task
DEFINE_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500");
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1");
Expand All @@ -262,26 +257,18 @@ DEFINE_mInt64(thrift_client_retry_interval_ms, "1000");
// max message size of thrift request
// default: 100 * 1024 * 1024
DEFINE_mInt64(thrift_max_message_size, "104857600");
// max row count number for single scan range, used in segmentv1
DEFINE_mInt32(doris_scan_range_row_count, "524288");
// max bytes number for single scan range, used in segmentv2
DEFINE_mInt32(doris_scan_range_max_mb, "1024");
// max bytes number for single scan block, used in segmentv2
DEFINE_mInt32(doris_scan_block_max_mb, "67108864");
// single read execute fragment row number
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");

DEFINE_mInt64(column_dictionary_key_ratio_threshold, "0");
DEFINE_mInt64(column_dictionary_key_size_threshold, "0");
// memory_limitation_per_thread_for_schema_change_bytes unit bytes
DEFINE_mInt64(memory_limitation_per_thread_for_schema_change_bytes, "2147483648");
DEFINE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes, "100000000");

DEFINE_mInt32(cache_prune_interval_sec, "10");
DEFINE_mInt32(cache_periodic_prune_stale_sweep_sec, "300");
Expand Down Expand Up @@ -337,7 +324,6 @@ DEFINE_mBool(disable_storage_page_cache, "false");
DEFINE_mBool(disable_storage_row_cache, "true");
// whether to disable pk page cache feature in storage
DEFINE_Bool(disable_pk_storage_page_cache, "false");
DEFINE_Bool(enable_non_pipeline, "false");

// Cache for mow primary key storage page size
DEFINE_String(pk_storage_page_cache_limit, "10%");
Expand Down Expand Up @@ -552,8 +538,6 @@ DEFINE_Int32(fragment_mgr_asynic_work_pool_queue_size, "4096");

// Control the number of disks on the machine. If 0, this comes from the system settings.
DEFINE_Int32(num_disks, "0");
// The maximum number of the threads per disk is also the max queue depth per disk.
DEFINE_Int32(num_threads_per_disk, "0");
// The read size is the size of the reads sent to os.
// There is a trade off of latency and throughout, trying to keep disks busy but
// not introduce seeks. The literature seems to agree that with 8 MB reads, random
Expand Down Expand Up @@ -587,7 +571,7 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "100");

// After full gc, no longer full gc and minor gc during sleep.
// After minor gc, no minor gc during sleep, but full gc is possible.
DEFINE_mInt32(memory_gc_sleep_time_ms, "1000");
DEFINE_mInt32(memory_gc_sleep_time_ms, "500");

// Sleep time in milliseconds between memtbale flush mgr refresh iterations
DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5");
Expand Down Expand Up @@ -882,16 +866,9 @@ DEFINE_mInt32(string_type_length_soft_limit_bytes, "1048576");
DEFINE_Validator(string_type_length_soft_limit_bytes,
[](const int config) -> bool { return config > 0 && config <= 2147483643; });

DEFINE_mInt32(jsonb_type_length_soft_limit_bytes, "1048576");

DEFINE_Validator(jsonb_type_length_soft_limit_bytes,
[](const int config) -> bool { return config > 0 && config <= 2147483643; });

// Threshold of reading a small file into memory
DEFINE_mInt32(in_memory_file_size, "1048576"); // 1MB

// ParquetReaderWrap prefetch buffer size
DEFINE_Int32(parquet_reader_max_buffer_size, "50");
// Max size of parquet page header in bytes
DEFINE_mInt32(parquet_header_max_size_mb, "1");
// Max buffer size for parquet row group
Expand Down Expand Up @@ -1016,8 +993,6 @@ DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
// inverted index searcher cache size
DEFINE_String(inverted_index_searcher_cache_limit, "10%");
// set `true` to enable insert searcher into cache when write inverted index data
DEFINE_Bool(enable_write_index_searcher_cache, "true");
DEFINE_Bool(enable_inverted_index_cache_check_timestamp, "true");
DEFINE_Int32(inverted_index_fd_number_limit_percent, "40"); // 40%
DEFINE_Int32(inverted_index_query_cache_shards, "256");
Expand Down Expand Up @@ -1067,10 +1042,10 @@ DEFINE_mInt32(schema_cache_capacity, "1024");
DEFINE_mInt32(schema_cache_sweep_time_sec, "100");

// max number of segment cache, default -1 for backward compatibility fd_number*2/5
DEFINE_mInt32(segment_cache_capacity, "-1");
DEFINE_mInt32(estimated_num_columns_per_segment, "200");
DEFINE_Int32(segment_cache_capacity, "-1");
DEFINE_Int32(segment_cache_fd_percentage, "40");
DEFINE_mInt32(estimated_mem_per_column_reader, "1024");
DEFINE_mInt32(segment_cache_memory_percentage, "2");
DEFINE_Int32(segment_cache_memory_percentage, "2");

// enable feature binlog, default false
DEFINE_Bool(enable_feature_binlog, "false");
Expand Down
34 changes: 7 additions & 27 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ DECLARE_mBool(enable_stacktrace);
// if alloc failed using Doris Allocator, will print stacktrace in error log.
// if is -1, disable print stacktrace when alloc large memory.
DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
// when alloc memory larger than crash_in_alloc_large_memory_bytes will crash, default -1 means disabled.
// if you need a core dump to analyze large memory allocation,
// modify this parameter to crash when large memory allocation occur will help
DECLARE_mInt64(crash_in_alloc_large_memory_bytes);

// default is true. if any memory tracking in Orphan mem tracker will report error.
DECLARE_mBool(enable_memory_orphan_check);
Expand Down Expand Up @@ -252,8 +256,6 @@ DECLARE_Int32(release_snapshot_worker_count);
DECLARE_mBool(report_random_wait);
// the interval time(seconds) for agent report tasks signature to FE
DECLARE_mInt32(report_task_interval_seconds);
// the interval time(seconds) for refresh storage policy from FE
DECLARE_mInt32(storage_refresh_storage_policy_task_interval_seconds);
// the interval time(seconds) for agent report disk state to FE
DECLARE_mInt32(report_disk_state_interval_seconds);
// the interval time(seconds) for agent report olap table to FE
Expand Down Expand Up @@ -288,12 +290,7 @@ DECLARE_String(log_buffer_level);
DECLARE_Int32(be_service_threads);

// interval between profile reports; in seconds
DECLARE_mInt32(status_report_interval);
DECLARE_mInt32(pipeline_status_report_interval);
// if true, each disk will have a separate thread pool for scanner
DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk);
// the timeout of a work thread to wait the blocking priority queue to get a task
DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
Expand All @@ -313,26 +310,18 @@ DECLARE_mInt64(thrift_client_retry_interval_ms);
// max message size of thrift request
// default: 100 * 1024 * 1024
DECLARE_mInt64(thrift_max_message_size);
// max row count number for single scan range, used in segmentv1
DECLARE_mInt32(doris_scan_range_row_count);
// max bytes number for single scan range, used in segmentv2
DECLARE_mInt32(doris_scan_range_max_mb);
// max bytes number for single scan block, used in segmentv2
DECLARE_mInt32(doris_scan_block_max_mb);
// single read execute fragment row number
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
DECLARE_mInt32(min_bytes_in_scanner_queue);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);

DECLARE_mInt64(column_dictionary_key_ratio_threshold);
DECLARE_mInt64(column_dictionary_key_size_threshold);
// memory_limitation_per_thread_for_schema_change_bytes unit bytes
DECLARE_mInt64(memory_limitation_per_thread_for_schema_change_bytes);
DECLARE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes);

// all cache prune interval, used by GC and periodic thread.
DECLARE_mInt32(cache_prune_interval_sec);
Expand Down Expand Up @@ -391,7 +380,6 @@ DECLARE_Bool(disable_storage_page_cache);
DECLARE_mBool(disable_storage_row_cache);
// whether to disable pk page cache feature in storage
DECLARE_Bool(disable_pk_storage_page_cache);
DECLARE_Bool(enable_non_pipeline);

// Cache for mow primary key storage page size, it's seperated from
// storage_page_cache_limit
Expand Down Expand Up @@ -607,8 +595,6 @@ DECLARE_Int32(fragment_mgr_asynic_work_pool_queue_size);

// Control the number of disks on the machine. If 0, this comes from the system settings.
DECLARE_Int32(num_disks);
// The maximum number of the threads per disk is also the max queue depth per disk.
DECLARE_Int32(num_threads_per_disk);
// The read size is the size of the reads sent to os.
// There is a trade off of latency and throughout, trying to keep disks busy but
// not introduce seeks. The literature seems to agree that with 8 MB reads, random
Expand Down Expand Up @@ -943,13 +929,9 @@ DECLARE_String(rpc_load_balancer);
// so we set a soft limit, default is 1MB
DECLARE_mInt32(string_type_length_soft_limit_bytes);

DECLARE_mInt32(jsonb_type_length_soft_limit_bytes);

// Threshold fo reading a small file into memory
DECLARE_mInt32(in_memory_file_size);

// ParquetReaderWrap prefetch buffer size
DECLARE_Int32(parquet_reader_max_buffer_size);
// Max size of parquet page header in bytes
DECLARE_mInt32(parquet_header_max_size_mb);
// Max buffer size for parquet row group
Expand Down Expand Up @@ -1070,8 +1052,6 @@ DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
DECLARE_mInt32(inverted_index_cache_stale_sweep_time_sec);
// inverted index searcher cache size
DECLARE_String(inverted_index_searcher_cache_limit);
// set `true` to enable insert searcher into cache when write inverted index data
DECLARE_Bool(enable_write_index_searcher_cache);
DECLARE_Bool(enable_inverted_index_cache_check_timestamp);
DECLARE_Int32(inverted_index_fd_number_limit_percent); // 50%
DECLARE_Int32(inverted_index_query_cache_shards);
Expand Down Expand Up @@ -1120,10 +1100,10 @@ DECLARE_mInt32(schema_cache_capacity);
DECLARE_mInt32(schema_cache_sweep_time_sec);

// max number of segment cache
DECLARE_mInt32(segment_cache_capacity);
DECLARE_mInt32(estimated_num_columns_per_segment);
DECLARE_mInt32(estimated_mem_per_column_reader);
DECLARE_Int32(segment_cache_capacity);
DECLARE_Int32(segment_cache_fd_percentage);
DECLARE_Int32(segment_cache_memory_percentage);
DECLARE_mInt32(estimated_mem_per_column_reader);

// enable binlog
DECLARE_Bool(enable_feature_binlog);
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,11 @@ static Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key,
}
case TExprNodeType::NULL_LITERAL: {
// insert a null literal
if (!column->is_nullable()) {
// https://github.com/apache/doris/pull/39449 have forbid this cause. always add this check as protective measures
return Status::InternalError("The column {} is not null, can't insert into NULL value.",
part_key->first->get_by_position(pos).name);
}
column->insert_data(nullptr, 0);
break;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/http/action/check_rpc_channel_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace doris {
CheckRPCChannelAction::CheckRPCChannelAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type)
: HttpHandlerWithAuth(exec_env, hier, type) {}

void CheckRPCChannelAction::handle(HttpRequest* req) {
std::string req_ip = req->param("ip");
std::string req_port = req->param("port");
Expand Down
3 changes: 0 additions & 3 deletions be/src/http/action/check_rpc_channel_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,5 @@ class CheckRPCChannelAction : public HttpHandlerWithAuth {
~CheckRPCChannelAction() override = default;

void handle(HttpRequest* req) override;

private:
ExecEnv* _exec_env;
};
} // namespace doris
6 changes: 4 additions & 2 deletions be/src/http/action/download_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,10 @@ Status DownloadAction::check_token(HttpRequest* req) {
return Status::NotAuthorized("token is not specified.");
}

if (token_str != _exec_env->token()) {
return Status::NotAuthorized("invalid token.");
const std::string& local_token = _exec_env->token();
if (token_str != local_token) {
LOG(WARNING) << "invalid download token: " << token_str << ", local token: " << local_token;
return Status::NotAuthorized("invalid token {}", token_str);
}

return Status::OK();
Expand Down
6 changes: 4 additions & 2 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,10 @@ Status DownloadBinlogAction::_check_token(HttpRequest* req) {
return Status::InternalError("token is not specified.");
}

if (token_str != _exec_env->token()) {
return Status::InternalError("invalid token.");
const std::string& local_token = _exec_env->token();
if (token_str != local_token) {
LOG(WARNING) << "invalid download token: " << token_str << ", local token: " << local_token;
return Status::NotAuthorized("invalid token {}", token_str);
}

return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion be/src/http/action/reset_rpc_channel_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
namespace doris {
ResetRPCChannelAction::ResetRPCChannelAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type)
: HttpHandlerWithAuth(exec_env, hier, type), _exec_env(exec_env) {}
: HttpHandlerWithAuth(exec_env, hier, type) {}

void ResetRPCChannelAction::handle(HttpRequest* req) {
std::string endpoints = req->param("endpoints");
if (iequal(endpoints, "all")) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/http/action/reset_rpc_channel_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,5 @@ class ResetRPCChannelAction : public HttpHandlerWithAuth {
~ResetRPCChannelAction() override = default;

void handle(HttpRequest* req) override;

private:
ExecEnv* _exec_env;
};
} // namespace doris
Loading

0 comments on commit f296380

Please sign in to comment.