From d6d1d2cef97c156c1b9044d82cb905d678e5e94d Mon Sep 17 00:00:00 2001 From: xufei Date: Sat, 4 Jan 2020 10:07:15 +0800 Subject: [PATCH] FLASH-795 support chblock encode type (#383) * support chblock encode type for cop request * fix timestamp bug for ch block encode * add unsupported type for chblock encode * refine code * fix build error * address comments --- contrib/tipb | 2 +- dbms/src/Debug/dbgFuncCoprocessor.cpp | 34 +-- .../Flash/Coprocessor/CHBlockChunkCodec.cpp | 80 ++++++++ .../src/Flash/Coprocessor/CHBlockChunkCodec.h | 16 ++ .../Coprocessor/DAGBlockOutputStream.cpp | 32 ++- .../Flash/Coprocessor/DAGBlockOutputStream.h | 2 +- dbms/src/Flash/Coprocessor/DAGQuerySource.cpp | 23 ++- dbms/src/Flash/Coprocessor/DAGQuerySource.h | 6 +- dbms/src/Flash/Coprocessor/DAGUtils.cpp | 194 ++++++------------ dbms/src/Flash/Coprocessor/DAGUtils.h | 2 +- dbms/src/Flash/Coprocessor/InterpreterDAG.cpp | 2 +- .../mutable-test/txn_dag/chblock_encode.test | 45 ++++ 12 files changed, 268 insertions(+), 170 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp create mode 100644 dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h create mode 100644 tests/mutable-test/txn_dag/chblock_encode.test diff --git a/contrib/tipb b/contrib/tipb index a36bd2ab231..568726749cb 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit a36bd2ab2314cb65a0addb9ac0e473b115224540 +Subproject commit 568726749cb746b365cca389ceb4043ca6a2f250 diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 8b0da45ee59..03e870b1c74 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -368,6 +369,8 @@ std::tuple compileQuery(Context & context, if (encode_type == "chunk") dag_request.set_encode_type(tipb::EncodeType::TypeChunk); + else if (encode_type == "chblock") + dag_request.set_encode_type(tipb::EncodeType::TypeCHBlock); else dag_request.set_encode_type(tipb::EncodeType::TypeDefault); @@ -670,22 +673,26 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest return dag_response; } -void arrowChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks) +std::unique_ptr getCodec(tipb::EncodeType encode_type) { - ArrowChunkCodec codec; - for (const auto & chunk : dag_response.chunks()) + switch (encode_type) { - blocks.emplace_back(codec.decode(chunk, schema)); + case tipb::EncodeType::TypeDefault: + return std::make_unique(); + case tipb::EncodeType::TypeChunk: + return std::make_unique(); + case tipb::EncodeType::TypeCHBlock: + return std::make_unique(); + default: + throw Exception("Unsupported encode type", ErrorCodes::BAD_ARGUMENTS); } } -void defaultChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks) +void chunksToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks) { - DefaultChunkCodec codec; + auto codec = getCodec(dag_response.encode_type()); for (const auto & chunk : dag_response.chunks()) - { - blocks.emplace_back(codec.decode(chunk, schema)); - } + blocks.emplace_back(codec->decode(chunk, schema)); } BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response) @@ -694,14 +701,7 @@ BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const throw Exception(dag_response.error().msg(), dag_response.error().code()); BlocksList blocks; - if (dag_response.encode_type() == tipb::EncodeType::TypeChunk) - { - arrowChunkToBlocks(schema, dag_response, blocks); - } - else - { - defaultChunkToBlocks(schema, dag_response, blocks); - } + chunksToBlocks(schema, dag_response, blocks); return std::make_shared(std::move(blocks)); } diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp new file mode 100644 index 00000000000..8f6e5d713d5 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -0,0 +1,80 @@ +#include +#include +#include +#include + +namespace DB +{ + +class CHBlockChunkCodecStream : public ChunkCodecStream +{ +public: + explicit CHBlockChunkCodecStream(const std::vector & field_types) : ChunkCodecStream(field_types) + { + output = std::make_unique(); + } + + String getString() override + { + std::stringstream ss; + return output->str(); + } + void clear() override { output = std::make_unique(); } + void encode(const Block & block, size_t start, size_t end) override; + std::unique_ptr output; +}; + +void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit) +{ + /** If there are columns-constants - then we materialize them. + * (Since the data type does not know how to serialize / deserialize constants.) + */ + ColumnPtr full_column; + + if (ColumnPtr converted = column->convertToFullColumnIfConst()) + full_column = converted; + else + full_column = column; + + IDataType::OutputStreamGetter output_stream_getter = [&](const IDataType::SubstreamPath &) { return &ostr; }; + type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {}); +} + +void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end) +{ + // Encode data in chunk by chblock encode + if (start != 0 || end != block.rows()) + throw Exception("CHBlock encode only support encode whole block"); + block.checkNumberOfRows(); + size_t columns = block.columns(); + size_t rows = block.rows(); + + writeVarUInt(columns, *output); + writeVarUInt(rows, *output); + + for (size_t i = 0; i < columns; i++) + { + const ColumnWithTypeAndName & column = block.safeGetByPosition(i); + + writeStringBinary(column.name, *output); + writeStringBinary(column.type->getName(), *output); + + if (rows) + writeData(*column.type, column.column, *output, 0, 0); + } +} + +std::unique_ptr CHBlockChunkCodec::newCodecStream(const std::vector & field_types) +{ + return std::make_unique(field_types); +} + +Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema &) +{ + const String & row_data = chunk.rows_data(); + ReadBufferFromString read_buffer(row_data); + NativeBlockInputStream block_in(read_buffer, 0); + return block_in.read(); +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h new file mode 100644 index 00000000000..fce2d468a7c --- /dev/null +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +namespace DB +{ + +class CHBlockChunkCodec : public ChunkCodec +{ +public: + CHBlockChunkCodec() = default; + Block decode(const tipb::Chunk &, const DAGSchema &) override; + std::unique_ptr newCodecStream(const std::vector & field_types) override; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp index 9a75a9529a2..3605ce8661b 100644 --- a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp +++ b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp @@ -1,6 +1,7 @@ #include #include +#include #include namespace DB @@ -28,6 +29,11 @@ DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_, { chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); } + else if (encodeType == tipb::EncodeType::TypeCHBlock) + { + chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); + records_per_chunk = -1; + } else { throw Exception("Only Default and Arrow encode type is supported in DAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER); @@ -63,17 +69,29 @@ void DAGBlockOutputStream::write(const Block & block) { if (block.columns() != result_field_types.size()) throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR); - size_t rows = block.rows(); - for (size_t row_index = 0; row_index < rows;) + if (records_per_chunk == -1) { - if (current_records_num >= records_per_chunk) + current_records_num = 0; + if (block.rows() > 0) { + chunk_codec_stream->encode(block, 0, block.rows()); encodeChunkToDAGResponse(); } - const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows); - chunk_codec_stream->encode(block, row_index, upper); - current_records_num += (upper - row_index); - row_index = upper; + } + else + { + size_t rows = block.rows(); + for (size_t row_index = 0; row_index < rows;) + { + if (current_records_num >= records_per_chunk) + { + encodeChunkToDAGResponse(); + } + const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows); + chunk_codec_stream->encode(block, row_index, upper); + current_records_num += (upper - row_index); + row_index = upper; + } } } diff --git a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h index 40aff3a6ed9..79cea3aa26b 100644 --- a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h +++ b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h @@ -33,7 +33,7 @@ class DAGBlockOutputStream : public IBlockOutputStream tipb::SelectResponse & dag_response; std::vector result_field_types; Block header; - const Int64 records_per_chunk; + Int64 records_per_chunk; std::unique_ptr chunk_codec_stream; Int64 current_records_num; }; diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index ec0e9f74377..f9642125995 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -65,15 +65,19 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, Re "Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED); } } + analyzeResultFieldTypes(); + analyzeDAGEncodeType(); +} + +void DAGQuerySource::analyzeDAGEncodeType() +{ encode_type = dag_request.encode_type(); - if (encode_type == tipb::EncodeType::TypeChunk && hasUnsupportedTypeForArrowEncode(getResultFieldTypes())) - { + if (isUnsupportedEncodeType(getResultFieldTypes(), encode_type)) encode_type = tipb::EncodeType::TypeDefault; - } if (encode_type == tipb::EncodeType::TypeChunk && dag_request.has_chunk_memory_layout() && dag_request.chunk_memory_layout().has_endian() && dag_request.chunk_memory_layout().endian() == tipb::Endian::BigEndian) // todo support BigEndian encode for chunk encode type - throw Exception("BigEndian encode for chunk encode type is not supported yet.", ErrorCodes::NOT_IMPLEMENTED); + encode_type = tipb::EncodeType::TypeDefault; } std::tuple DAGQuerySource::parse(size_t max_query_size) @@ -135,7 +139,7 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector DAGQuerySource::getResultFieldTypes() const +void DAGQuerySource::analyzeResultFieldTypes() { std::vector executor_output; for (int i = dag_request.executors_size() - 1; i >= 0; i--) @@ -152,14 +156,13 @@ std::vector DAGQuerySource::getResultFieldTypes() const // todo should always use output offset to re-construct the output field types if (hasAggregation()) { - return executor_output; + result_field_types = std::move(executor_output); } - std::vector ret; - for (int i : dag_request.output_offsets()) + else { - ret.push_back(executor_output[i]); + for (UInt32 i : dag_request.output_offsets()) + result_field_types.push_back(executor_output[i]); } - return ret; } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.h b/dbms/src/Flash/Coprocessor/DAGQuerySource.h index 1dc8a0fb893..abfcf680f48 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.h +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.h @@ -80,7 +80,7 @@ class DAGQuerySource : public IQuerySource }; const tipb::DAGRequest & getDAGRequest() const { return dag_request; }; - std::vector getResultFieldTypes() const; + std::vector getResultFieldTypes() const { return result_field_types; }; ASTPtr getAST() const { return ast; }; @@ -95,6 +95,9 @@ class DAGQuerySource : public IQuerySource } } + void analyzeResultFieldTypes(); + void analyzeDAGEncodeType(); + protected: Context & context; DAGContext & dag_context; @@ -112,6 +115,7 @@ class DAGQuerySource : public IQuerySource Int32 order_index = -1; Int32 limit_index = -1; + std::vector result_field_types; tipb::EncodeType encode_type; ASTPtr ast; diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index 356fe866447..eeef74b67e5 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -274,11 +274,17 @@ bool exprHasValidFieldType(const tipb::Expr & expr) return expr.has_field_type() && !(expr.field_type().tp() == TiDB::TP::TypeNewDecimal && expr.field_type().decimal() == -1); } -bool hasUnsupportedTypeForArrowEncode(const std::vector & types) +bool isUnsupportedEncodeType(const std::vector & types, tipb::EncodeType encode_type) { + if (encode_type == tipb::EncodeType::TypeDefault) + return false; for (const auto & type : types) - if (type.tp() == TiDB::TypeSet) + { + if (encode_type == tipb::EncodeType::TypeCHBlock && (type.tp() == TiDB::TypeSet || type.tp() == TiDB::TypeEnum)) + return true; + if (encode_type == tipb::EncodeType::TypeChunk && type.tp() == TiDB::TypeSet) return true; + } return false; } @@ -429,76 +435,46 @@ std::unordered_map scalar_func_map({ {tipb::ScalarFuncSig::CastJsonAsJson, "cast"}, */ - {tipb::ScalarFuncSig::CoalesceInt, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceReal, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceString, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceDecimal, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceTime, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceDuration, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceInt, "coalesce"}, {tipb::ScalarFuncSig::CoalesceReal, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceString, "coalesce"}, {tipb::ScalarFuncSig::CoalesceDecimal, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceTime, "coalesce"}, {tipb::ScalarFuncSig::CoalesceDuration, "coalesce"}, {tipb::ScalarFuncSig::CoalesceJson, "coalesce"}, - {tipb::ScalarFuncSig::LTInt, "less"}, - {tipb::ScalarFuncSig::LTReal, "less"}, - {tipb::ScalarFuncSig::LTString, "less"}, - {tipb::ScalarFuncSig::LTDecimal, "less"}, - {tipb::ScalarFuncSig::LTTime, "less"}, - {tipb::ScalarFuncSig::LTDuration, "less"}, + {tipb::ScalarFuncSig::LTInt, "less"}, {tipb::ScalarFuncSig::LTReal, "less"}, {tipb::ScalarFuncSig::LTString, "less"}, + {tipb::ScalarFuncSig::LTDecimal, "less"}, {tipb::ScalarFuncSig::LTTime, "less"}, {tipb::ScalarFuncSig::LTDuration, "less"}, {tipb::ScalarFuncSig::LTJson, "less"}, - {tipb::ScalarFuncSig::LEInt, "lessOrEquals"}, - {tipb::ScalarFuncSig::LEReal, "lessOrEquals"}, - {tipb::ScalarFuncSig::LEString, "lessOrEquals"}, - {tipb::ScalarFuncSig::LEDecimal, "lessOrEquals"}, - {tipb::ScalarFuncSig::LETime, "lessOrEquals"}, - {tipb::ScalarFuncSig::LEDuration, "lessOrEquals"}, + {tipb::ScalarFuncSig::LEInt, "lessOrEquals"}, {tipb::ScalarFuncSig::LEReal, "lessOrEquals"}, + {tipb::ScalarFuncSig::LEString, "lessOrEquals"}, {tipb::ScalarFuncSig::LEDecimal, "lessOrEquals"}, + {tipb::ScalarFuncSig::LETime, "lessOrEquals"}, {tipb::ScalarFuncSig::LEDuration, "lessOrEquals"}, {tipb::ScalarFuncSig::LEJson, "lessOrEquals"}, - {tipb::ScalarFuncSig::GTInt, "greater"}, - {tipb::ScalarFuncSig::GTReal, "greater"}, - {tipb::ScalarFuncSig::GTString, "greater"}, - {tipb::ScalarFuncSig::GTDecimal, "greater"}, - {tipb::ScalarFuncSig::GTTime, "greater"}, - {tipb::ScalarFuncSig::GTDuration, "greater"}, + {tipb::ScalarFuncSig::GTInt, "greater"}, {tipb::ScalarFuncSig::GTReal, "greater"}, {tipb::ScalarFuncSig::GTString, "greater"}, + {tipb::ScalarFuncSig::GTDecimal, "greater"}, {tipb::ScalarFuncSig::GTTime, "greater"}, {tipb::ScalarFuncSig::GTDuration, "greater"}, {tipb::ScalarFuncSig::GTJson, "greater"}, - {tipb::ScalarFuncSig::GreatestInt, "greatest"}, - {tipb::ScalarFuncSig::GreatestReal, "greatest"}, - {tipb::ScalarFuncSig::GreatestString, "greatest"}, - {tipb::ScalarFuncSig::GreatestDecimal, "greatest"}, + {tipb::ScalarFuncSig::GreatestInt, "greatest"}, {tipb::ScalarFuncSig::GreatestReal, "greatest"}, + {tipb::ScalarFuncSig::GreatestString, "greatest"}, {tipb::ScalarFuncSig::GreatestDecimal, "greatest"}, {tipb::ScalarFuncSig::GreatestTime, "greatest"}, - {tipb::ScalarFuncSig::LeastInt, "least"}, - {tipb::ScalarFuncSig::LeastReal, "least"}, - {tipb::ScalarFuncSig::LeastString, "least"}, - {tipb::ScalarFuncSig::LeastDecimal, "least"}, - {tipb::ScalarFuncSig::LeastTime, "least"}, + {tipb::ScalarFuncSig::LeastInt, "least"}, {tipb::ScalarFuncSig::LeastReal, "least"}, {tipb::ScalarFuncSig::LeastString, "least"}, + {tipb::ScalarFuncSig::LeastDecimal, "least"}, {tipb::ScalarFuncSig::LeastTime, "least"}, //{tipb::ScalarFuncSig::IntervalInt, "cast"}, //{tipb::ScalarFuncSig::IntervalReal, "cast"}, - {tipb::ScalarFuncSig::GEInt, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GEReal, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GEString, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GEDecimal, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GETime, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GEDuration, "greaterOrEquals"}, + {tipb::ScalarFuncSig::GEInt, "greaterOrEquals"}, {tipb::ScalarFuncSig::GEReal, "greaterOrEquals"}, + {tipb::ScalarFuncSig::GEString, "greaterOrEquals"}, {tipb::ScalarFuncSig::GEDecimal, "greaterOrEquals"}, + {tipb::ScalarFuncSig::GETime, "greaterOrEquals"}, {tipb::ScalarFuncSig::GEDuration, "greaterOrEquals"}, {tipb::ScalarFuncSig::GEJson, "greaterOrEquals"}, - {tipb::ScalarFuncSig::EQInt, "equals"}, - {tipb::ScalarFuncSig::EQReal, "equals"}, - {tipb::ScalarFuncSig::EQString, "equals"}, - {tipb::ScalarFuncSig::EQDecimal, "equals"}, - {tipb::ScalarFuncSig::EQTime, "equals"}, - {tipb::ScalarFuncSig::EQDuration, "equals"}, + {tipb::ScalarFuncSig::EQInt, "equals"}, {tipb::ScalarFuncSig::EQReal, "equals"}, {tipb::ScalarFuncSig::EQString, "equals"}, + {tipb::ScalarFuncSig::EQDecimal, "equals"}, {tipb::ScalarFuncSig::EQTime, "equals"}, {tipb::ScalarFuncSig::EQDuration, "equals"}, {tipb::ScalarFuncSig::EQJson, "equals"}, - {tipb::ScalarFuncSig::NEInt, "notEquals"}, - {tipb::ScalarFuncSig::NEReal, "notEquals"}, - {tipb::ScalarFuncSig::NEString, "notEquals"}, - {tipb::ScalarFuncSig::NEDecimal, "notEquals"}, - {tipb::ScalarFuncSig::NETime, "notEquals"}, - {tipb::ScalarFuncSig::NEDuration, "notEquals"}, - {tipb::ScalarFuncSig::NEJson, "notEquals"}, + {tipb::ScalarFuncSig::NEInt, "notEquals"}, {tipb::ScalarFuncSig::NEReal, "notEquals"}, {tipb::ScalarFuncSig::NEString, "notEquals"}, + {tipb::ScalarFuncSig::NEDecimal, "notEquals"}, {tipb::ScalarFuncSig::NETime, "notEquals"}, + {tipb::ScalarFuncSig::NEDuration, "notEquals"}, {tipb::ScalarFuncSig::NEJson, "notEquals"}, //{tipb::ScalarFuncSig::NullEQInt, "cast"}, //{tipb::ScalarFuncSig::NullEQReal, "cast"}, @@ -508,57 +484,37 @@ std::unordered_map scalar_func_map({ //{tipb::ScalarFuncSig::NullEQDuration, "cast"}, //{tipb::ScalarFuncSig::NullEQJson, "cast"}, - {tipb::ScalarFuncSig::PlusReal, "plus"}, - {tipb::ScalarFuncSig::PlusDecimal, "plus"}, - {tipb::ScalarFuncSig::PlusInt, "plus"}, + {tipb::ScalarFuncSig::PlusReal, "plus"}, {tipb::ScalarFuncSig::PlusDecimal, "plus"}, {tipb::ScalarFuncSig::PlusInt, "plus"}, - {tipb::ScalarFuncSig::MinusReal, "minus"}, - {tipb::ScalarFuncSig::MinusDecimal, "minus"}, - {tipb::ScalarFuncSig::MinusInt, "minus"}, + {tipb::ScalarFuncSig::MinusReal, "minus"}, {tipb::ScalarFuncSig::MinusDecimal, "minus"}, {tipb::ScalarFuncSig::MinusInt, "minus"}, - {tipb::ScalarFuncSig::MultiplyReal, "multiply"}, - {tipb::ScalarFuncSig::MultiplyDecimal, "multiply"}, + {tipb::ScalarFuncSig::MultiplyReal, "multiply"}, {tipb::ScalarFuncSig::MultiplyDecimal, "multiply"}, {tipb::ScalarFuncSig::MultiplyInt, "multiply"}, - {tipb::ScalarFuncSig::DivideReal, "divide"}, - {tipb::ScalarFuncSig::DivideDecimal, "divide"}, - {tipb::ScalarFuncSig::IntDivideInt, "intDiv"}, - {tipb::ScalarFuncSig::IntDivideDecimal, "divide"}, + {tipb::ScalarFuncSig::DivideReal, "divide"}, {tipb::ScalarFuncSig::DivideDecimal, "divide"}, + {tipb::ScalarFuncSig::IntDivideInt, "intDiv"}, {tipb::ScalarFuncSig::IntDivideDecimal, "divide"}, - {tipb::ScalarFuncSig::ModReal, "modulo"}, - {tipb::ScalarFuncSig::ModDecimal, "modulo"}, - {tipb::ScalarFuncSig::ModInt, "modulo"}, + {tipb::ScalarFuncSig::ModReal, "modulo"}, {tipb::ScalarFuncSig::ModDecimal, "modulo"}, {tipb::ScalarFuncSig::ModInt, "modulo"}, {tipb::ScalarFuncSig::MultiplyIntUnsigned, "multiply"}, - {tipb::ScalarFuncSig::AbsInt, "abs"}, - {tipb::ScalarFuncSig::AbsUInt, "abs"}, - {tipb::ScalarFuncSig::AbsReal, "abs"}, + {tipb::ScalarFuncSig::AbsInt, "abs"}, {tipb::ScalarFuncSig::AbsUInt, "abs"}, {tipb::ScalarFuncSig::AbsReal, "abs"}, {tipb::ScalarFuncSig::AbsDecimal, "abs"}, - {tipb::ScalarFuncSig::CeilIntToDec, "ceil"}, - {tipb::ScalarFuncSig::CeilIntToInt, "ceil"}, - {tipb::ScalarFuncSig::CeilDecToInt, "ceil"}, - {tipb::ScalarFuncSig::CeilDecToDec, "ceil"}, - {tipb::ScalarFuncSig::CeilReal, "ceil"}, - - {tipb::ScalarFuncSig::FloorIntToDec, "floor"}, - {tipb::ScalarFuncSig::FloorIntToInt, "floor"}, - {tipb::ScalarFuncSig::FloorDecToInt, "floor"}, - {tipb::ScalarFuncSig::FloorDecToDec, "floor"}, - {tipb::ScalarFuncSig::FloorReal, "floor"}, - - {tipb::ScalarFuncSig::RoundReal, "round"}, - {tipb::ScalarFuncSig::RoundInt, "round"}, - {tipb::ScalarFuncSig::RoundDec, "round"}, + {tipb::ScalarFuncSig::CeilIntToDec, "ceil"}, {tipb::ScalarFuncSig::CeilIntToInt, "ceil"}, {tipb::ScalarFuncSig::CeilDecToInt, "ceil"}, + {tipb::ScalarFuncSig::CeilDecToDec, "ceil"}, {tipb::ScalarFuncSig::CeilReal, "ceil"}, + + {tipb::ScalarFuncSig::FloorIntToDec, "floor"}, {tipb::ScalarFuncSig::FloorIntToInt, "floor"}, + {tipb::ScalarFuncSig::FloorDecToInt, "floor"}, {tipb::ScalarFuncSig::FloorDecToDec, "floor"}, {tipb::ScalarFuncSig::FloorReal, "floor"}, + + {tipb::ScalarFuncSig::RoundReal, "round"}, {tipb::ScalarFuncSig::RoundInt, "round"}, {tipb::ScalarFuncSig::RoundDec, "round"}, //{tipb::ScalarFuncSig::RoundWithFracReal, "cast"}, //{tipb::ScalarFuncSig::RoundWithFracInt, "cast"}, //{tipb::ScalarFuncSig::RoundWithFracDec, "cast"}, {tipb::ScalarFuncSig::Log1Arg, "log"}, //{tipb::ScalarFuncSig::Log2Args, "cast"}, - {tipb::ScalarFuncSig::Log2, "log2"}, - {tipb::ScalarFuncSig::Log10, "log10"}, + {tipb::ScalarFuncSig::Log2, "log2"}, {tipb::ScalarFuncSig::Log10, "log10"}, {tipb::ScalarFuncSig::Rand, "rand"}, //{tipb::ScalarFuncSig::RandWithSeedFirstGen, "cast"}, @@ -568,9 +524,7 @@ std::unordered_map scalar_func_map({ //{tipb::ScalarFuncSig::CRC32, "cast"}, //{tipb::ScalarFuncSig::Sign, "cast"}, - {tipb::ScalarFuncSig::Sqrt, "sqrt"}, - {tipb::ScalarFuncSig::Acos, "acos"}, - {tipb::ScalarFuncSig::Asin, "asin"}, + {tipb::ScalarFuncSig::Sqrt, "sqrt"}, {tipb::ScalarFuncSig::Acos, "acos"}, {tipb::ScalarFuncSig::Asin, "asin"}, {tipb::ScalarFuncSig::Atan1Arg, "atan"}, //{tipb::ScalarFuncSig::Atan2Args, "cast"}, {tipb::ScalarFuncSig::Cos, "cos"}, @@ -579,28 +533,17 @@ std::unordered_map scalar_func_map({ {tipb::ScalarFuncSig::Exp, "exp"}, //{tipb::ScalarFuncSig::PI, "cast"}, //{tipb::ScalarFuncSig::Radians, "cast"}, - {tipb::ScalarFuncSig::Sin, "sin"}, - {tipb::ScalarFuncSig::Tan, "tan"}, - {tipb::ScalarFuncSig::TruncateInt, "trunc"}, + {tipb::ScalarFuncSig::Sin, "sin"}, {tipb::ScalarFuncSig::Tan, "tan"}, {tipb::ScalarFuncSig::TruncateInt, "trunc"}, {tipb::ScalarFuncSig::TruncateReal, "trunc"}, //{tipb::ScalarFuncSig::TruncateDecimal, "cast"}, {tipb::ScalarFuncSig::TruncateUint, "trunc"}, - {tipb::ScalarFuncSig::LogicalAnd, "and"}, - {tipb::ScalarFuncSig::LogicalOr, "or"}, - {tipb::ScalarFuncSig::LogicalXor, "xor"}, - {tipb::ScalarFuncSig::UnaryNotDecimal, "not"}, - {tipb::ScalarFuncSig::UnaryNotInt, "not"}, - {tipb::ScalarFuncSig::UnaryNotReal, "not"}, - {tipb::ScalarFuncSig::UnaryMinusInt, "negate"}, - {tipb::ScalarFuncSig::UnaryMinusReal, "negate"}, - {tipb::ScalarFuncSig::UnaryMinusDecimal, "negate"}, - {tipb::ScalarFuncSig::DecimalIsNull, "isNull"}, - {tipb::ScalarFuncSig::DurationIsNull, "isNull"}, - {tipb::ScalarFuncSig::RealIsNull, "isNull"}, - {tipb::ScalarFuncSig::StringIsNull, "isNull"}, - {tipb::ScalarFuncSig::TimeIsNull, "isNull"}, - {tipb::ScalarFuncSig::IntIsNull, "isNull"}, + {tipb::ScalarFuncSig::LogicalAnd, "and"}, {tipb::ScalarFuncSig::LogicalOr, "or"}, {tipb::ScalarFuncSig::LogicalXor, "xor"}, + {tipb::ScalarFuncSig::UnaryNotDecimal, "not"}, {tipb::ScalarFuncSig::UnaryNotInt, "not"}, {tipb::ScalarFuncSig::UnaryNotReal, "not"}, + {tipb::ScalarFuncSig::UnaryMinusInt, "negate"}, {tipb::ScalarFuncSig::UnaryMinusReal, "negate"}, + {tipb::ScalarFuncSig::UnaryMinusDecimal, "negate"}, {tipb::ScalarFuncSig::DecimalIsNull, "isNull"}, + {tipb::ScalarFuncSig::DurationIsNull, "isNull"}, {tipb::ScalarFuncSig::RealIsNull, "isNull"}, + {tipb::ScalarFuncSig::StringIsNull, "isNull"}, {tipb::ScalarFuncSig::TimeIsNull, "isNull"}, {tipb::ScalarFuncSig::IntIsNull, "isNull"}, {tipb::ScalarFuncSig::JsonIsNull, "isNull"}, //{tipb::ScalarFuncSig::BitAndSig, "cast"}, @@ -630,28 +573,16 @@ std::unordered_map scalar_func_map({ //{tipb::ScalarFuncSig::ValuesString, "cast"}, //{tipb::ScalarFuncSig::ValuesTime, "cast"}, - {tipb::ScalarFuncSig::InInt, "in"}, - {tipb::ScalarFuncSig::InReal, "in"}, - {tipb::ScalarFuncSig::InString, "in"}, - {tipb::ScalarFuncSig::InDecimal, "in"}, - {tipb::ScalarFuncSig::InTime, "in"}, - {tipb::ScalarFuncSig::InDuration, "in"}, + {tipb::ScalarFuncSig::InInt, "in"}, {tipb::ScalarFuncSig::InReal, "in"}, {tipb::ScalarFuncSig::InString, "in"}, + {tipb::ScalarFuncSig::InDecimal, "in"}, {tipb::ScalarFuncSig::InTime, "in"}, {tipb::ScalarFuncSig::InDuration, "in"}, {tipb::ScalarFuncSig::InJson, "in"}, - {tipb::ScalarFuncSig::IfNullInt, "ifNull"}, - {tipb::ScalarFuncSig::IfNullReal, "ifNull"}, - {tipb::ScalarFuncSig::IfNullString, "ifNull"}, - {tipb::ScalarFuncSig::IfNullDecimal, "ifNull"}, - {tipb::ScalarFuncSig::IfNullTime, "ifNull"}, - {tipb::ScalarFuncSig::IfNullDuration, "ifNull"}, - {tipb::ScalarFuncSig::IfNullJson, "ifNull"}, - - {tipb::ScalarFuncSig::IfInt, "if"}, - {tipb::ScalarFuncSig::IfReal, "if"}, - {tipb::ScalarFuncSig::IfString, "if"}, - {tipb::ScalarFuncSig::IfDecimal, "if"}, - {tipb::ScalarFuncSig::IfTime, "if"}, - {tipb::ScalarFuncSig::IfDuration, "if"}, + {tipb::ScalarFuncSig::IfNullInt, "ifNull"}, {tipb::ScalarFuncSig::IfNullReal, "ifNull"}, {tipb::ScalarFuncSig::IfNullString, "ifNull"}, + {tipb::ScalarFuncSig::IfNullDecimal, "ifNull"}, {tipb::ScalarFuncSig::IfNullTime, "ifNull"}, + {tipb::ScalarFuncSig::IfNullDuration, "ifNull"}, {tipb::ScalarFuncSig::IfNullJson, "ifNull"}, + + {tipb::ScalarFuncSig::IfInt, "if"}, {tipb::ScalarFuncSig::IfReal, "if"}, {tipb::ScalarFuncSig::IfString, "if"}, + {tipb::ScalarFuncSig::IfDecimal, "if"}, {tipb::ScalarFuncSig::IfTime, "if"}, {tipb::ScalarFuncSig::IfDuration, "if"}, {tipb::ScalarFuncSig::IfJson, "if"}, //todo need further check for caseWithExpression and multiIf @@ -948,7 +879,8 @@ std::unordered_map scalar_func_map({ //{tipb::ScalarFuncSig::Trim2Args, "cast"}, //{tipb::ScalarFuncSig::Trim3Args, "cast"}, //{tipb::ScalarFuncSig::UnHex, "cast"}, - {tipb::ScalarFuncSig::Upper, "upper"}, + {tipb::ScalarFuncSig::UpperUTF8, "upper"}, + //{tipb::ScalarFuncSig::Upper, "upper"}, //{tipb::ScalarFuncSig::CharLength, "upper"}, }); diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.h b/dbms/src/Flash/Coprocessor/DAGUtils.h index 06a55defb4d..e1b20213f71 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.h +++ b/dbms/src/Flash/Coprocessor/DAGUtils.h @@ -36,7 +36,7 @@ extern const Int8 VAR_SIZE; tipb::FieldType columnInfoToFieldType(const TiDB::ColumnInfo & ci); TiDB::ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type); -bool hasUnsupportedTypeForArrowEncode(const std::vector & types); UInt8 getFieldLengthForArrowEncode(Int32 tp); +bool isUnsupportedEncodeType(const std::vector & types, tipb::EncodeType encode_type); } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index be03113941d..290fed448b5 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -46,7 +46,7 @@ extern const int COP_BAD_DAG_REQUEST; InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_) : context(context_), dag(dag_), - keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeChunk), + keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeChunk || dag.getEncodeType() == tipb::EncodeType::TypeCHBlock), log(&Logger::get("InterpreterDAG")) {} diff --git a/tests/mutable-test/txn_dag/chblock_encode.test b/tests/mutable-test/txn_dag/chblock_encode.test new file mode 100644 index 00000000000..0f4d00741c6 --- /dev/null +++ b/tests/mutable-test/txn_dag/chblock_encode.test @@ -0,0 +1,45 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 Int8, col_2 UInt8, col_3 Int16, col_4 Nullable(UInt16), col_5 Int32, col_6 UInt32, col_7 Int64, col_8 UInt64, col_9 Nullable(Float32), col_10 Float64, col_11 MyDate, col_12 Nullable(MyDateTime), col_13 Nullable(String), col_14 Nullable(Decimal(8,2)), col_15 Nullable(Int64) default 1024, col_16 Nullable(UInt64) default 1, col_17 Nullable(UInt64) default 24, col_18 Nullable(Int8)') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode test', 12.12,100,1,9572888,1) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode',123.23,100,0,null,2) +=> DBGInvoke __raft_insert_row(default, test, 4, 52, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'encode test',0.00,100,1,9572887,null) +=> DBGInvoke __raft_insert_row(default, test, 4, 53, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', null,12345.10,null,0,9572886,3) +=> DBGInvoke __raft_insert_row(default, test, 4, 54, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', null, 'arrow encode test',null,null,1,9572888,4) +=> DBGInvoke __raft_insert_row(default, test, 4, 55, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, null, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode test',11.11,100,null,9572888,1) +=> DBGInvoke __raft_insert_row(default, test, 4, 56, -128, 255, -32768, null, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode test',22.22,100,0,9572888,2) + +=> DBGInvoke dag('select * from default.test',4,'chblock') +┌─col_1─┬─col_2─┬──col_3─┬─col_4─┬───────col_5─┬──────col_6─┬────────────────col_7─┬────────────────col_8─┬─────col_9─┬─────────col_10─┬─────col_11─┬──────────────col_12─┬─col_13────────────┬─col_14───┬─col_15─┬─col_16─┬──col_17─┬─col_18─┐ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 12.12 │ 100 │ 1 │ 9572888 │ 1 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode │ 123.23 │ 100 │ 0 │ \N │ 2 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ encode test │ 0.00 │ 100 │ 1 │ 9572887 │ \N │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ \N │ 12345.10 │ \N │ 0 │ 9572886 │ 3 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ \N │ arrow encode test │ \N │ \N │ 1 │ 9572888 │ 4 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ \N │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 11.11 │ 100 │ \N │ 9572888 │ 1 │ +│ -128 │ 255 │ -32768 │ \N │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 22.22 │ 100 │ 0 │ 9572888 │ 2 │ +└───────┴───────┴────────┴───────┴─────────────┴────────────┴──────────────────────┴──────────────────────┴───────────┴────────────────┴────────────┴─────────────────────┴───────────────────┴──────────┴────────┴────────┴─────────┴────────┘ + +=> DBGInvoke mock_dag('select * from default.test',4,0,'chblock') +┌─col_1─┬─col_2─┬──col_3─┬─col_4─┬───────col_5─┬──────col_6─┬────────────────col_7─┬────────────────col_8─┬─────col_9─┬─────────col_10─┬─────col_11─┬──────────────col_12─┬─col_13────────────┬─col_14───┬─col_15─┬─col_16─┬──col_17─┬─col_18─┐ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 12.12 │ 100 │ 1 │ 9572888 │ 1 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode │ 123.23 │ 100 │ 0 │ \N │ 2 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ encode test │ 0.00 │ 100 │ 1 │ 9572887 │ \N │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ \N │ 12345.10 │ \N │ 0 │ 9572886 │ 3 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ \N │ arrow encode test │ \N │ \N │ 1 │ 9572888 │ 4 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ \N │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 11.11 │ 100 │ \N │ 9572888 │ 1 │ +│ -128 │ 255 │ -32768 │ \N │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 22.22 │ 100 │ 0 │ 9572888 │ 2 │ +└───────┴───────┴────────┴───────┴─────────────┴────────────┴──────────────────────┴──────────────────────┴───────────┴────────────────┴────────────┴─────────────────────┴───────────────────┴──────────┴────────┴────────┴─────────┴────────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test