[Pick](branch-2.0) Pick from branch-2.0 #27916

Merged
merged 50 commits on Dec 4, 2023
Changes from all commits (50 commits)
e4b0303
[chore](case) Use correct insert stmt for cold heat separation case #…
hello-stephen Nov 27, 2023
9efff15
[enhance](S3) Print the error detail for every s3 operation (#27572) …
ByteYue Nov 27, 2023
c0e4e12
[nereids] fix stats error when using dateTime type filter #27571 (#27…
xzj7019 Nov 27, 2023
c75ceb1
[fix](planner)sort node should materialized required slots for itself…
starocean999 Nov 27, 2023
ec55ad0
[fix](Nereids) non-deterministic expression should not be constant (#…
morrySnow Nov 27, 2023
66042d0
[enhancement](stats) Add process for aggstate type #27640 (#27642)
Kikyou1997 Nov 27, 2023
f2b1c4e
[Fix](statistics)Fix bug and improve auto analyze. (#27626) (#27657)
Jibing-Li Nov 28, 2023
e953e8c
[profile](bugfix) should not cache profile content because the profil…
yiguolei Nov 28, 2023
eed37d2
[Enhance](fe) Support setting initial root password when FE firstly l…
WinkerDu Nov 28, 2023
15d7b17
[opt](plan) only lock olap table when query plan #27639 (#27656)
morningman Nov 28, 2023
f1aedd1
select coordinator node from user's tag when exec streaming load (#27…
wangbo Nov 28, 2023
fa04587
[fix](statistics)Need to recalculate health value when table row coun…
Jibing-Li Nov 28, 2023
fa5baeb
[fix](statistics)Fix sample min max npe bug #27702 (#27707)
Jibing-Li Nov 28, 2023
d791420
[Bug](join) try fix wrong _has_null_in_build_side setted (#27684) (#2…
BiteTheDDDDt Nov 28, 2023
f7b8021
[Fix](show-load)Show load npe(userinfo is null) (#27698) (#27719)
CalvinKirs Nov 28, 2023
e44f574
[pick](nereids)temporary partition is always pruned #27636 (#27722)
englefly Nov 28, 2023
7b17f24
[enhancement](stats) limit bq cap size for analyze task #27685 (#27687)
Kikyou1997 Nov 28, 2023
008b95c
[improvement](statistics) Add config for the threshold of column coun…
Jibing-Li Nov 28, 2023
8fd4aa2
[doc](fix) k8s operator docs fix to 2.0 (#27476)
catpineapple Nov 29, 2023
dd20b15
[Improvement](planner)support select tablets with nereids optimize #2…
Jibing-Li Nov 29, 2023
1ee2750
[FIX](complextype)fix complex type hash equals (#27743)
amorynan Nov 29, 2023
b5eb1f0
[fix](statistics) Fix show auto analyze missing jobs bug (#27761)
Jibing-Li Nov 29, 2023
86d179c
[bugfix](topn) fix coredump in copy_column_data_to_block when nullabl…
xiaokang Nov 29, 2023
46c1b7a
[opt](stats) Use escape rather than base64 for min/max value #27746 (…
Kikyou1997 Nov 29, 2023
4f15abd
[refactor](http) disable snapshot and get_log_file api (#27724) (#27770)
morningman Nov 29, 2023
2643671
[branch-2.0](pick 27738) Warning log to trace send fragment #27738 (#…
zhiqiang-hhhh Nov 30, 2023
1ae45cf
[branch-2.0](pick #27771) Add more detail msg for waitRPC exception (…
zhiqiang-hhhh Nov 30, 2023
07ad06d
[Bug](pipeline) prevent PipelineFragmentContext destruct early (#27790)
BiteTheDDDDt Nov 30, 2023
77f3052
[deps](compression) Opt gzip decompress by libdeflate on X86 and X86_…
kaka11chen Nov 30, 2023
5d98711
[FIX](case)fix case truncate table first #27792
amorynan Nov 30, 2023
3026226
[doc](stats) add auto_analyze_table_width_threshold description. (#27…
Jibing-Li Nov 30, 2023
5e9404a
[fix](bdbje) Fix bdbje logging level not work (#27597) (#27788)
SWJTU-ZhangLei Nov 30, 2023
5da3ba0
[Opt](compression) Opt gzip decompress by libdeflate on X86 and X86_6…
kaka11chen Nov 30, 2023
dbd4012
[branch-2.0](fix) Fix broken exception message #27836
zhiqiang-hhhh Dec 1, 2023
7da0b14
[Bug](func) coredump in equal for null in function (#27843)
HappenLee Dec 1, 2023
455fc07
[minor](stats) Update olap table row count after analyze (#27858)
Kikyou1997 Dec 1, 2023
82ca1c4
[fix](stats)min and max return NaN when table is empty (#27863)
Jibing-Li Dec 1, 2023
d3ecff9
[minor](stats) Throw error when sync analyze failed (#27846)
Kikyou1997 Dec 1, 2023
3135f5f
[fix](stats) Don't save colToPartitions anymore to save mem (#27880)
Kikyou1997 Dec 1, 2023
b51dc70
[fix](nereids) set operation's result type is wrong if decimal overfl…
starocean999 Dec 1, 2023
4890d40
[Config] Modify the default value of tablet_schema_cache_recycle_inte…
Lchangliang Dec 1, 2023
175868d
[fix](like_func) incorrect result of like with 'NO_BACKSLASH_ESCAPES'…
mrhhsg Dec 1, 2023
b1fbdfd
[fix](fe) Fix show frontends npt in some situations (#27295) (#27789)
SWJTU-ZhangLei Dec 1, 2023
3a6efdb
[branch-2.0](fix) Fix extremely high CPU usage caused by rf merge #27…
zhiqiang-hhhh Dec 1, 2023
1e14a92
[fix](stacktrace) ignore stacktrace for error code INVALID_ARGUMENT I…
xiaokang Dec 1, 2023
ab33103
[opt](nereids) Branch-2.0: remove partition & histogram from col stat…
englefly Dec 2, 2023
8a385c8
[pick](Nereids) temporary partition is selected only if user manually…
englefly Dec 2, 2023
27731da
[fix](multi-catalog)support the max compute partition prune (#27154) …
wsjz Dec 3, 2023
264735b
[fix](Nereids) should not push down project to the nullable side of o…
starocean999 Dec 3, 2023
8b66024
fix compile
eldenmoon Dec 3, 2023
1 change: 1 addition & 0 deletions be/CMakeLists.txt
@@ -559,6 +559,7 @@ set(COMMON_THIRDPARTY
 xml2
 lzma
 simdjson
+deflate
 )

 if ((ARCH_AMD64 OR ARCH_AARCH64) AND OS_LINUX)
6 changes: 6 additions & 0 deletions be/cmake/thirdparty.cmake
@@ -299,3 +299,9 @@ if (OS_MACOSX)
 add_library(intl STATIC IMPORTED)
 set_target_properties(intl PROPERTIES IMPORTED_LOCATION "${THIRDPARTY_DIR}/lib/libintl.a")
 endif()
+
+# Only used on x86 or x86_64
+if ("${CMAKE_BUILD_TARGET_ARCH}" STREQUAL "x86" OR "${CMAKE_BUILD_TARGET_ARCH}" STREQUAL "x86_64")
+add_library(deflate STATIC IMPORTED)
+set_target_properties(deflate PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libdeflate.a)
+endif()
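
These two build changes link libdeflate into the BE on x86/x86_64, backing the "[deps](compression) Opt gzip decompress by libdeflate" commits above. For orientation, a minimal sketch of libdeflate's one-shot gzip API follows; this is generic usage of the library, not the Doris integration, which lives in the BE decompressor code outside this diff:

#include <libdeflate.h>

#include <vector>

// libdeflate decompresses in one shot: the whole input and a large-enough
// output buffer are supplied up front, which is where its speed advantage
// over zlib's streaming inflate comes from.
bool gzip_decompress(const std::vector<char>& in, std::vector<char>& out) {
    libdeflate_decompressor* d = libdeflate_alloc_decompressor();
    if (d == nullptr) return false;
    size_t actual_out = 0;
    libdeflate_result res = libdeflate_gzip_decompress(d, in.data(), in.size(), out.data(),
                                                       out.size(), &actual_out);
    libdeflate_free_decompressor(d);
    if (res != LIBDEFLATE_SUCCESS) return false;  // e.g. LIBDEFLATE_INSUFFICIENT_SPACE
    out.resize(actual_out);
    return true;
}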
3 changes: 2 additions & 1 deletion be/src/common/config.cpp
@@ -1086,7 +1086,7 @@ DEFINE_Int32(group_commit_insert_threads, "10");

 DEFINE_mInt32(scan_thread_nice_value, "0");

-DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
+DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600");

 DEFINE_Bool(exit_on_exception, "false");

@@ -1113,6 +1113,7 @@ DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048");
 // then the new created tablet will not locate in the high use disk.
 // range: 0 ~ 100
 DEFINE_mInt32(disk_diff_usage_percentage_for_create_tablet, "20");
+DEFINE_Bool(enable_snapshot_action, "false");

 // clang-format off
 #ifdef BE_TEST
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -1174,6 +1174,8 @@ DECLARE_mInt32(variant_max_merged_tablet_schema_size);
 // then the new created tablet will not locate in the high use disk.
 // range: 0 ~ 100
 DECLARE_mInt32(disk_diff_usage_percentage_for_create_tablet);
+// whether to enable /api/snapshot api
+DECLARE_Bool(enable_snapshot_action);

 #ifdef BE_TEST
 // test s3
9 changes: 6 additions & 3 deletions be/src/common/status.h
@@ -272,9 +272,10 @@ E(INVERTED_INDEX_NO_TERMS, -6005);
 E(INVERTED_INDEX_RENAME_FILE_FAILED, -6006);
 E(INVERTED_INDEX_EVALUATE_SKIPPED, -6007);
 E(INVERTED_INDEX_BUILD_WAITTING, -6008);
-E(KEY_NOT_FOUND, -6009);
-E(KEY_ALREADY_EXISTS, -6010);
-E(ENTRY_NOT_FOUND, -6011);
+E(INVERTED_INDEX_NOT_IMPLEMENTED, -6009);
+E(KEY_NOT_FOUND, -7000);
+E(KEY_ALREADY_EXISTS, -7001);
+E(ENTRY_NOT_FOUND, -7002);
 #undef E
 } // namespace ErrorCode

@@ -300,6 +301,7 @@ constexpr bool capture_stacktrace(int code) {
 && code != ErrorCode::SEGCOMPACTION_INIT_READER
 && code != ErrorCode::SEGCOMPACTION_INIT_WRITER
 && code != ErrorCode::SEGCOMPACTION_FAILED
+&& code != ErrorCode::INVALID_ARGUMENT
 && code != ErrorCode::INVERTED_INDEX_INVALID_PARAMETERS
 && code != ErrorCode::INVERTED_INDEX_NOT_SUPPORTED
 && code != ErrorCode::INVERTED_INDEX_CLUCENE_ERROR
@@ -308,6 +310,7 @@
 && code != ErrorCode::INVERTED_INDEX_NO_TERMS
 && code != ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED
 && code != ErrorCode::INVERTED_INDEX_BUILD_WAITTING
+&& code != ErrorCode::INVERTED_INDEX_NOT_IMPLEMENTED
 && code != ErrorCode::META_KEY_NOT_FOUND
 && code != ErrorCode::PUSH_VERSION_ALREADY_EXIST
 && code != ErrorCode::VERSION_NOT_EXIST
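
The status.h change does two things: it renumbers KEY_NOT_FOUND/KEY_ALREADY_EXISTS/ENTRY_NOT_FOUND into a -7000 block so the inverted-index range stays contiguous and can grow (gaining INVERTED_INDEX_NOT_IMPLEMENTED at -6009), and it adds INVALID_ARGUMENT plus the new code to the capture_stacktrace() filter so these expected errors skip stack unwinding. A self-contained sketch of why that filter pays off — the helper names and code values below are illustrative, not Doris's actual Status internals:

#include <iostream>
#include <string>

namespace ErrorCode {
constexpr int INVALID_ARGUMENT = -230;  // value assumed for this sketch
constexpr int INTERNAL_ERROR = -300;    // value assumed for this sketch
} // namespace ErrorCode

// Frequent, expected failures opt out of stack capture; unwinding is costly
// and a stack trace for a user-input error adds no diagnostic value.
constexpr bool capture_stacktrace(int code) {
    return code != ErrorCode::INVALID_ARGUMENT;
}

std::string collect_stacktrace() { return " <stack frames>"; }  // stand-in

std::string make_error(int code, std::string msg) {
    if (capture_stacktrace(code)) {
        msg += collect_stacktrace();
    }
    return msg;
}

int main() {
    std::cout << make_error(ErrorCode::INVALID_ARGUMENT, "bad arg") << '\n';   // no trace
    std::cout << make_error(ErrorCode::INTERNAL_ERROR, "unexpected") << '\n';  // with trace
}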
4 changes: 4 additions & 0 deletions be/src/http/action/snapshot_action.cpp
@@ -41,6 +41,10 @@ SnapshotAction::SnapshotAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
 : HttpHandlerWithAuth(exec_env, hier, type) {}

 void SnapshotAction::handle(HttpRequest* req) {
+if (!config::enable_snapshot_action) {
+HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "feature disabled");
+return;
+}
 LOG(INFO) << "accept one request " << req->debug_string();
 // Get tablet id
 const std::string& tablet_id_str = req->param(TABLET_ID);
4 changes: 4 additions & 0 deletions be/src/io/fs/buffered_reader.cpp
@@ -476,6 +476,10 @@ void PrefetchBuffer::prefetch_buffer() {
 return;
 }
 if (!s.ok() && _offset < _reader->size()) {
+// We should print the error msg since this buffer might not be accessed by the consumer
+// which would result in the status being missed
+LOG_WARNING("prefetch path {} failed, offset {}, error {}", _reader->path().native(),
+_offset, s.to_string());
 _prefetch_status = std::move(s);
 }
 _buffer_status = BufferStatus::PREFETCHED;
6 changes: 4 additions & 2 deletions be/src/io/fs/s3_file_reader.cpp
@@ -98,8 +98,10 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
 auto outcome = client->GetObject(request);
 s3_bvar::s3_get_total << 1;
 if (!outcome.IsSuccess()) {
-return Status::IOError("failed to read from {}: {}", _path.native(),
-outcome.GetError().GetMessage());
+return Status::IOError("failed to read from {}: {}, exception {}, error code {}",
+_path.native(), outcome.GetError().GetMessage(),
+outcome.GetError().GetExceptionName(),
+outcome.GetError().GetResponseCode());
 }
 *bytes_read = outcome.GetResult().GetContentLength();
 if (*bytes_read != bytes_req) {
7 changes: 4 additions & 3 deletions be/src/io/fs/s3_file_system.cpp
@@ -538,9 +538,10 @@ Status S3FileSystem::get_key(const Path& path, std::string* key) const {

 template <typename AwsOutcome>
 std::string S3FileSystem::error_msg(const std::string& key, const AwsOutcome& outcome) const {
-return fmt::format("(endpoint: {}, bucket: {}, key:{}, {}), {}", _s3_conf.endpoint,
-_s3_conf.bucket, key, outcome.GetError().GetExceptionName(),
-outcome.GetError().GetMessage());
+return fmt::format("(endpoint: {}, bucket: {}, key:{}, {}), {}, error code {}",
+_s3_conf.endpoint, _s3_conf.bucket, key,
+outcome.GetError().GetExceptionName(), outcome.GetError().GetMessage(),
+outcome.GetError().GetResponseCode());
 }

 std::string S3FileSystem::error_msg(const std::string& key, const std::string& err) const {
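
This reader/file-system pattern — appending GetMessage(), GetExceptionName(), and GetResponseCode() to every S3 error — repeats again in the writer below. A hypothetical helper (not part of this PR) could centralize the formatting; the three accessors are the real AWS SDK error API the diff calls:

#include <fmt/format.h>

#include <string>

// Works for any AWS SDK outcome object. GetResponseCode() returns an
// enum class, so cast it to an integer for formatting.
template <typename AwsOutcome>
std::string s3_error_detail(const AwsOutcome& outcome) {
    const auto& err = outcome.GetError();
    return fmt::format("{}, exception {}, error code {}", err.GetMessage(),
                       err.GetExceptionName(), static_cast<int>(err.GetResponseCode()));
}

A call site would then shrink to Status::IOError("failed to read from {}: {}", _path.native(), s3_error_detail(outcome)).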
38 changes: 25 additions & 13 deletions be/src/io/fs/s3_file_writer.cpp
@@ -119,8 +119,11 @@ Status S3FileWriter::_create_multi_upload_request() {
 _upload_id = outcome.GetResult().GetUploadId();
 return Status::OK();
 }
-return Status::IOError("failed to create multipart upload(bucket={}, key={}, upload_id={}): {}",
-_bucket, _path.native(), _upload_id, outcome.GetError().GetMessage());
+return Status::IOError(
+"failed to create multipart upload(bucket={}, key={}, upload_id={}): {}, exception {}, "
+"error code {}",
+_bucket, _path.native(), _upload_id, outcome.GetError().GetMessage(),
+outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode());
 }

 void S3FileWriter::_wait_until_finish(std::string_view task_name) {
@@ -171,8 +174,11 @@ Status S3FileWriter::abort() {
 _aborted = true;
 return Status::OK();
 }
-return Status::IOError("failed to abort multipart upload(bucket={}, key={}, upload_id={}): {}",
-_bucket, _path.native(), _upload_id, outcome.GetError().GetMessage());
+return Status::IOError(
+"failed to abort multipart upload(bucket={}, key={}, upload_id={}): {}, exception {}, "
+"error code {}",
+_bucket, _path.native(), _upload_id, outcome.GetError().GetMessage(),
+outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode());
 }

 Status S3FileWriter::close() {
@@ -281,9 +287,12 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
 UploadPartOutcome upload_part_outcome = upload_part_callable.get();
 if (!upload_part_outcome.IsSuccess()) {
 auto s = Status::IOError(
-"failed to upload part (bucket={}, key={}, part_num={}, up_load_id={}): {}",
+"failed to upload part (bucket={}, key={}, part_num={}, up_load_id={}): {}, "
+"exception {}, error code {}",
 _bucket, _path.native(), part_num, _upload_id,
-upload_part_outcome.GetError().GetMessage());
+upload_part_outcome.GetError().GetMessage(),
+upload_part_outcome.GetError().GetExceptionName(),
+upload_part_outcome.GetError().GetResponseCode());
 LOG_WARNING(s.to_string());
 buf._on_failed(s);
 return;
@@ -331,8 +340,11 @@ Status S3FileWriter::_complete() {

 if (!compute_outcome.IsSuccess()) {
 auto s = Status::IOError(
-"failed to create complete multi part upload (bucket={}, key={}): {}", _bucket,
-_path.native(), compute_outcome.GetError().GetMessage());
+"failed to create complete multi part upload (bucket={}, key={}): {}, exception "
+"{}, error code {}",
+_bucket, _path.native(), compute_outcome.GetError().GetMessage(),
+compute_outcome.GetError().GetExceptionName(),
+compute_outcome.GetError().GetResponseCode());
 LOG_WARNING(s.to_string());
 return s;
 }
@@ -371,12 +383,12 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
 auto response = _client->PutObject(request);
 s3_bvar::s3_put_total << 1;
 if (!response.IsSuccess()) {
-_st = Status::InternalError("Error: [{}:{}, responseCode:{}]",
-response.GetError().GetExceptionName(),
-response.GetError().GetMessage(),
-static_cast<int>(response.GetError().GetResponseCode()));
-buf._on_failed(_st);
+_st = Status::InternalError(
+"failed to put object (bucket={}, key={}), Error: [{}:{}, responseCode:{}]",
+_bucket, _path.native(), response.GetError().GetExceptionName(),
+response.GetError().GetMessage(), response.GetError().GetResponseCode());
 LOG(WARNING) << _st;
+buf._on_failed(_st);
 return;
 }
 _bytes_written += buf.get_size();
2 changes: 1 addition & 1 deletion be/src/olap/block_column_predicate.cpp
@@ -216,7 +216,7 @@ void AndBlockColumnPredicate::evaluate_vec(vectorized::MutableColumns& block, ui
 Status AndBlockColumnPredicate::evaluate(const std::string& column_name,
 InvertedIndexIterator* iterator, uint32_t num_rows,
 roaring::Roaring* bitmap) const {
-return Status::NotSupported(
+return Status::Error<ErrorCode::INVERTED_INDEX_NOT_IMPLEMENTED>(
 "Not Implemented evaluate with inverted index, please check the predicate");
 }
2 changes: 1 addition & 1 deletion be/src/olap/block_column_predicate.h
@@ -94,7 +94,7 @@ class BlockColumnPredicate {
 //evaluate predicate on inverted
 virtual Status evaluate(const std::string& column_name, InvertedIndexIterator* iterator,
 uint32_t num_rows, roaring::Roaring* bitmap) const {
-return Status::NotSupported(
+return Status::Error<ErrorCode::INVERTED_INDEX_NOT_IMPLEMENTED>(
 "Not Implemented evaluate with inverted index, please check the predicate");
 }
 };
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1028,7 +1028,7 @@ Status SegmentIterator::_apply_inverted_index_on_block_column_predicate(
 return res;
 } else {
 //TODO:mock until AndBlockColumnPredicate evaluate is ok.
-if (res.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
+if (res.code() == ErrorCode::INVERTED_INDEX_NOT_IMPLEMENTED) {
 return Status::OK();
 }
 LOG(WARNING) << "failed to evaluate index"
5 changes: 5 additions & 0 deletions be/src/pipeline/task_scheduler.cpp
@@ -319,6 +319,11 @@ void TaskScheduler::_do_work(size_t index) {
 }

 void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) {
+// close_a_pipeline may delete fragment context and will core in some defer
+// code, because the defer code will access fragment context it self.
+std::shared_ptr<PipelineFragmentContext> lock_for_context =
+task->fragment_context()->shared_from_this();
+
 if (task->is_pending_finish()) {
 task->set_state(PipelineTaskState::PENDING_FINISH);
 _blocked_task_scheduler->add_blocked_task(task);
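
The pinned shared_ptr above is the classic keep-alive idiom: closing the last pipeline can drop the final owning reference to the PipelineFragmentContext, and deferred cleanup running later in the same call would then touch freed memory. A self-contained sketch with illustrative names (not Doris's types):

#include <functional>
#include <iostream>
#include <memory>

struct Context : std::enable_shared_from_this<Context> {
    std::function<void()> deferred;  // stand-in for defer blocks touching *this
    ~Context() { std::cout << "context destroyed\n"; }
};

void try_close(Context* ctx, std::shared_ptr<Context>& owner) {
    // Pin the context for this scope so nothing below can free it.
    std::shared_ptr<Context> keep_alive = ctx->shared_from_this();
    owner.reset();   // simulates close_a_pipeline() dropping the last reference
    ctx->deferred(); // still safe: keep_alive owns the object
}                    // the context is destroyed here, not mid-function

int main() {
    auto owner = std::make_shared<Context>();
    owner->deferred = [] { std::cout << "deferred code runs safely\n"; };
    try_close(owner.get(), owner);
}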
1 change: 0 additions & 1 deletion be/src/runtime/descriptors.cpp
@@ -192,7 +192,6 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
 _table(tdesc.mcTable.table),
 _access_key(tdesc.mcTable.access_key),
 _secret_key(tdesc.mcTable.secret_key),
-_partition_spec(tdesc.mcTable.partition_spec),
 _public_access(tdesc.mcTable.public_access) {}

 MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;
2 changes: 0 additions & 2 deletions be/src/runtime/descriptors.h
@@ -238,7 +238,6 @@ class MaxComputeTableDescriptor : public TableDescriptor {
 const std::string table() const { return _table; }
 const std::string access_key() const { return _access_key; }
 const std::string secret_key() const { return _secret_key; }
-const std::string partition_spec() const { return _partition_spec; }
 const std::string public_access() const { return _public_access; }

 private:
@@ -247,7 +246,6 @@
 std::string _table;
 std::string _access_key;
 std::string _secret_key;
-std::string _partition_spec;
 std::string _public_access;
 };
7 changes: 3 additions & 4 deletions be/src/runtime/runtime_filter_mgr.cpp
@@ -39,7 +39,6 @@
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/brpc_client_cache.h"
-#include "util/spinlock.h"

 namespace doris {

@@ -227,7 +226,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
 auto filter_id = runtime_filter_desc->filter_id;
 // LOG(INFO) << "entity filter id:" << filter_id;
 cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, -1, false);
-_filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<SpinLock>()});
+_filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<std::mutex>()});
 return Status::OK();
 }

@@ -249,7 +248,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
 auto filter_id = runtime_filter_desc->filter_id;
 // LOG(INFO) << "entity filter id:" << filter_id;
 cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options);
-_filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<SpinLock>()});
+_filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<std::mutex>()});
 return Status::OK();
 }

@@ -323,7 +322,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
 // iter->second = pair{CntlVal,SpinLock}
 cntVal = iter->second.first;
 {
-std::lock_guard<SpinLock> l(*iter->second.second);
+std::lock_guard<std::mutex> l(*iter->second.second);
 MergeRuntimeFilterParams params(request, attach_data);
 ObjectPool* pool = cntVal->pool.get();
 RuntimeFilterWrapperHolder holder;
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_filter_mgr.h
@@ -170,7 +170,7 @@ class RuntimeFilterMergeControllerEntity {
 std::shared_mutex _filter_map_mutex;
 std::shared_ptr<MemTracker> _mem_tracker;
 using CntlValwithLock =
-std::pair<std::shared_ptr<RuntimeFilterCntlVal>, std::unique_ptr<SpinLock>>;
+std::pair<std::shared_ptr<RuntimeFilterCntlVal>, std::unique_ptr<std::mutex>>;
 std::map<int, CntlValwithLock> _filter_map;
 RuntimeState* _state;
 bool _opt_remote_rf = true;
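
Replacing the per-filter SpinLock with std::mutex is the fix behind the "[branch-2.0](fix) Fix extremely high CPU usage caused by rf merge" commit above: merging a runtime filter can hold the lock for a while (deserializing and combining predicate state), and spinning waiters burn whole cores for that duration, while mutex waiters sleep. A compilable sketch of the resulting layout, with Doris's types reduced to stand-ins:

#include <map>
#include <memory>
#include <mutex>

struct FilterState {  // stand-in for RuntimeFilterCntlVal
    int merged_count = 0;
};

// Same shape as CntlValwithLock after the change: each filter id owns its
// state plus a dedicated mutex guarding merges into that state.
using CntlValWithLock = std::pair<std::shared_ptr<FilterState>, std::unique_ptr<std::mutex>>;

std::map<int, CntlValWithLock> filter_map;

void merge_filter(int filter_id) {
    auto it = filter_map.find(filter_id);
    if (it == filter_map.end()) return;
    // A blocked std::mutex waiter sleeps; the old SpinLock busy-waited.
    std::lock_guard<std::mutex> l(*it->second.second);
    it->second.first->merged_count++;  // stand-in for the real merge work
}

int main() {
    filter_map.emplace(1, CntlValWithLock {std::make_shared<FilterState>(),
                                           std::make_unique<std::mutex>()});
    merge_filter(1);
}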
28 changes: 26 additions & 2 deletions be/src/service/internal_service.cpp
@@ -461,10 +461,22 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req
 uint32_t len = ser_request.size();
 RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
 }
+const auto& fragment_list = t_request.paramsList;
+MonotonicStopWatch timer;
+timer.start();

 for (const TExecPlanFragmentParams& params : t_request.paramsList) {
 RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
 }
+
+timer.stop();
+double cost_secs = static_cast<double>(timer.elapsed_time()) / 1000000000ULL;
+if (cost_secs > 5) {
+LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it costs too much",
+fragment_list.size(), print_id(fragment_list.front().params.query_id),
+cost_secs);
+}
+
 return Status::OK();
 } else if (version == PFragmentRequestVersion::VERSION_3) {
 TPipelineFragmentParamsList t_request;
@@ -474,9 +486,21 @@
 RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
 }

-for (const TPipelineFragmentParams& params : t_request.params_list) {
-RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+const auto& fragment_list = t_request.params_list;
+MonotonicStopWatch timer;
+timer.start();
+
+for (const TPipelineFragmentParams& fragment : fragment_list) {
+RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment));
 }
+
+timer.stop();
+double cost_secs = static_cast<double>(timer.elapsed_time()) / 1000000000ULL;
+if (cost_secs > 5) {
+LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it costs too much",
+fragment_list.size(), print_id(fragment_list.front().query_id), cost_secs);
+}
+
 return Status::OK();
 } else {
 return Status::InternalError("invalid version");
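
Both protocol versions now time fragment preparation and warn when it exceeds five seconds; MonotonicStopWatch::elapsed_time() returns nanoseconds, hence the division by 1000000000. The equivalent pattern in standard C++, shown for comparison rather than as a drop-in:

#include <chrono>
#include <cstdio>
#include <thread>

int main() {
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(10));  // stand-in for prepare work
    double cost_secs =
            std::chrono::duration<double>(std::chrono::steady_clock::now() - start).count();
    if (cost_secs > 5) {  // same threshold as the diff
        std::printf("Prepare fragments cost %.3f seconds, it costs too much\n", cost_secs);
    }
}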