Skip to content

Commit

Permalink
[case] Load json data with enable_simdjson_reader=false (#26601)
Browse files — browse the repository at this point in the history
  • Loading branch information
HowardQin authored Nov 16, 2023
1 parent f10ab4e commit 5498917
Show file tree
Hide file tree
Showing 6 changed files with 368 additions and 44 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ DEFINE_Validator(file_cache_type, [](std::string_view config) -> bool {
DEFINE_Int32(s3_transfer_executor_pool_size, "2");

DEFINE_Bool(enable_time_lut, "true");
// NOTE(review): the line below is the pre-change form retained by the scraped
// diff rendering; the commit replaces it with the DEFINE_mBool form that follows.
DEFINE_Bool(enable_simdjson_reader, "true");
// mBool presumably marks the config as mutable at runtime (settable via the
// BE /api/update_config endpoint, which the regression test below relies on)
// -- TODO confirm against the Doris config macro definitions.
DEFINE_mBool(enable_simdjson_reader, "true");

DEFINE_mBool(enable_query_like_bloom_filter, "true");
// number of s3 scanner thread pool size
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ DECLARE_mString(file_cache_type);
DECLARE_Int32(s3_transfer_executor_pool_size);

DECLARE_Bool(enable_time_lut);
// NOTE(review): pre-change declaration kept by the diff rendering; superseded
// by the DECLARE_mBool declaration on the next line.
DECLARE_Bool(enable_simdjson_reader);
// Declared with the mBool variant to match the DEFINE_mBool change in
// config.cpp -- presumably making the flag runtime-mutable; verify against
// the DECLARE_mBool macro.
DECLARE_mBool(enable_simdjson_reader);

DECLARE_mBool(enable_query_like_bloom_filter);
// number of s3 scanner thread pool size
Expand Down
147 changes: 146 additions & 1 deletion regression-test/data/load_p0/routine_load/test_routine_load.out

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions regression-test/data/load_p0/stream_load/test_json_load.out
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,38 @@ John 30 New York {"email":"[email protected]","phone":"+1-123-456-7890"}
11324 1321313082437 1678834024274 20230315 {"base_mac_value_null":24,"base_1_value_respiratoryrate":11,"base_3_value_heartrate":51,"base_3_status_onoroutofbed":3,"base_null_count_circulation":84,"base_1_status_onoroutofbed":3,"base_1_value_heartrate":51,"base_3_value_respiratoryrate":11,"base_3_value_bodyactivityenergy":43652,"base_2_value_respiratoryrate":11,"base_2_value_bodyactivityenergy":28831,"base_2_status_onoroutofbed":3,"base_1_value_bodyactivityenergy":56758,"base_2_value_heartrate":51,"tsltype":"properties","datatimestamp":1678834024274,"command":"0105","macaddress":"405EE1805029"}

-- !select --
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
5 hangzhou 2345675
6 nanjing 2345676
7 wuhan 2345677
8 chengdu 2345678
9 xian 2345679
200 changsha 3456789

-- !select --
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
200 changsha 3456789

-- !select --
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
200 changsha 3456789

-- !select26 --
10 1454547
20 1244264
30 528369
40 594201
50 594201
60 2345672
70 2345673
80 2345674
90 2345675
100 2345676
200 755

Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,105 @@ suite("test_routine_load","p0") {
}
}
}

// disable_simdjson_reader and load json
// Reset the shared per-table index before re-running the routine-load jobs
// with the non-simdjson JSON parser path.
i = 0
// Only executes when the suite is explicitly enabled (requires a Kafka-backed
// test environment -- see the kafka_broker_list usage below).
if (enabled != null && enabled.equalsIgnoreCase("true")) {
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
// Fills both maps (backend id -> ip, backend id -> http port); needed so the
// config update below can be applied to every BE node.
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
// Apply paramName=paramValue on each BE node via its http update_config
// endpoint, asserting the endpoint acknowledged the change with "OK".
def set_be_param = { paramName, paramValue ->
    backendId_to_backendIP.keySet().each { beId ->
        def beHost = backendId_to_backendIP.get(beId)
        def beHttpPort = backendId_to_backendHttpPort.get(beId)
        def url = String.format("http://%s:%s/api/update_config?%s=%s", beHost, beHttpPort, paramName, paramValue)
        def (code, out, err) = curl("POST", url)
        assertTrue(out.contains("OK"))
    }
}

try {
// Switch every BE to the non-simdjson JSON parser so the identical routine
// load jobs below exercise the fallback JSON reader code path.
set_be_param.call("enable_simdjson_reader", "false")

// Recreate each target table, then create one json-format routine-load job
// per table consuming its Kafka topic from the earliest offset.
for (String tableName in tables) {
sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text

def name = "routine_load_" + tableName
sql """
CREATE ROUTINE LOAD ${jobs[i]} ON ${name}
COLUMNS(${columns[i]})
PROPERTIES
(
"format" = "json",
"jsonpaths" = '${jsonpaths[i]}',
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${jsonTopic[i]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
i++
}

i = 0
for (String tableName in tables) {
// Poll (1s interval) until the job leaves NEED_SCHEDULE; any resulting
// state other than RUNNING fails the test via assertEquals.
// NOTE(review): no upper bound here -- a job stuck in NEED_SCHEDULE would
// spin forever; relies on the outer test-framework timeout.
while (true) {
sleep(1000)
def res = sql "show routine load for ${jobs[i]}"
def state = res[0][8].toString()
if (state == "NEED_SCHEDULE") {
continue;
}
log.info("reason of state changed: ${res[0][17].toString()}".toString())
assertEquals(res[0][8].toString(), "RUNNING")
break;
}

// Wait for loaded rows to become visible, at most 120 iterations * 5s.
def count = 0
def tableName1 = "routine_load_" + tableName
while (true) {
def res = sql "select count(*) from ${tableName1}"
def state = sql "show routine load for ${jobs[i]}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
// Deliberately-failing assertion (20 vs the observed 0 row count) used
// to abort the test once the timeout is exhausted.
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}

// Order results deterministically before comparing with the .out baseline;
// the first four tables sort by k00,k01 (presumably only they expose a k01
// column -- confirm against the ddl files), the rest by k00 alone.
if (i <= 3) {
qt_disable_simdjson_reader "select * from ${tableName1} order by k00,k01"
} else {
qt_disable_simdjson_reader "select * from ${tableName1} order by k00"
}

sql "stop routine load for ${jobs[i]}"
i++
}
} finally {
// Always restore the default (simdjson) parser and drop the test tables,
// even when an assertion above fails, so later suites see clean state.
set_be_param.call("enable_simdjson_reader", "true")
for (String tableName in tables) {
sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
}
}
}


// TODO: need update kafka script
// i = 0
Expand Down
Loading

0 comments on commit 5498917

Please sign in to comment.