Commit

[feature-wip](parquet-vec) Support parquet scanner in vectorized engine (#9433)
yinzhijian authored May 17, 2022
1 parent 7e8e14b commit bee5c2f
Showing 24 changed files with 1,621 additions and 48 deletions.
1 change: 1 addition & 0 deletions be/src/exec/base_scanner.h
@@ -35,6 +35,7 @@ class RuntimeState;
class ExprContext;

namespace vectorized {
class VExprContext;
class IColumn;
using MutableColumnPtr = IColumn::MutablePtr;
} // namespace vectorized
13 changes: 10 additions & 3 deletions be/src/exec/broker_scan_node.cpp
@@ -33,6 +33,7 @@
#include "util/thread.h"
#include "vec/exec/vbroker_scanner.h"
#include "vec/exec/vjson_scanner.h"
#include "vec/exec/vparquet_scanner.h"

namespace doris {

@@ -225,9 +226,15 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
BaseScanner* scan = nullptr;
switch (scan_range.ranges[0].format_type) {
case TFileFormatType::FORMAT_PARQUET:
-        scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
-                                  scan_range.ranges, scan_range.broker_addresses,
-                                  _pre_filter_texprs, counter);
+        if (_vectorized) {
+            scan = new vectorized::VParquetScanner(
+                    _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges,
+                    scan_range.broker_addresses, _pre_filter_texprs, counter);
+        } else {
+            scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
+                                      scan_range.ranges, scan_range.broker_addresses,
+                                      _pre_filter_texprs, counter);
+        }
break;
case TFileFormatType::FORMAT_ORC:
scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params,
16 changes: 16 additions & 0 deletions be/src/exec/parquet_reader.cpp
@@ -219,6 +219,22 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
return Status::OK();
}

Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
const std::vector<SlotDescriptor*>& tuple_slot_descs,
bool* eof) {
if (_batch->num_rows() == 0 || _current_line_of_batch != 0 || _current_line_of_group != 0) {
RETURN_IF_ERROR(read_record_batch(tuple_slot_descs, eof));
}
*batch = get_batch();
return Status::OK();
}

const std::shared_ptr<arrow::RecordBatch>& ParquetReaderWrap::get_batch() {
_current_line_of_batch += _batch->num_rows();
_current_line_of_group += _batch->num_rows();
return _batch;
}

Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array,
uint8_t* buf, int32_t* wbytes) {
const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
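The new next_batch()/get_batch() pair hands the raw arrow::RecordBatch to the caller instead of materializing row-based tuples, re-reading from the file only when the current batch has been consumed and advancing the batch and row-group cursors on each call. A minimal sketch of a consumer loop follows; the function name read_all_batches and the conversion step are illustrative assumptions, not part of this commit:

#include "exec/parquet_reader.h"

namespace doris {

// Sketch: drain a ParquetReaderWrap batch by batch via the new API.
Status read_all_batches(ParquetReaderWrap* reader,
                        const std::vector<SlotDescriptor*>& slot_descs) {
    bool eof = false;
    while (!eof) {
        std::shared_ptr<arrow::RecordBatch> batch;
        // next_batch() re-reads a RecordBatch only when needed; get_batch()
        // advances _current_line_of_batch/_current_line_of_group internally.
        RETURN_IF_ERROR(reader->next_batch(&batch, slot_descs, &eof));
        if (batch != nullptr && batch->num_rows() > 0) {
            // convert the Arrow columns into Doris vectorized columns here
        }
    }
    return Status::OK();
}

} // namespace doris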
3 changes: 3 additions & 0 deletions be/src/exec/parquet_reader.h
@@ -79,13 +79,16 @@ class ParquetReaderWrap {
Status size(int64_t* size);
Status init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::string& timezone);
Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof);

private:
void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value,
int32_t len);
Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc);
Status read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof);
const std::shared_ptr<arrow::RecordBatch>& get_batch();
Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
int32_t* wbtyes);

4 changes: 2 additions & 2 deletions be/src/exec/parquet_scanner.h
@@ -65,11 +65,11 @@ class ParquetScanner : public BaseScanner {
// Close this scanner
virtual void close();

-private:
+protected:
// Read next buffer from reader
Status open_next_reader();

-private:
+protected:
//const TBrokerScanRangeParams& _params;
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;
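Relaxing these members from private to protected is what lets the new vectorized scanner derive from ParquetScanner and reuse open_next_reader() plus the range and broker state. The header be/src/vec/exec/vparquet_scanner.h is not shown in this excerpt; the sketch below only mirrors the constructor call in broker_scan_node.cpp, and the get_next() signature is an assumption:

#include "exec/parquet_scanner.h"

namespace doris::vectorized {

// Illustrative declaration only (see vparquet_scanner.h in the full change).
class VParquetScanner : public ParquetScanner {
public:
    VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
                    const TBrokerScanRangeParams& params,
                    const std::vector<TBrokerRangeDesc>& ranges,
                    const std::vector<TNetworkAddress>& broker_addresses,
                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);

    // Assumed entry point: fill a vectorized Block from the Arrow batches
    // instead of row-based Tuples.
    Status get_next(Block* block, bool* eof);
};

} // namespace doris::vectorized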
40 changes: 21 additions & 19 deletions be/src/http/action/stream_load.cpp
@@ -78,27 +78,29 @@ static TFileFormatType::type parse_format(const std::string& format_str,
return parse_format("CSV", compress_type);
}
TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN;
-if (boost::iequals(format_str, "CSV")) {
+if (iequal(format_str, "CSV")) {
if (compress_type.empty()) {
format_type = TFileFormatType::FORMAT_CSV_PLAIN;
}
-if (boost::iequals(compress_type, "GZ")) {
+if (iequal(compress_type, "GZ")) {
format_type = TFileFormatType::FORMAT_CSV_GZ;
-} else if (boost::iequals(compress_type, "LZO")) {
+} else if (iequal(compress_type, "LZO")) {
format_type = TFileFormatType::FORMAT_CSV_LZO;
-} else if (boost::iequals(compress_type, "BZ2")) {
+} else if (iequal(compress_type, "BZ2")) {
format_type = TFileFormatType::FORMAT_CSV_BZ2;
-} else if (boost::iequals(compress_type, "LZ4FRAME")) {
+} else if (iequal(compress_type, "LZ4FRAME")) {
format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
-} else if (boost::iequals(compress_type, "LZOP")) {
+} else if (iequal(compress_type, "LZOP")) {
format_type = TFileFormatType::FORMAT_CSV_LZOP;
-} else if (boost::iequals(compress_type, "DEFLATE")) {
+} else if (iequal(compress_type, "DEFLATE")) {
format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
}
-} else if (boost::iequals(format_str, "JSON")) {
+} else if (iequal(format_str, "JSON")) {
if (compress_type.empty()) {
format_type = TFileFormatType::FORMAT_JSON;
}
+} else if (iequal(format_str, "PARQUET")) {
+format_type = TFileFormatType::FORMAT_PARQUET;
}
return format_type;
}
@@ -264,12 +266,12 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct

// get format of this put
if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
-boost::iequals(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
+iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
return Status::InternalError("compress data of JSON format is not supported.");
}
std::string format_str = http_req->header(HTTP_FORMAT_KEY);
-if (boost::iequals(format_str, BeConsts::CSV_WITH_NAMES) ||
-boost::iequals(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) {
+if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
+iequal(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) {
ctx->header_type = format_str;
//treat as CSV
format_str = BeConsts::CSV;
@@ -291,7 +293,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024;
bool read_json_by_line = false;
if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
-if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
+if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
read_json_by_line = true;
}
}
@@ -440,9 +442,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_negative(false);
}
if (!http_req->header(HTTP_STRICT_MODE).empty()) {
-if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "false")) {
+if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) {
request.__set_strictMode(false);
-} else if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "true")) {
+} else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) {
request.__set_strictMode(true);
} else {
return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
@@ -465,7 +467,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_json_root(http_req->header(HTTP_JSONROOT));
}
if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
-if (boost::iequals(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
+if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
request.__set_strip_outer_array(true);
} else {
request.__set_strip_outer_array(false);
@@ -474,7 +476,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_strip_outer_array(false);
}
if (!http_req->header(HTTP_NUM_AS_STRING).empty()) {
-if (boost::iequals(http_req->header(HTTP_NUM_AS_STRING), "true")) {
+if (iequal(http_req->header(HTTP_NUM_AS_STRING), "true")) {
request.__set_num_as_string(true);
} else {
request.__set_num_as_string(false);
@@ -483,7 +485,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_num_as_string(false);
}
if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
-if (boost::iequals(http_req->header(HTTP_FUZZY_PARSE), "true")) {
+if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
request.__set_fuzzy_parse(true);
} else {
request.__set_fuzzy_parse(false);
@@ -493,7 +495,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
}

if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
-if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
+if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
request.__set_read_json_by_line(true);
} else {
request.__set_read_json_by_line(false);
@@ -517,7 +519,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
}

if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) {
-if (boost::iequals(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) {
+if (iequal(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) {
request.__set_load_to_single_tablet(true);
} else {
request.__set_load_to_single_tablet(false);
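parse_format() now recognizes PARQUET in addition to CSV and JSON, and the boost::iequals calls are replaced with the in-tree iequal helper, so the header values stay case-insensitive. A quick illustration of the resulting dispatch (parse_format is a static helper inside stream_load.cpp, so the lines below only describe its behavior, not a callable external API):

// parse_format("PARQUET", "")  -> TFileFormatType::FORMAT_PARQUET
// parse_format("parquet", "")  -> TFileFormatType::FORMAT_PARQUET  (iequal is case-insensitive)
// parse_format("CSV", "GZ")    -> TFileFormatType::FORMAT_CSV_GZ
// parse_format("JSON", "GZ")   -> TFileFormatType::FORMAT_UNKNOWN  (compressed JSON is rejected)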
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/rle_page.h
@@ -104,6 +104,7 @@ class RlePageBuilder : public PageBuilder {

void reset() override {
_count = 0;
_finished = false;
_rle_encoder->Clear();
_rle_encoder->Reserve(RLE_PAGE_HEADER_SIZE, 0);
}
2 changes: 2 additions & 0 deletions be/src/vec/CMakeLists.txt
@@ -102,6 +102,7 @@ set(VEC_FILES
exec/vbroker_scan_node.cpp
exec/vbroker_scanner.cpp
exec/vjson_scanner.cpp
exec/vparquet_scanner.cpp
exec/join/vhash_join_node.cpp
exprs/vectorized_agg_fn.cpp
exprs/vectorized_fn_call.cpp
@@ -191,6 +192,7 @@ set(VEC_FILES
runtime/vdata_stream_recvr.cpp
runtime/vdata_stream_mgr.cpp
runtime/vpartition_info.cpp
utils/arrow_column_to_doris_column.cpp
runtime/vsorted_run_merger.cpp)

add_library(Vec STATIC
63 changes: 63 additions & 0 deletions be/src/vec/data_types/data_type_factory.cpp
@@ -260,4 +260,67 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
return nested;
}

DataTypePtr DataTypeFactory::create_data_type(const arrow::Type::type& type, bool is_nullable) {
DataTypePtr nested = nullptr;
switch (type) {
case ::arrow::Type::BOOL:
nested = std::make_shared<vectorized::DataTypeUInt8>();
break;
case ::arrow::Type::INT8:
nested = std::make_shared<vectorized::DataTypeInt8>();
break;
case ::arrow::Type::UINT8:
nested = std::make_shared<vectorized::DataTypeUInt8>();
break;
case ::arrow::Type::INT16:
nested = std::make_shared<vectorized::DataTypeInt16>();
break;
case ::arrow::Type::UINT16:
nested = std::make_shared<vectorized::DataTypeUInt16>();
break;
case ::arrow::Type::INT32:
nested = std::make_shared<vectorized::DataTypeInt32>();
break;
case ::arrow::Type::UINT32:
nested = std::make_shared<vectorized::DataTypeUInt32>();
break;
case ::arrow::Type::INT64:
nested = std::make_shared<vectorized::DataTypeInt64>();
break;
case ::arrow::Type::UINT64:
nested = std::make_shared<vectorized::DataTypeUInt64>();
break;
case ::arrow::Type::HALF_FLOAT:
case ::arrow::Type::FLOAT:
nested = std::make_shared<vectorized::DataTypeFloat32>();
break;
case ::arrow::Type::DOUBLE:
nested = std::make_shared<vectorized::DataTypeFloat64>();
break;
case ::arrow::Type::DATE32:
nested = std::make_shared<vectorized::DataTypeDate>();
break;
case ::arrow::Type::DATE64:
case ::arrow::Type::TIMESTAMP:
nested = std::make_shared<vectorized::DataTypeDateTime>();
break;
case ::arrow::Type::BINARY:
case ::arrow::Type::FIXED_SIZE_BINARY:
case ::arrow::Type::STRING:
nested = std::make_shared<vectorized::DataTypeString>();
break;
case ::arrow::Type::DECIMAL:
nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
break;
default:
DCHECK(false) << "invalid arrow type:" << (int)type;
break;
}

if (nested && is_nullable) {
return std::make_shared<vectorized::DataTypeNullable>(nested);
}
return nested;
}

} // namespace doris::vectorized
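This overload gives the vectorized load path a direct mapping from arrow::Type values to Doris data types; note that Arrow DECIMAL currently maps to a fixed DataTypeDecimal<Decimal128>(27, 9) and that DATE64/TIMESTAMP both become DataTypeDateTime. A hedged sketch of translating a whole Arrow schema with it; the helper function and the DataTypeFactory::instance() accessor are assumptions based on the existing factory interface, not code from this commit:

#include "arrow/type.h"
#include "vec/data_types/data_type_factory.hpp"

// Sketch: derive a Doris type for every field of an Arrow schema.
std::vector<doris::vectorized::DataTypePtr> arrow_schema_to_doris_types(const arrow::Schema& schema) {
    std::vector<doris::vectorized::DataTypePtr> types;
    types.reserve(schema.num_fields());
    for (const auto& field : schema.fields()) {
        // Nullable Arrow fields come back wrapped in DataTypeNullable.
        types.push_back(doris::vectorized::DataTypeFactory::instance().create_data_type(
                field->type()->id(), field->nullable()));
    }
    return types;
}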
3 changes: 3 additions & 0 deletions be/src/vec/data_types/data_type_factory.hpp
@@ -22,6 +22,7 @@
#include <mutex>
#include <string>

#include "arrow/type.h"
#include "gen_cpp/data.pb.h"
#include "olap/field.h"
#include "olap/tablet_schema.h"
@@ -87,6 +88,8 @@ class DataTypeFactory {

DataTypePtr create_data_type(const PColumnMeta& pcolumn);

DataTypePtr create_data_type(const arrow::Type::type& type, bool is_nullable);

private:
DataTypePtr _create_primitive_data_type(const FieldType& type) const;

(The remaining changed files are not shown in this excerpt.)
