[fix](multi-table) fix unknown source slot descriptor when load multi table #25762

Merged (4 commits) on Oct 25, 2023
14 changes: 10 additions & 4 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -249,13 +249,19 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
         fragment_context->set_is_report_success(request.query_options.is_report_success);
     }
 
-    auto* desc_tbl = _query_ctx->desc_tbl;
-    _runtime_state->set_desc_tbl(desc_tbl);
+    if (request.is_simplified_param) {
+        _desc_tbl = _query_ctx->desc_tbl;
+    } else {
+        DCHECK(request.__isset.desc_tbl);
+        RETURN_IF_ERROR(
+                DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
+    }
+    _runtime_state->set_desc_tbl(_desc_tbl);
 
     // 2. Create ExecNode to build pipeline with PipelineFragmentContext
     RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
             ExecNode::create_tree(_runtime_state.get(), _runtime_state->obj_pool(),
-                                  request.fragment.plan, *desc_tbl, &_root_plan));
+                                  request.fragment.plan, *_desc_tbl, &_root_plan));
 
     // Set senders of exchange nodes before pipeline build
     std::vector<ExecNode*> exch_nodes;
@@ -311,7 +317,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 _runtime_state->obj_pool(), request.fragment.output_sink,
                 request.fragment.output_exprs, request, idx, _root_plan->row_desc(),
-                _runtime_state.get(), &_sink, *desc_tbl));
+                _runtime_state.get(), &_sink, *_desc_tbl));
     }
 
     _root_pipeline = fragment_context->add_pipeline();
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
@@ -216,6 +216,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
     // profile reporting-related
     report_status_callback _report_status_cb;
 
+    DescriptorTbl* _desc_tbl;
+
 private:
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
     bool _group_commit;
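The same two-branch descriptor-table setup is repeated below in pipeline_x_fragment_context.cpp and plan_fragment_executor.cpp. Distilled from the diff, the selection logic amounts to the following — a minimal C++ sketch in which Request, query_level, and make_from_request are simplified stand-ins for doris::TPipelineFragmentParams, _query_ctx->desc_tbl, and DescriptorTbl::create, not the real BE types:

// Minimal sketch of the descriptor-table selection added to prepare().
// Only the branching mirrors the patch; the types are stand-ins.
#include <cassert>

struct DescriptorTbl {};

struct Request {
    bool is_simplified_param; // FE omitted per-fragment descriptors; reuse the query-level table
    bool isset_desc_tbl;      // mirrors request.__isset.desc_tbl
};

DescriptorTbl* select_desc_tbl(const Request& request, DescriptorTbl* query_level,
                               DescriptorTbl* (*make_from_request)(const Request&)) {
    if (request.is_simplified_param) {
        // Ordinary case: the query-level descriptor table already has every slot.
        return query_level;
    }
    // Multi-table load: the query-level table does not describe this fragment's
    // target table, so build one from the descriptors shipped in the request.
    assert(request.isset_desc_tbl);
    return make_from_request(request);
}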
14 changes: 10 additions & 4 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -200,8 +200,14 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
         _runtime_state->set_load_job_id(request.load_job_id);
     }
 
-    auto* desc_tbl = _query_ctx->desc_tbl;
-    _runtime_state->set_desc_tbl(desc_tbl);
+    if (request.is_simplified_param) {
+        _desc_tbl = _query_ctx->desc_tbl;
+    } else {
+        DCHECK(request.__isset.desc_tbl);
+        RETURN_IF_ERROR(
+                DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
+    }
+    _runtime_state->set_desc_tbl(_desc_tbl);
     _runtime_state->set_num_per_fragment_instances(request.num_senders);
 
     // 2. Build pipelines with operators in this fragment.
@@ -215,7 +221,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     }
     RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
             _runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
-            request, root_pipeline->output_row_desc(), _runtime_state.get(), *desc_tbl,
+            request, root_pipeline->output_row_desc(), _runtime_state.get(), *_desc_tbl,
             root_pipeline->id()));
     RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
     static_cast<void>(root_pipeline->set_sink(_sink));
@@ -402,7 +408,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             _runtime_states[i]->set_load_job_id(request.load_job_id);
         }
 
-        _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);
+        _runtime_states[i]->set_desc_tbl(_desc_tbl);
         _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
         _runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
         std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
14 changes: 10 additions & 4 deletions be/src/runtime/plan_fragment_executor.cpp
@@ -165,13 +165,19 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
     }
 
     // set up desc tbl
-    DescriptorTbl* desc_tbl = _query_ctx->desc_tbl;
-    _runtime_state->set_desc_tbl(desc_tbl);
+    if (request.is_simplified_param) {
+        _desc_tbl = _query_ctx->desc_tbl;
+    } else {
+        DCHECK(request.__isset.desc_tbl);
+        RETURN_IF_ERROR(
+                DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
+    }
+    _runtime_state->set_desc_tbl(_desc_tbl);
 
     // set up plan
     DCHECK(request.__isset.fragment);
     RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree(
-            _runtime_state.get(), obj_pool(), request.fragment.plan, *desc_tbl, &_plan));
+            _runtime_state.get(), obj_pool(), request.fragment.plan, *_desc_tbl, &_plan));
 
     // set #senders of exchange nodes before calling Prepare()
     std::vector<ExecNode*> exch_nodes;
@@ -222,7 +228,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
     if (request.fragment.__isset.output_sink) {
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, params,
-                row_desc(), runtime_state(), &_sink, *desc_tbl));
+                row_desc(), runtime_state(), &_sink, *_desc_tbl));
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state()));
 
         RuntimeProfile* sink_profile = _sink->profile();
2 changes: 2 additions & 0 deletions be/src/runtime/plan_fragment_executor.h
@@ -242,6 +242,8 @@ class PlanFragmentExecutor {
 
     bool _group_commit = false;
 
+    DescriptorTbl* _desc_tbl;
+
     ObjectPool* obj_pool() { return _runtime_state->obj_pool(); }
 
     // typedef for TPlanFragmentExecParams.per_node_scan_ranges
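Promoting _desc_tbl to a member (here and in pipeline_fragment_context.h above) is what lets PipelineXFragmentContext::_build_pipeline_tasks hand each per-instance RuntimeState the request-specific table instead of reaching back into _query_ctx->desc_tbl — the one-line change in the third hunk of pipeline_x_fragment_context.cpp. A sketch of that handoff, again with simplified stand-in types rather than the real Doris classes:

// Sketch: the fragment context caches the table chosen in prepare() so later
// per-instance setup reuses it. Only the member/handoff pattern mirrors the patch.
#include <memory>
#include <vector>

struct DescriptorTbl {};

struct RuntimeState {
    void set_desc_tbl(DescriptorTbl* tbl) { _desc_tbl = tbl; }
    DescriptorTbl* _desc_tbl = nullptr;
};

class FragmentContext {
public:
    void prepare(DescriptorTbl* chosen) { _desc_tbl = chosen; }

    void build_tasks() {
        for (auto& state : _runtime_states) {
            // Was: state->set_desc_tbl(_query_ctx->desc_tbl) — the table that
            // lacks the per-table slots during a multi-table load.
            state->set_desc_tbl(_desc_tbl);
        }
    }

private:
    DescriptorTbl* _desc_tbl = nullptr; // set in prepare()
    std::vector<std::unique_ptr<RuntimeState>> _runtime_states;
};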
@@ -0,0 +1,3 @@
+routine_load_dup_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
+routine_load_uniq_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
+routine_load_mow_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
@@ -0,0 +1,3 @@
+routine_load_dup_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"}
+routine_load_uniq_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"}
+routine_load_mow_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"}
4 changes: 2 additions & 2 deletions docker/thirdparties/run-thirdparties-docker.sh
@@ -256,7 +256,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
         local ip_host="$2"
         local backup_dir=/home/work/pipline/backup_center
 
-        declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone" "multi_table_csv")
+        declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone" "multi_table_csv" "multi_table_csv1")
 
         for topic in "${topics[@]}"; do
             while IFS= read -r line; do
@@ -267,7 +267,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
             done < "${ROOT}/docker-compose/kafka/scripts/${topic}.csv"
         done
 
-        declare -a json_topics=("basic_data_json" "basic_array_data_json" "basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json")
+        declare -a json_topics=("basic_data_json" "basic_array_data_json" "basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json" "multi_table_json1")
 
         for json_topic in "${json_topics[@]}"; do
             echo ${json_topics}
14 changes: 13 additions & 1 deletion regression-test/data/load_p0/routine_load/test_routine_load.out
@@ -986,4 +986,16 @@
 49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
 
 -- !sql_multi_table_one_data --
-8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N
+8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N
+
+-- !sql_multi_table --
+49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 {}
+
+-- !sql_multi_table --
+49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 {}
+
+-- !sql_multi_table --
+8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+
+-- !sql_multi_table --
+8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
@@ -31,6 +31,11 @@ suite("test_routine_load","p0") {
         "dup_tbl_basic_multi_table",
     ]
 
+    def multiTables1 = [
+        "dup_tbl_basic",
+        "uniq_tbl_basic",
+    ]
+
     def jobs = [
         "dup_tbl_basic_job",
         "uniq_tbl_basic_job",
@@ -127,6 +132,11 @@ suite("test_routine_load","p0") {
         "multi_table_json",
     ]
 
+    def multiTableJobName1 = [
+        "multi_table_csv1",
+        "multi_table_json1",
+    ]
+
     def formats = [
         "csv",
         "json",
@@ -980,4 +990,83 @@ suite("test_routine_load","p0") {
             j++
         }
     }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def j = 0
+        for (String jobName in multiTableJobName1) {
+            try {
+                for (String tableName in multiTables1) {
+                    sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                    sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+                }
+
+                sql """
+                    CREATE ROUTINE LOAD ${jobName}
+                    COLUMNS TERMINATED BY "|"
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "5",
+                        "format" = "${formats[j]}",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "${jobName}",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+
+                i = 0
+                for (String tableName in multiTables1) {
+                    while (true) {
+                        sleep(1000)
+                        def res = sql "show routine load for ${jobName}"
+                        def state = res[0][8].toString()
+                        if (state == "NEED_SCHEDULE") {
+                            continue;
+                        }
+                        assertEquals(res[0][8].toString(), "RUNNING")
+                        break;
+                    }
+
+                    def count = 0
+                    def tableName1 = "routine_load_" + tableName
+                    while (true) {
+                        def res = sql "select count(*) from ${tableName1}"
+                        def state = sql "show routine load for ${jobName}"
+                        log.info("routine load state: ${state[0][8].toString()}".toString())
+                        log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                        log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                        if (res[0][0] > 0) {
+                            break
+                        }
+                        if (count >= 120) {
+                            log.error("routine load can not visible for long time")
+                            assertEquals(20, res[0][0])
+                            break
+                        }
+                        sleep(5000)
+                        count++
+                    }
+
+                    if (i <= 3) {
+                        qt_sql_multi_table "select * from ${tableName1} order by k00,k01"
+                    } else {
+                        qt_sql_multi_table "select * from ${tableName1} order by k00"
+                    }
+
+                    i++
+                }
+            } finally {
+                sql "stop routine load for ${jobName}"
+                for (String tableName in multiTables1) {
+                    sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                }
+            }
+            j++
+        }
+    }
 }