[branch-2.0](pick) support HTTP request with chunked transfer (#26520)
sollhui authored Nov 11, 2023
1 parent 275a75e commit 77d7d41
Showing 6 changed files with 59 additions and 4 deletions.
21 changes: 18 additions & 3 deletions be/src/http/action/stream_load.cpp
@@ -75,6 +75,9 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
 
+static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
+static const string CHUNK = "chunked";
+
 #ifdef BE_TEST
 TStreamLoadPutResult k_stream_load_put_result;
 #endif
@@ -276,6 +279,12 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
 #endif
     }
 
+    if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) {
+        if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos) {
+            ctx->is_chunked_transfer = true;
+        }
+    }
+
     if (!http_req->header(HTTP_TIMEOUT).empty()) {
         try {
             ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
@@ -351,9 +360,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     request.__set_header_type(ctx->header_type);
     request.__set_loadId(ctx->id.to_thrift());
     if (ctx->use_streaming) {
-        auto pipe = std::make_shared<io::StreamLoadPipe>(
-                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-                ctx->body_bytes /* total_length */);
+        std::shared_ptr<io::StreamLoadPipe> pipe;
+        if (ctx->is_chunked_transfer) {
+            pipe = std::make_shared<io::StreamLoadPipe>(
+                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
+        } else {
+            pipe = std::make_shared<io::StreamLoadPipe>(
+                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
+                    MIN_CHUNK_SIZE /* min_chunk_size */, ctx->body_bytes /* total_length */);
+        }
         request.fileType = TFileType::FILE_STREAM;
         ctx->body_sink = pipe;
         ctx->pipe = pipe;
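For readers skimming the diff, here is a minimal standalone sketch of the branching above. It is not the Doris code itself: `PipeConfig`, `make_pipe_config`, and the `kMaxPipeBufferedBytes` value are illustrative stand-ins. The point it shows is that a chunked request carries no Content-Length, so the pipe cannot be given a total length up front:

```cpp
// Sketch only: illustrates the chunked-vs-known-length decision, not Doris internals.
#include <cstddef>
#include <iostream>
#include <map>
#include <optional>
#include <string>

struct PipeConfig {
    size_t max_buffered_bytes;
    std::optional<size_t> min_chunk_size;  // unset for chunked transfer
    std::optional<size_t> total_length;    // unknown for chunked transfer
};

constexpr size_t kMaxPipeBufferedBytes = 1024 * 1024;  // assumed value, for illustration
constexpr size_t kMinChunkSize = 64 * 1024;

PipeConfig make_pipe_config(const std::map<std::string, std::string>& headers,
                            size_t body_bytes) {
    auto it = headers.find("Transfer-Encoding");
    bool is_chunked =
            it != headers.end() && it->second.find("chunked") != std::string::npos;
    if (is_chunked) {
        // Length unknown: each queued buffer is consumed as a whole.
        return {kMaxPipeBufferedBytes, std::nullopt, std::nullopt};
    }
    // Content-Length known: read exactly body_bytes through fixed-size chunks.
    return {kMaxPipeBufferedBytes, kMinChunkSize, body_bytes};
}

int main() {
    auto cfg = make_pipe_config({{"Transfer-Encoding", "chunked"}}, /*body_bytes=*/0);
    std::cout << "total_length known: " << cfg.total_length.has_value() << "\n";
}
```

Treating each queued buffer as a complete message is the same path routine load already uses, which is why only the pipe's constructor call needs to change here.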
2 changes: 1 addition & 1 deletion be/src/io/fs/stream_load_pipe.cpp
@@ -89,7 +89,7 @@ Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* byt
     return Status::OK();
 }
 
-// If _total_length == -1, this should be a Kafka routine load task,
+// If _total_length == -1, this should be a Kafka routine load task or stream load with chunked transfer HTTP request,
 // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
 // Otherwise, this should be a stream load task that needs to read the specified amount of data.
 Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) {
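The updated comment describes a two-way dispatch. As a hedged illustration (simplified, and not the actual `StreamLoadPipe`, whose buffering and synchronization details differ), the hypothetical `SimplePipe` below mimics that contract: with `_total_length == -1` a read hands back the next whole buffer, otherwise it gathers exactly `_total_length` bytes:

```cpp
// Sketch only: mimics the read_one_message contract described in the comment above.
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <deque>
#include <iostream>
#include <memory>
#include <vector>

class SimplePipe {
public:
    explicit SimplePipe(int64_t total_length = -1) : _total_length(total_length) {}

    void append(std::vector<uint8_t> buf) { _queue.push_back(std::move(buf)); }

    bool read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) {
        if (_total_length == -1) {
            // Chunked transfer / routine load: one buffer == one complete message.
            if (_queue.empty()) return false;
            auto buf = std::move(_queue.front());
            _queue.pop_front();
            *length = buf.size();
            data->reset(new uint8_t[buf.size()]);
            std::memcpy(data->get(), buf.data(), buf.size());
            return true;
        }
        // Known length: gather exactly _total_length bytes across buffers.
        *length = static_cast<size_t>(_total_length);
        data->reset(new uint8_t[*length]);
        size_t copied = 0;
        while (copied < *length && !_queue.empty()) {
            auto& buf = _queue.front();
            size_t n = std::min(buf.size(), *length - copied);
            std::memcpy(data->get() + copied, buf.data(), n);
            copied += n;
            if (n == buf.size()) {
                _queue.pop_front();
            } else {
                buf.erase(buf.begin(), buf.begin() + n);  // keep the remainder queued
            }
        }
        return copied == *length;
    }

private:
    int64_t _total_length;
    std::deque<std::vector<uint8_t>> _queue;
};

int main() {
    SimplePipe chunked;  // total_length == -1, the chunked-transfer case
    chunked.append({'a', 'b', 'c'});
    std::unique_ptr<uint8_t[]> data;
    size_t len = 0;
    if (chunked.read_one_message(&data, &len)) {
        std::cout << "got one whole buffer of " << len << " bytes\n";
    }
}
```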
1 change: 1 addition & 0 deletions be/src/runtime/stream_load/stream_load_context.h
@@ -154,6 +154,7 @@ class StreamLoadContext {
     // only used to check if we receive whole body
     size_t body_bytes = 0;
     size_t receive_bytes = 0;
+    bool is_chunked_transfer = false;
 
     int64_t txn_id = -1;
 
2 changes: 2 additions & 0 deletions regression-test/data/load_p0/stream_load/test_chunked_transfer.csv
@@ -0,0 +1,2 @@
+2|-50|1|44|1
+1|-50|1|2|1
3 changes: 3 additions & 0 deletions regression-test/data/load_p0/stream_load/test_stream_load.out
@@ -105,3 +105,6 @@
 33 9 8 2 1
 38 1 9 2 5
 
+-- !sql_chunked_transfer --
+1 -50 1 2 1 \N
+2 -50 1 44 1 \N
34 changes: 34 additions & 0 deletions regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -981,5 +981,39 @@ suite("test_stream_load", "p0") {
     assertEquals(res[0][0], 1)
     assertEquals(res[1][0], 1)
 
+    // test chunked transfer
+    def tableName16 = "test_chunked_transfer"
+    InetSocketAddress address = context.config.feHttpInetSocketAddress
+    String user = context.config.feHttpUser
+    String password = context.config.feHttpPassword
+    String db = context.config.getDbNameByFile(context.file)
+
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName16} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H column_separator:| -H Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3 -T ${context.dataPath}/test_chunked_transfer.csv http://${context.config.feHttpAddress}/api/${db}/${tableName16}/_stream_load"
+        log.info("test chunked transfer command: ${command}")
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        json2pc = parseJson(out)
+        log.info("test chunked transfer result: ${out}".toString())
+
+        qt_sql_chunked_transfer "select * from ${tableName16} order by k1"
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+    }
 }
