From 77d7d41c0c3838e03f24b83f853f4b9b4cab39cc Mon Sep 17 00:00:00 2001
From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com>
Date: Sat, 11 Nov 2023 15:31:18 +0800
Subject: [PATCH] [branch-2.0](pick) support HTTP request with chunked
 transfer (#26520) (#26785)

---
 be/src/http/action/stream_load.cpp            | 21 ++++++++++--
 be/src/io/fs/stream_load_pipe.cpp             |  2 +-
 .../runtime/stream_load/stream_load_context.h |  1 +
 .../stream_load/test_chunked_transfer.csv     |  2 ++
 .../load_p0/stream_load/test_stream_load.out  |  3 ++
 .../stream_load/test_stream_load.groovy       | 34 +++++++++++++++++++
 6 files changed, 59 insertions(+), 4 deletions(-)
 create mode 100644 regression-test/data/load_p0/stream_load/test_chunked_transfer.csv

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 341c5ed3f2149f..d8c49f7c147cd2 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -75,6 +75,9 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
 
+static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
+static const string CHUNK = "chunked";
+
 #ifdef BE_TEST
 TStreamLoadPutResult k_stream_load_put_result;
 #endif
@@ -276,6 +279,12 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
+    if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) {
+        if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos) {
+            ctx->is_chunked_transfer = true;
+        }
+    }
+
     if (!http_req->header(HTTP_TIMEOUT).empty()) {
         try {
             ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
@@ -351,9 +360,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     request.__set_header_type(ctx->header_type);
     request.__set_loadId(ctx->id.to_thrift());
     if (ctx->use_streaming) {
-        auto pipe = std::make_shared<io::StreamLoadPipe>(
-                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-                ctx->body_bytes /* total_length */);
+        std::shared_ptr<io::StreamLoadPipe> pipe;
+        if (ctx->is_chunked_transfer) {
+            pipe = std::make_shared<io::StreamLoadPipe>(
+                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
+        } else {
+            pipe = std::make_shared<io::StreamLoadPipe>(
+                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
+                    MIN_CHUNK_SIZE /* min_chunk_size */, ctx->body_bytes /* total_length */);
+        }
         request.fileType = TFileType::FILE_STREAM;
         ctx->body_sink = pipe;
         ctx->pipe = pipe;
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index 23a4e3c6044cdf..a1d5fbe90b21a9 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -89,7 +89,7 @@ Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* byt
     return Status::OK();
 }
 
-// If _total_length == -1, this should be a Kafka routine load task,
+// If _total_length == -1, this should be a Kafka routine load task or stream load with chunked transfer HTTP request,
 // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
 // Otherwise, this should be a stream load task that needs to read the specified amount of data.
 Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) {
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 0e004b12f55f1e..0e821956470901 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -154,6 +154,7 @@ class StreamLoadContext {
     // only used to check if we receive whole body
     size_t body_bytes = 0;
     size_t receive_bytes = 0;
+    bool is_chunked_transfer = false;
 
     int64_t txn_id = -1;
 
diff --git a/regression-test/data/load_p0/stream_load/test_chunked_transfer.csv b/regression-test/data/load_p0/stream_load/test_chunked_transfer.csv
new file mode 100644
index 00000000000000..26831f07e769c4
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_chunked_transfer.csv
@@ -0,0 +1,2 @@
+2|-50|1|44|1
+1|-50|1|2|1
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load.out b/regression-test/data/load_p0/stream_load/test_stream_load.out
index 505b3110cf2253..5619c0a7c26297 100644
--- a/regression-test/data/load_p0/stream_load/test_stream_load.out
+++ b/regression-test/data/load_p0/stream_load/test_stream_load.out
@@ -105,3 +105,6 @@
 33	9	8	2	1
 38	1	9	2	5
 
+-- !sql_chunked_transfer --
+1	-50	1	2	1	\N
+2	-50	1	44	1	\N
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 0884330f2d3b50..1b6376bdb6c751 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -981,5 +981,39 @@ suite("test_stream_load", "p0") {
     assertEquals(res[0][0], 1)
     assertEquals(res[1][0], 1)
 
+    // test chunked transfer
+    def tableName16 = "test_chunked_transfer"
+    InetSocketAddress address = context.config.feHttpInetSocketAddress
+    String user = context.config.feHttpUser
+    String password = context.config.feHttpPassword
+    String db = context.config.getDbNameByFile(context.file)
+
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName16} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H column_separator:| -H Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3 -T ${context.dataPath}/test_chunked_transfer.csv http://${context.config.feHttpAddress}/api/${db}/${tableName16}/_stream_load"
+        log.info("test chunked transfer command: ${command}")
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        json2pc = parseJson(out)
+        log.info("test chunked transfer result: ${out}".toString())
+
+        qt_sql_chunked_transfer "select * from ${tableName16} order by k1"
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+    }
 }
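
Usage note (not part of the patch; a minimal sketch mirroring the regression test above, where host, port, credentials, database and table names are placeholders): with this change a stream load request can use chunked transfer encoding instead of supplying a Content-Length header, e.g.

    curl --location-trusted -u user:password \
         -H "Transfer-Encoding: chunked" \
         -H "column_separator:|" \
         -H "columns:k1,k2,v1,v2,v3" \
         -T test_chunked_transfer.csv \
         http://fe_host:8030/api/example_db/example_table/_stream_load

On the BE side such a request is served by a StreamLoadPipe constructed without a total_length (see the _process_put hunk above), so read_one_message takes whole buffers from the queue rather than reading a fixed number of bytes.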