Skip to content

Commit

Permalink
FLASH-795 support chblock encode type (#383)
Browse files Browse the repository at this point in the history
* support chblock encode type for cop request

* fix timestamp bug for ch block encode

* add unsupported type for chblock encode

* refine code

* fix build error

* address comments
  • Loading branch information
windtalker authored Jan 4, 2020
1 parent ed0ecec commit d6d1d2c
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 170 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
34 changes: 17 additions & 17 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <Debug/MockTiDB.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGUtils.h>
Expand Down Expand Up @@ -368,6 +369,8 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context,

if (encode_type == "chunk")
dag_request.set_encode_type(tipb::EncodeType::TypeChunk);
else if (encode_type == "chblock")
dag_request.set_encode_type(tipb::EncodeType::TypeCHBlock);
else
dag_request.set_encode_type(tipb::EncodeType::TypeDefault);

Expand Down Expand Up @@ -670,22 +673,26 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest
return dag_response;
}

void arrowChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks)
std::unique_ptr<ChunkCodec> getCodec(tipb::EncodeType encode_type)
{
ArrowChunkCodec codec;
for (const auto & chunk : dag_response.chunks())
switch (encode_type)
{
blocks.emplace_back(codec.decode(chunk, schema));
case tipb::EncodeType::TypeDefault:
return std::make_unique<DefaultChunkCodec>();
case tipb::EncodeType::TypeChunk:
return std::make_unique<ArrowChunkCodec>();
case tipb::EncodeType::TypeCHBlock:
return std::make_unique<CHBlockChunkCodec>();
default:
throw Exception("Unsupported encode type", ErrorCodes::BAD_ARGUMENTS);
}
}

void defaultChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks)
void chunksToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks)
{
DefaultChunkCodec codec;
auto codec = getCodec(dag_response.encode_type());
for (const auto & chunk : dag_response.chunks())
{
blocks.emplace_back(codec.decode(chunk, schema));
}
blocks.emplace_back(codec->decode(chunk, schema));
}

BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response)
Expand All @@ -694,14 +701,7 @@ BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const
throw Exception(dag_response.error().msg(), dag_response.error().code());

BlocksList blocks;
if (dag_response.encode_type() == tipb::EncodeType::TypeChunk)
{
arrowChunkToBlocks(schema, dag_response, blocks);
}
else
{
defaultChunkToBlocks(schema, dag_response, blocks);
}
chunksToBlocks(schema, dag_response, blocks);
return std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
}

Expand Down
80 changes: 80 additions & 0 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <IO/ReadBufferFromString.h>

namespace DB
{

/// Codec stream that serializes whole blocks into an in-memory buffer using
/// ClickHouse's native (CHBlock) wire format.
class CHBlockChunkCodecStream : public ChunkCodecStream
{
public:
    explicit CHBlockChunkCodecStream(const std::vector<tipb::FieldType> & field_types) : ChunkCodecStream(field_types)
    {
        output = std::make_unique<WriteBufferFromOwnString>();
    }

    /// Returns all bytes encoded so far.
    /// NOTE: the original declared an unused `std::stringstream ss;` here —
    /// dead code (and a needless allocation per call); removed.
    String getString() override { return output->str(); }
    /// Drops any buffered data by starting over with a fresh buffer.
    void clear() override { output = std::make_unique<WriteBufferFromOwnString>(); }
    /// Encodes rows [start, end) of `block`; only whole-block encoding is supported.
    void encode(const Block & block, size_t start, size_t end) override;
    std::unique_ptr<WriteBufferFromOwnString> output;
};

void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit)
{
/** If there are columns-constants - then we materialize them.
* (Since the data type does not know how to serialize / deserialize constants.)
*/
ColumnPtr full_column;

if (ColumnPtr converted = column->convertToFullColumnIfConst())
full_column = converted;
else
full_column = column;

IDataType::OutputStreamGetter output_stream_getter = [&](const IDataType::SubstreamPath &) { return &ostr; };
type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {});
}

void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end)
{
// Encode data in chunk by chblock encode
if (start != 0 || end != block.rows())
throw Exception("CHBlock encode only support encode whole block");
block.checkNumberOfRows();
size_t columns = block.columns();
size_t rows = block.rows();

writeVarUInt(columns, *output);
writeVarUInt(rows, *output);

for (size_t i = 0; i < columns; i++)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);

writeStringBinary(column.name, *output);
writeStringBinary(column.type->getName(), *output);

if (rows)
writeData(*column.type, column.column, *output, 0, 0);
}
}

/// Creates a fresh encode stream bound to the given result field types.
std::unique_ptr<ChunkCodecStream> CHBlockChunkCodec::newCodecStream(const std::vector<tipb::FieldType> & field_types)
{
    auto stream = std::make_unique<CHBlockChunkCodecStream>(field_types);
    return stream;
}

/// Decodes one native-format chunk back into a Block. The DAGSchema argument
/// is unused: the native format is self-describing (it carries column names
/// and type names).
Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema &)
{
    ReadBufferFromString input(chunk.rows_data());
    // server_revision = 0: no revision-dependent extensions expected.
    NativeBlockInputStream decoder(input, 0);
    return decoder.read();
}

} // namespace DB
16 changes: 16 additions & 0 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <Flash/Coprocessor/ChunkCodec.h>

namespace DB
{

/// ChunkCodec that encodes/decodes coprocessor chunks using ClickHouse's
/// native block wire format (tipb::EncodeType::TypeCHBlock).
class CHBlockChunkCodec : public ChunkCodec
{
public:
    CHBlockChunkCodec() = default;
    /// Decodes a native-format chunk into a Block; the DAGSchema parameter is
    /// unnamed/unused, as the native format is self-describing.
    Block decode(const tipb::Chunk &, const DAGSchema &) override;
    /// Creates an encode stream for the given result field types.
    std::unique_ptr<ChunkCodecStream> newCodecStream(const std::vector<tipb::FieldType> & field_types) override;
};

} // namespace DB
32 changes: 25 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Flash/Coprocessor/DAGBlockOutputStream.h>

#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DefaultChunkCodec.h>

namespace DB
Expand Down Expand Up @@ -28,6 +29,11 @@ DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_,
{
chunk_codec_stream = std::make_unique<ArrowChunkCodec>()->newCodecStream(result_field_types);
}
else if (encodeType == tipb::EncodeType::TypeCHBlock)
{
chunk_codec_stream = std::make_unique<CHBlockChunkCodec>()->newCodecStream(result_field_types);
records_per_chunk = -1;
}
else
{
throw Exception("Only Default and Arrow encode type is supported in DAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER);
Expand Down Expand Up @@ -63,17 +69,29 @@ void DAGBlockOutputStream::write(const Block & block)
{
if (block.columns() != result_field_types.size())
throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR);
size_t rows = block.rows();
for (size_t row_index = 0; row_index < rows;)
if (records_per_chunk == -1)
{
if (current_records_num >= records_per_chunk)
current_records_num = 0;
if (block.rows() > 0)
{
chunk_codec_stream->encode(block, 0, block.rows());
encodeChunkToDAGResponse();
}
const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows);
chunk_codec_stream->encode(block, row_index, upper);
current_records_num += (upper - row_index);
row_index = upper;
}
else
{
size_t rows = block.rows();
for (size_t row_index = 0; row_index < rows;)
{
if (current_records_num >= records_per_chunk)
{
encodeChunkToDAGResponse();
}
const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows);
chunk_codec_stream->encode(block, row_index, upper);
current_records_num += (upper - row_index);
row_index = upper;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DAGBlockOutputStream : public IBlockOutputStream
tipb::SelectResponse & dag_response;
std::vector<tipb::FieldType> result_field_types;
Block header;
const Int64 records_per_chunk;
Int64 records_per_chunk;
std::unique_ptr<ChunkCodecStream> chunk_codec_stream;
Int64 current_records_num;
};
Expand Down
23 changes: 13 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,19 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, Re
"Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED);
}
}
analyzeResultFieldTypes();
analyzeDAGEncodeType();
}

void DAGQuerySource::analyzeDAGEncodeType()
{
encode_type = dag_request.encode_type();
if (encode_type == tipb::EncodeType::TypeChunk && hasUnsupportedTypeForArrowEncode(getResultFieldTypes()))
{
if (isUnsupportedEncodeType(getResultFieldTypes(), encode_type))
encode_type = tipb::EncodeType::TypeDefault;
}
if (encode_type == tipb::EncodeType::TypeChunk && dag_request.has_chunk_memory_layout()
&& dag_request.chunk_memory_layout().has_endian() && dag_request.chunk_memory_layout().endian() == tipb::Endian::BigEndian)
// todo support BigEndian encode for chunk encode type
throw Exception("BigEndian encode for chunk encode type is not supported yet.", ErrorCodes::NOT_IMPLEMENTED);
encode_type = tipb::EncodeType::TypeDefault;
}

std::tuple<std::string, ASTPtr> DAGQuerySource::parse(size_t max_query_size)
Expand Down Expand Up @@ -135,7 +139,7 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<t
}
}

std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
void DAGQuerySource::analyzeResultFieldTypes()
{
std::vector<tipb::FieldType> executor_output;
for (int i = dag_request.executors_size() - 1; i >= 0; i--)
Expand All @@ -152,14 +156,13 @@ std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
// todo should always use output offset to re-construct the output field types
if (hasAggregation())
{
return executor_output;
result_field_types = std::move(executor_output);
}
std::vector<tipb::FieldType> ret;
for (int i : dag_request.output_offsets())
else
{
ret.push_back(executor_output[i]);
for (UInt32 i : dag_request.output_offsets())
result_field_types.push_back(executor_output[i]);
}
return ret;
}

} // namespace DB
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQuerySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class DAGQuerySource : public IQuerySource
};
const tipb::DAGRequest & getDAGRequest() const { return dag_request; };

std::vector<tipb::FieldType> getResultFieldTypes() const;
std::vector<tipb::FieldType> getResultFieldTypes() const { return result_field_types; };

ASTPtr getAST() const { return ast; };

Expand All @@ -95,6 +95,9 @@ class DAGQuerySource : public IQuerySource
}
}

void analyzeResultFieldTypes();
void analyzeDAGEncodeType();

protected:
Context & context;
DAGContext & dag_context;
Expand All @@ -112,6 +115,7 @@ class DAGQuerySource : public IQuerySource
Int32 order_index = -1;
Int32 limit_index = -1;

std::vector<tipb::FieldType> result_field_types;
tipb::EncodeType encode_type;

ASTPtr ast;
Expand Down
Loading

0 comments on commit d6d1d2c

Please sign in to comment.