Skip to content

Commit

Permalink
Merge branch 'branch-2.1' into sync_unique_case
Browse files Browse the repository at this point in the history
  • Loading branch information
cjj2010 authored Jul 15, 2024
2 parents 7eb2194 + ce4983e commit 813f0f1
Show file tree
Hide file tree
Showing 51 changed files with 1,964 additions and 458 deletions.
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ header:
- "**/*.sql"
- "**/*.lock"
- "**/*.out"
- "**/*.parquet"
- "docs/.markdownlintignore"
- "fe/fe-core/src/test/resources/data/net_snmp_normal"
- "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaLexer.g4"
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,8 @@ DEFINE_mInt64(fetch_remote_schema_rpc_timeout_ms, "60000");
// filter wrong data.
DEFINE_mBool(enable_parquet_page_index, "true");

DEFINE_mBool(ignore_not_found_file_in_external_table, "true");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,10 @@ DECLARE_Int64(min_row_group_size);

DECLARE_mBool(enable_parquet_page_index);

// Whether to ignore not-found files in external tables (e.g., hive).
// Default is true. If set to false, a not-found file will result in query failure.
DECLARE_mBool(ignore_not_found_file_in_external_table);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
86 changes: 62 additions & 24 deletions be/src/exec/es/es_scan_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,32 @@ ESScanReader::ESScanReader(const std::string& target,
std::string filter_path =
_doc_value_mode ? DOCVALUE_SCROLL_SEARCH_FILTER_PATH : SOURCE_SCROLL_SEARCH_FILTER_PATH;

// When shard_id is negative(-1), the request will be sent to ES without shard preference.
int32 shard_id = std::stoi(_shards);
if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
_exactly_once = true;
std::stringstream scratch;
// Just send a normal search request to Elasticsearch with an additional terminate_after param, so that the search terminates early when the limit takes effect
if (_type.empty()) {
// `terminate_after` and `size` can not be used together in scroll request of ES 8.x
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
if (shard_id < 0) {
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" << filter_path;
} else {
// `terminate_after` and `size` can not be used together in scroll request of ES 8.x
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
}
} else {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "terminate_after=" << props.at(KEY_TERMINATE_AFTER)
<< REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
if (shard_id < 0) {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "terminate_after=" << props.at(KEY_TERMINATE_AFTER) << "&"
<< filter_path;
} else {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "terminate_after=" << props.at(KEY_TERMINATE_AFTER)
<< REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
}
}
_search_url = scratch.str();
} else {
Expand All @@ -95,15 +108,27 @@ ESScanReader::ESScanReader(const std::string& target,
// scroll request for scanning
// add terminate_after for the first scroll to avoid decompressing all postings lists
if (_type.empty()) {
// `terminate_after` and `size` can not be used together in scroll request of ES 8.x
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path;
if (shard_id < 0) {
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< "scroll=" << _scroll_keep_alive << "&" << filter_path;
} else {
// `terminate_after` and `size` can not be used together in scroll request of ES 8.x
scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
<< "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path;
}
} else {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path << "&terminate_after=" << batch_size_str;
if (shard_id < 0) {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "scroll=" << _scroll_keep_alive << "&" << filter_path
<< "&terminate_after=" << batch_size_str;
} else {
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
<< "/_search?"
<< "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << filter_path << "&terminate_after=" << batch_size_str;
}
}
_init_scroll_url = scratch.str();
_next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path;
Expand All @@ -115,11 +140,13 @@ ESScanReader::~ESScanReader() {}

Status ESScanReader::open() {
_is_first = true;
// We do not enable set_fail_on_error for ES http requests, so that we can get more detailed error messages
bool set_fail_on_error = false;
if (_exactly_once) {
RETURN_IF_ERROR(_network_client.init(_search_url));
RETURN_IF_ERROR(_network_client.init(_search_url, set_fail_on_error));
LOG(INFO) << "search request URL: " << _search_url;
} else {
RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
RETURN_IF_ERROR(_network_client.init(_init_scroll_url, set_fail_on_error));
LOG(INFO) << "First scroll request URL: " << _init_scroll_url;
}
_network_client.set_basic_auth(_user_name, _passwd);
Expand All @@ -132,7 +159,8 @@ Status ESScanReader::open() {
Status status = _network_client.execute_post_request(_query, &_cached_response);
if (!status.ok() || _network_client.get_http_status() != 200) {
std::stringstream ss;
ss << "Failed to connect to ES server, errmsg is: " << status;
ss << "Failed to connect to ES server, errmsg is: " << status
<< ", response: " << _cached_response;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
Expand All @@ -155,7 +183,9 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
if (_exactly_once) {
return Status::OK();
}
RETURN_IF_ERROR(_network_client.init(_next_scroll_url));
// We do not enable set_fail_on_error for ES http requests, so that we can get more detailed error messages
bool set_fail_on_error = false;
RETURN_IF_ERROR(_network_client.init(_next_scroll_url, set_fail_on_error));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
_network_client.set_timeout_ms(_http_timeout_ms);
Expand All @@ -168,13 +198,15 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
long status = _network_client.get_http_status();
if (status == 404) {
LOG(WARNING) << "request scroll search failure 404["
<< ", response: " << (response.empty() ? "empty response" : response);
<< ", response: " << (response.empty() ? "empty response" : response)
<< "]";
return Status::InternalError("No search context found for {}", _scroll_id);
}
if (status != 200) {
LOG(WARNING) << "request scroll search failure["
<< "http status" << status
<< ", response: " << (response.empty() ? "empty response" : response);
<< "http status: " << status
<< ", response: " << (response.empty() ? "empty response" : response)
<< "]";
return Status::InternalError("request scroll search failure: {}",
(response.empty() ? "empty response" : response));
}
Expand Down Expand Up @@ -211,7 +243,9 @@ Status ESScanReader::close() {
}

std::string scratch_target = _target + REQUEST_SEARCH_SCROLL_PATH;
RETURN_IF_ERROR(_network_client.init(scratch_target));
// We do not enable set_fail_on_error for ES http requests, so that we can get more detailed error messages
bool set_fail_on_error = false;
RETURN_IF_ERROR(_network_client.init(scratch_target, set_fail_on_error));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_method(DELETE);
_network_client.set_content_type("application/json");
Expand All @@ -222,9 +256,13 @@ Status ESScanReader::close() {
std::string response;
RETURN_IF_ERROR(_network_client.execute_delete_request(
ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response));
if (_network_client.get_http_status() == 200) {
long status = _network_client.get_http_status();
if (status == 200) {
return Status::OK();
} else {
LOG(WARNING) << "es_scan_reader delete scroll context failure["
<< "http status: " << status
<< ", response: " << (response.empty() ? "empty response" : response) << "]";
return Status::InternalError("es_scan_reader delete scroll context failure");
}
}
Expand Down
18 changes: 13 additions & 5 deletions be/src/exec/es/es_scroll_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,19 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
} else {
size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str());
}
rapidjson::Value sort_node(rapidjson::kArrayType);
// use the scroll-scan mode for scan index documents
rapidjson::Value field("_doc", allocator);
sort_node.PushBack(field, allocator);
es_query_dsl.AddMember("sort", sort_node, allocator);

std::string shard_id;
if (properties.find(ESScanReader::KEY_SHARD) != properties.end()) {
shard_id = properties.at(ESScanReader::KEY_SHARD);
}
// To maintain consistency with the query, when shard_id is negative, do not add sort node in scroll request body.
if (!shard_id.empty() && std::stoi(shard_id) >= 0) {
rapidjson::Value sort_node(rapidjson::kArrayType);
// use the scroll-scan mode for scan index documents
rapidjson::Value field("_doc", allocator);
sort_node.PushBack(field, allocator);
es_query_dsl.AddMember("sort", sort_node, allocator);
}
// number of documents returned
es_query_dsl.AddMember("size", size, allocator);
rapidjson::StringBuffer buffer;
Expand Down
14 changes: 9 additions & 5 deletions be/src/http/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ HttpClient::~HttpClient() {
}
}

Status HttpClient::init(const std::string& url) {
Status HttpClient::init(const std::string& url, bool set_fail_on_error) {
if (_curl == nullptr) {
_curl = curl_easy_init();
if (_curl == nullptr) {
Expand Down Expand Up @@ -94,10 +94,14 @@ Status HttpClient::init(const std::string& url) {
return Status::InternalError("fail to set CURLOPT_NOSIGNAL");
}
// set fail on error
code = curl_easy_setopt(_curl, CURLOPT_FAILONERROR, 1L);
if (code != CURLE_OK) {
LOG(WARNING) << "fail to set CURLOPT_FAILONERROR, msg=" << _to_errmsg(code);
return Status::InternalError("fail to set CURLOPT_FAILONERROR");
// When this option is set to `1L` (enabled), libcurl will return an error directly
// when encountering HTTP error codes (>= 400), without reading the body of the error response.
if (set_fail_on_error) {
code = curl_easy_setopt(_curl, CURLOPT_FAILONERROR, 1L);
if (code != CURLE_OK) {
LOG(WARNING) << "fail to set CURLOPT_FAILONERROR, msg=" << _to_errmsg(code);
return Status::InternalError("fail to set CURLOPT_FAILONERROR");
}
}
// set redirect
code = curl_easy_setopt(_curl, CURLOPT_FOLLOWLOCATION, 1L);
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class HttpClient {

// This function must be called before any other function;
// you can call it multiple times to reuse this object.
Status init(const std::string& url);
Status init(const std::string& url, bool set_fail_on_error = true);

void set_method(HttpMethod method);

Expand Down
18 changes: 8 additions & 10 deletions be/src/pipeline/exec/table_function_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,14 @@ Status TableFunctionLocalState::get_expanded_block(RuntimeState* state,
if (skip_child_row = _is_inner_and_empty(); skip_child_row) {
continue;
}
if (p._fn_num == 1) {
_current_row_insert_times += _fns[0]->get_value(
columns[p._child_slots.size()],
state->batch_size() - columns[p._child_slots.size()]->size());
} else {
for (int i = 0; i < p._fn_num; i++) {
_fns[i]->get_value(columns[i + p._child_slots.size()]);
}
_current_row_insert_times++;
_fns[p._fn_num - 1]->forward();

DCHECK_LE(1, p._fn_num);
auto repeat_times = _fns[p._fn_num - 1]->get_value(
columns[p._child_slots.size() + p._fn_num - 1],
state->batch_size() - columns[p._child_slots.size()]->size());
_current_row_insert_times += repeat_times;
for (int i = 0; i < p._fn_num - 1; i++) {
_fns[i]->get_same_many_values(columns[i + p._child_slots.size()], repeat_times);
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ Status VFileScanner::prepare(
_convert_to_output_block_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
_empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT);
_not_found_file_counter =
ADD_COUNTER(_parent->_scanner_profile, "NotFoundFileNum", TUnit::UNIT);
_file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
_has_fully_rf_file_counter =
ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT);
Expand All @@ -180,6 +182,8 @@ Status VFileScanner::prepare(
ADD_TIMER(_local_state->scanner_profile(), "FileScannerConvertOuputBlockTime");
_empty_file_counter =
ADD_COUNTER(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT);
_not_found_file_counter =
ADD_COUNTER(_local_state->scanner_profile(), "NotFoundFileNum", TUnit::UNIT);
_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
_has_fully_rf_file_counter =
ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
Expand Down Expand Up @@ -329,9 +333,9 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool*
// And the file may already be removed from storage.
// Just ignore not found files.
Status st = _get_next_reader();
if (st.is<ErrorCode::NOT_FOUND>()) {
if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
_cur_reader_eof = true;
COUNTER_UPDATE(_empty_file_counter, 1);
COUNTER_UPDATE(_not_found_file_counter, 1);
continue;
} else if (!st) {
return st;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class VFileScanner : public VScanner {
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _not_found_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;

Expand Down
18 changes: 8 additions & 10 deletions be/src/vec/exec/vtable_function_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,14 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
if (skip_child_row = _is_inner_and_empty(); skip_child_row) {
continue;
}
if (_fn_num == 1) {
_current_row_insert_times += _fns[0]->get_value(
columns[_child_slots.size()],
state->batch_size() - columns[_child_slots.size()]->size());
} else {
for (int i = 0; i < _fn_num; i++) {
_fns[i]->get_value(columns[i + _child_slots.size()]);
}
_current_row_insert_times++;
_fns[_fn_num - 1]->forward();

DCHECK_LE(1, _fn_num);
auto repeat_times = _fns[_fn_num - 1]->get_value(
columns[_child_slots.size()],
state->batch_size() - columns[_child_slots.size()]->size());
_current_row_insert_times += repeat_times;
for (int i = 0; i < _fn_num - 1; i++) {
_fns[i]->get_same_many_values(columns[i + _child_slots.size()], repeat_times);
}
}
}
Expand Down
13 changes: 2 additions & 11 deletions be/src/vec/exprs/table_function/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,8 @@ class TableFunction {
_cur_offset = 0;
}

virtual void get_value(MutableColumnPtr& column) = 0;

virtual int get_value(MutableColumnPtr& column, int max_step) {
max_step = std::max(1, std::min(max_step, (int)(_cur_size - _cur_offset)));
int i = 0;
for (; i < max_step && !eos(); i++) {
get_value(column);
forward();
}
return i;
}
virtual void get_same_many_values(MutableColumnPtr& column, int length = 0) = 0;
virtual int get_value(MutableColumnPtr& column, int max_step) = 0;

virtual Status close() { return Status::OK(); }

Expand Down
Loading

0 comments on commit 813f0f1

Please sign in to comment.