Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature-wip](parquet-vec) Support parquet scanner in vectorized engine #9433

Merged
merged 29 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
000bc5e
Support parquet scanner in vectorized engine
yinzhijian May 7, 2022
3c1bd53
code format
yinzhijian May 7, 2022
7f8ba23
fix compile problem
yinzhijian May 7, 2022
67064d6
add unittest for parquet vec
yinzhijian May 8, 2022
c515fdc
code format
yinzhijian May 8, 2022
e414bb4
code format
yinzhijian May 8, 2022
4ff3651
Merge branch 'dev.parquet_vec' of https://github.com/yinzhijian/incub…
yinzhijian May 8, 2022
4e3e21b
fix fe unittest & be parquet numeric overflow bug
yinzhijian May 9, 2022
64c8598
code format
yinzhijian May 9, 2022
5c3906c
Merge branch 'master' into dev.parquet_vec
yinzhijian May 9, 2022
cdc0aa8
fix complile problem
yinzhijian May 9, 2022
fd83065
adapter new get_next interface for vec parquet
yinzhijian May 9, 2022
3a545dc
code format
yinzhijian May 9, 2022
3fa8413
code format
yinzhijian May 9, 2022
fc2c6ae
Merge branch 'dev.parquet_vec' of https://github.com/yinzhijian/incub…
yinzhijian May 9, 2022
e79aec1
fix bug and add more comments
yinzhijian May 10, 2022
1d1875b
remove unused code
yinzhijian May 11, 2022
00bdb8f
Merge branch 'master' into dev.parquet_vec
yinzhijian May 11, 2022
0ff901f
format code
yinzhijian May 12, 2022
ea39ed9
Merge branch 'apache:master' into dev.parquet_vec
yinzhijian May 12, 2022
ed7957e
code format
yinzhijian May 12, 2022
80f1e0d
code format
yinzhijian May 12, 2022
8712bc9
Merge branch 'master' into dev.parquet_vec
yinzhijian May 12, 2022
3378872
update fe
yinzhijian May 12, 2022
24235e6
Merge branch 'master' into dev.parquet_vec
yinzhijian May 13, 2022
a7529b2
format code
yinzhijian May 13, 2022
738a778
fix unit test
yinzhijian May 13, 2022
0339fb5
fix bug
yinzhijian May 13, 2022
d1cd090
Merge branch 'master' into dev.parquet_vec
yinzhijian May 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class RuntimeState;
class ExprContext;

namespace vectorized {
class VExprContext;
class IColumn;
using MutableColumnPtr = IColumn::MutablePtr;
} // namespace vectorized
Expand Down
13 changes: 10 additions & 3 deletions be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "common/object_pool.h"
#include "vec/exec/vbroker_scanner.h"
#include "vec/exec/vparquet_scanner.h"
#include "exec/json_scanner.h"
#include "exec/orc_scanner.h"
#include "exec/parquet_scanner.h"
Expand Down Expand Up @@ -224,9 +225,15 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
BaseScanner* scan = nullptr;
switch (scan_range.ranges[0].format_type) {
case TFileFormatType::FORMAT_PARQUET:
scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
if (_vectorized) {
scan = new vectorized::VParquetScanner(
_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges,
scan_range.broker_addresses, _pre_filter_texprs, counter);
} else {
scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
}
break;
case TFileFormatType::FORMAT_ORC:
scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params,
Expand Down
16 changes: 16 additions & 0 deletions be/src/exec/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,22 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
return Status::OK();
}

// Hands the caller the next Arrow record batch through `batch`.
//
// The batch currently held in `_batch` is reused only when it is non-empty and
// neither line cursor has advanced; in every other case a new batch is loaded
// with read_record_batch() first. Any error from read_record_batch() is
// returned to the caller unchanged; otherwise Status::OK() is returned.
// `eof` is only forwarded to read_record_batch(); this function never writes
// it directly.
Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
                                     const std::vector<SlotDescriptor*>& tuple_slot_descs,
                                     bool* eof) {
    // Same predicate as "empty || batch cursor moved || group cursor moved",
    // expressed as its negation; evaluation order of the three reads is kept.
    const bool held_batch_untouched = _batch->num_rows() != 0 && _current_line_of_batch == 0 &&
                                      _current_line_of_group == 0;
    if (!held_batch_untouched) {
        RETURN_IF_ERROR(read_record_batch(tuple_slot_descs, eof));
    }
    // get_batch() also advances both line cursors by the batch's row count.
    *batch = get_batch();
    return Status::OK();
}

// Returns a reference to the held record batch and advances both the
// per-batch and per-group line cursors by the number of rows in that batch.
const std::shared_ptr<arrow::RecordBatch>& ParquetReaderWrap::get_batch() {
    const auto batch_rows = _batch->num_rows();
    _current_line_of_batch += batch_rows;
    _current_line_of_group += batch_rows;
    return _batch;
}

Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array,
uint8_t* buf, int32_t* wbytes) {
const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/parquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,16 @@ class ParquetReaderWrap {
Status size(int64_t* size);
Status init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::string& timezone);
// Stores the next Arrow record batch in `*batch`. A new batch is read via
// read_record_batch() unless the held batch is non-empty and both line
// cursors are still zero. `tuple_slot_descs` and `eof` are forwarded to
// read_record_batch(); its error status, if any, is returned as-is.
Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof);

private:
void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value,
int32_t len);
Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc);
Status read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof);
// Returns the held record batch and advances the batch/group line cursors by
// its row count.
const std::shared_ptr<arrow::RecordBatch>& get_batch();
// Timestamp helper operating on an Arrow timestamp array.
// NOTE(review): presumably writes the converted value into `buf` and the
// number of bytes written into `wbytes` — confirm against the definition.
// The last parameter was spelled `wbtyes` here; it is renamed to `wbytes`
// so the declaration agrees with the definition in parquet_reader.cpp.
Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
int32_t* wbytes);

Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/parquet_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ class ParquetScanner : public BaseScanner {
// Close this scanner
virtual void close();

private:
protected:
// Read next buffer from reader
Status open_next_reader();

private:
protected:
//const TBrokerScanRangeParams& _params;
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;
Expand Down
40 changes: 21 additions & 19 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,29 @@ static TFileFormatType::type parse_format(const std::string& format_str,
return parse_format("CSV", compress_type);
}
TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN;
if (boost::iequals(format_str, "CSV")) {
if (iequal(format_str, "CSV")) {
if (compress_type.empty()) {
format_type = TFileFormatType::FORMAT_CSV_PLAIN;
}
if (boost::iequals(compress_type, "GZ")) {
if (iequal(compress_type, "GZ")) {
format_type = TFileFormatType::FORMAT_CSV_GZ;
} else if (boost::iequals(compress_type, "LZO")) {
} else if (iequal(compress_type, "LZO")) {
format_type = TFileFormatType::FORMAT_CSV_LZO;
} else if (boost::iequals(compress_type, "BZ2")) {
} else if (iequal(compress_type, "BZ2")) {
format_type = TFileFormatType::FORMAT_CSV_BZ2;
} else if (boost::iequals(compress_type, "LZ4FRAME")) {
} else if (iequal(compress_type, "LZ4FRAME")) {
format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
} else if (boost::iequals(compress_type, "LZOP")) {
} else if (iequal(compress_type, "LZOP")) {
format_type = TFileFormatType::FORMAT_CSV_LZOP;
} else if (boost::iequals(compress_type, "DEFLATE")) {
} else if (iequal(compress_type, "DEFLATE")) {
format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
}
} else if (boost::iequals(format_str, "JSON")) {
} else if (iequal(format_str, "JSON")) {
if (compress_type.empty()) {
format_type = TFileFormatType::FORMAT_JSON;
}
} else if (iequal(format_str, "PARQUET")) {
format_type = TFileFormatType::FORMAT_PARQUET;
}
return format_type;
}
Expand Down Expand Up @@ -264,12 +266,12 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct

// get format of this put
if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
boost::iequals(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
return Status::InternalError("compress data of JSON format is not supported.");
}
std::string format_str = http_req->header(HTTP_FORMAT_KEY);
if (boost::iequals(format_str, BeConsts::CSV_WITH_NAMES) ||
boost::iequals(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) {
if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
iequal(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) {
ctx->header_type = format_str;
//treat as CSV
format_str = BeConsts::CSV;
Expand All @@ -291,7 +293,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024;
bool read_json_by_line = false;
if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
read_json_by_line = true;
}
}
Expand Down Expand Up @@ -440,9 +442,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_negative(false);
}
if (!http_req->header(HTTP_STRICT_MODE).empty()) {
if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "false")) {
if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) {
request.__set_strictMode(false);
} else if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "true")) {
} else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) {
request.__set_strictMode(true);
} else {
return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
Expand All @@ -465,7 +467,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_json_root(http_req->header(HTTP_JSONROOT));
}
if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
if (boost::iequals(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
request.__set_strip_outer_array(true);
} else {
request.__set_strip_outer_array(false);
Expand All @@ -474,7 +476,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_strip_outer_array(false);
}
if (!http_req->header(HTTP_NUM_AS_STRING).empty()) {
if (boost::iequals(http_req->header(HTTP_NUM_AS_STRING), "true")) {
if (iequal(http_req->header(HTTP_NUM_AS_STRING), "true")) {
request.__set_num_as_string(true);
} else {
request.__set_num_as_string(false);
Expand All @@ -483,7 +485,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_num_as_string(false);
}
if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
if (boost::iequals(http_req->header(HTTP_FUZZY_PARSE), "true")) {
if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
request.__set_fuzzy_parse(true);
} else {
request.__set_fuzzy_parse(false);
Expand All @@ -493,7 +495,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
}

if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
request.__set_read_json_by_line(true);
} else {
request.__set_read_json_by_line(false);
Expand All @@ -517,7 +519,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
}

if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) {
if (boost::iequals(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) {
if (iequal(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) {
request.__set_load_to_single_tablet(true);
} else {
request.__set_load_to_single_tablet(false);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/rle_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class RlePageBuilder : public PageBuilder {

// Puts the builder back into a reusable state: the encoder is cleared and
// re-reserved, and the element count and finished flag are zeroed.
void reset() override {
    // Encoder first: Clear() must precede Reserve(), as in the original order.
    // NOTE(review): Reserve(RLE_PAGE_HEADER_SIZE, 0) presumably sets aside
    // space for the page header — confirm against the encoder.
    _rle_encoder->Clear();
    _rle_encoder->Reserve(RLE_PAGE_HEADER_SIZE, 0);

    // Builder-side bookkeeping; independent of the encoder calls above.
    _finished = false;
    _count = 0;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ set(VEC_FILES
exec/vtable_function_node.cpp
exec/vbroker_scan_node.cpp
exec/vbroker_scanner.cpp
exec/vparquet_scanner.cpp
exec/join/vhash_join_node.cpp
exprs/vectorized_agg_fn.cpp
exprs/vectorized_fn_call.cpp
Expand Down Expand Up @@ -190,6 +191,7 @@ set(VEC_FILES
runtime/vdata_stream_recvr.cpp
runtime/vdata_stream_mgr.cpp
runtime/vpartition_info.cpp
utils/arrow_column_to_doris_column.cpp
runtime/vsorted_run_merger.cpp)

add_library(Vec STATIC
Expand Down
63 changes: 63 additions & 0 deletions be/src/vec/data_types/data_type_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,67 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
return nested;
}

// Builds the vectorized data type that corresponds to an Arrow type id.
//
// `type` selects the base type; when `is_nullable` is true and a base type was
// found, the result is that base type wrapped in DataTypeNullable. For an
// Arrow type id without a case below, a DCHECK fires and the (null) base
// pointer is returned as-is.
DataTypePtr DataTypeFactory::create_data_type(const arrow::Type::type& type, bool is_nullable) {
    DataTypePtr base_type = nullptr;

    switch (type) {
    // Signed integers.
    case ::arrow::Type::INT8:
        base_type = std::make_shared<vectorized::DataTypeInt8>();
        break;
    case ::arrow::Type::INT16:
        base_type = std::make_shared<vectorized::DataTypeInt16>();
        break;
    case ::arrow::Type::INT32:
        base_type = std::make_shared<vectorized::DataTypeInt32>();
        break;
    case ::arrow::Type::INT64:
        base_type = std::make_shared<vectorized::DataTypeInt64>();
        break;

    // Unsigned integers; BOOL uses the same UInt8 type as UINT8.
    case ::arrow::Type::BOOL:
    case ::arrow::Type::UINT8:
        base_type = std::make_shared<vectorized::DataTypeUInt8>();
        break;
    case ::arrow::Type::UINT16:
        base_type = std::make_shared<vectorized::DataTypeUInt16>();
        break;
    case ::arrow::Type::UINT32:
        base_type = std::make_shared<vectorized::DataTypeUInt32>();
        break;
    case ::arrow::Type::UINT64:
        base_type = std::make_shared<vectorized::DataTypeUInt64>();
        break;

    // Floating point; HALF_FLOAT is mapped to the 32-bit float type.
    case ::arrow::Type::HALF_FLOAT:
    case ::arrow::Type::FLOAT:
        base_type = std::make_shared<vectorized::DataTypeFloat32>();
        break;
    case ::arrow::Type::DOUBLE:
        base_type = std::make_shared<vectorized::DataTypeFloat64>();
        break;

    // Date and time.
    case ::arrow::Type::DATE32:
        base_type = std::make_shared<vectorized::DataTypeDate>();
        break;
    case ::arrow::Type::DATE64:
    case ::arrow::Type::TIMESTAMP:
        base_type = std::make_shared<vectorized::DataTypeDateTime>();
        break;

    // Every binary/string flavour becomes the string type.
    case ::arrow::Type::BINARY:
    case ::arrow::Type::FIXED_SIZE_BINARY:
    case ::arrow::Type::STRING:
        base_type = std::make_shared<vectorized::DataTypeString>();
        break;

    // NOTE(review): (27, 9) is presumably (precision, scale) — confirm against
    // the DataTypeDecimal constructor.
    case ::arrow::Type::DECIMAL:
        base_type = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
        break;

    default:
        DCHECK(false) << "invalid arrow type:" << static_cast<int>(type);
        break;
    }

    // Nothing to wrap when no base type was produced or nullability is not requested.
    if (!base_type || !is_nullable) {
        return base_type;
    }
    return std::make_shared<vectorized::DataTypeNullable>(base_type);
}

} // namespace doris::vectorized
3 changes: 3 additions & 0 deletions be/src/vec/data_types/data_type_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <mutex>
#include <string>

#include "arrow/type.h"
#include "gen_cpp/data.pb.h"
#include "olap/field.h"
#include "olap/tablet_schema.h"
Expand Down Expand Up @@ -87,6 +88,8 @@ class DataTypeFactory {

DataTypePtr create_data_type(const PColumnMeta& pcolumn);

DataTypePtr create_data_type(const arrow::Type::type& type, bool is_nullable);

private:
DataTypePtr _create_primitive_data_type(const FieldType& type) const;

Expand Down
Loading