Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-795 support chblock encode type #383

Merged
merged 8 commits into from
Jan 4, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tipb
34 changes: 17 additions & 17 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <Debug/MockTiDB.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGUtils.h>
Expand Down Expand Up @@ -331,6 +332,8 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context,

if (encode_type == "chunk")
dag_request.set_encode_type(tipb::EncodeType::TypeChunk);
else if (encode_type == "chblock")
dag_request.set_encode_type(tipb::EncodeType::TypeCHBlock);
else
dag_request.set_encode_type(tipb::EncodeType::TypeDefault);

Expand Down Expand Up @@ -633,22 +636,26 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest
return dag_response;
}

void arrowChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks)
std::unique_ptr<ChunkCodec> getCodec(tipb::EncodeType encode_type)
{
ArrowChunkCodec codec;
for (const auto & chunk : dag_response.chunks())
switch (encode_type)
{
blocks.emplace_back(codec.decode(chunk, schema));
case tipb::EncodeType::TypeDefault:
return std::make_unique<DefaultChunkCodec>();
case tipb::EncodeType::TypeChunk:
return std::make_unique<ArrowChunkCodec>();
case tipb::EncodeType::TypeCHBlock:
return std::make_unique<CHBlockChunkCodec>();
default:
throw Exception("Unsupported encode type", ErrorCodes::BAD_ARGUMENTS);
}
}

void defaultChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks)
void chunksToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks)
{
DefaultChunkCodec codec;
auto codec = getCodec(dag_response.encode_type());
for (const auto & chunk : dag_response.chunks())
{
blocks.emplace_back(codec.decode(chunk, schema));
}
blocks.emplace_back(codec->decode(chunk, schema));
}

BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response)
Expand All @@ -657,14 +664,7 @@ BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const
throw Exception(dag_response.error().msg(), dag_response.error().code());

BlocksList blocks;
if (dag_response.encode_type() == tipb::EncodeType::TypeChunk)
{
arrowChunkToBlocks(schema, dag_response, blocks);
}
else
{
defaultChunkToBlocks(schema, dag_response, blocks);
}
chunksToBlocks(schema, dag_response, blocks);
return std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
}

Expand Down
80 changes: 80 additions & 0 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <IO/ReadBufferFromString.h>

namespace DB
{

/// Encoder stream for the CHBlock encode type: serializes whole Blocks into
/// the ClickHouse native wire format, accumulating bytes in `output`.
class CHBlockChunkCodecStream : public ChunkCodecStream
{
public:
    explicit CHBlockChunkCodecStream(const std::vector<tipb::FieldType> & field_types) : ChunkCodecStream(field_types)
    {
        output = std::make_unique<WriteBufferFromOwnString>();
    }

    /// Returns everything encoded so far as a single string.
    /// (Dropped an unused local `std::stringstream ss;` that served no purpose.)
    String getString() override { return output->str(); }
    /// Discards accumulated bytes and starts a fresh buffer.
    void clear() override { output = std::make_unique<WriteBufferFromOwnString>(); }
    void encode(const Block & block, size_t start, size_t end) override;
    std::unique_ptr<WriteBufferFromOwnString> output;
};

void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit)
{
/** If there are columns-constants - then we materialize them.
* (Since the data type does not know how to serialize / deserialize constants.)
*/
ColumnPtr full_column;

if (ColumnPtr converted = column->convertToFullColumnIfConst())
full_column = converted;
else
full_column = column;

IDataType::OutputStreamGetter output_stream_getter = [&](const IDataType::SubstreamPath &) { return &ostr; };
type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {});
}

void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end)
{
// Encode data in chunk by arrow encode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arrow?

if (start != 0 || end != block.rows())
throw Exception("CHBlock encode only support encode whole block");
block.checkNumberOfRows();
size_t columns = block.columns();
size_t rows = block.rows();

writeVarUInt(columns, *output);
writeVarUInt(rows, *output);

for (size_t i = 0; i < columns; i++)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);

writeStringBinary(column.name, *output);
writeStringBinary(column.type->getName(), *output);

if (rows)
writeData(*column.type, column.column, *output, 0, 0);
}
}

/// Factory: build an encoder stream for the CHBlock (native) format.
std::unique_ptr<ChunkCodecStream> CHBlockChunkCodec::newCodecStream(const std::vector<tipb::FieldType> & field_types)
{
    auto stream = std::make_unique<CHBlockChunkCodecStream>(field_types);
    return stream;
}

/// Decode one chunk whose rows_data holds a block in ClickHouse native
/// format. The DAGSchema is unused: native data is self-describing
/// (column names and type names travel with the payload).
Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema &)
{
    ReadBufferFromString read_buffer(chunk.rows_data());
    // Second argument is the server revision; 0 here — presumably no
    // revision-gated features are needed for decoding. TODO confirm.
    NativeBlockInputStream block_in(read_buffer, 0);
    return block_in.read();
}

} // namespace DB
16 changes: 16 additions & 0 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <Flash/Coprocessor/ChunkCodec.h>

namespace DB
{

/// ChunkCodec for the CHBlock encode type: chunks carry blocks serialized
/// in the ClickHouse native block format.
class CHBlockChunkCodec : public ChunkCodec
{
public:
    CHBlockChunkCodec() = default;
    /// Decode a native-format chunk back into a Block (DAGSchema is unused).
    Block decode(const tipb::Chunk &, const DAGSchema &) override;
    /// Create a stream that encodes whole Blocks for the given field types.
    std::unique_ptr<ChunkCodecStream> newCodecStream(const std::vector<tipb::FieldType> & field_types) override;
};

} // namespace DB
32 changes: 25 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Flash/Coprocessor/DAGBlockOutputStream.h>

#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DefaultChunkCodec.h>

namespace DB
Expand Down Expand Up @@ -28,6 +29,11 @@ DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_,
{
chunk_codec_stream = std::make_unique<ArrowChunkCodec>()->newCodecStream(result_field_types);
}
else if (encodeType == tipb::EncodeType::TypeCHBlock)
{
chunk_codec_stream = std::make_unique<CHBlockChunkCodec>()->newCodecStream(result_field_types);
records_per_chunk = -1;
}
else
{
throw Exception("Only Default and Arrow encode type is supported in DAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER);
Expand Down Expand Up @@ -63,17 +69,29 @@ void DAGBlockOutputStream::write(const Block & block)
{
if (block.columns() != result_field_types.size())
throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR);
size_t rows = block.rows();
for (size_t row_index = 0; row_index < rows;)
if (records_per_chunk == -1)
{
if (current_records_num >= records_per_chunk)
current_records_num = 0;
if (block.rows() > 0)
{
chunk_codec_stream->encode(block, 0, block.rows());
encodeChunkToDAGResponse();
}
const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows);
chunk_codec_stream->encode(block, row_index, upper);
current_records_num += (upper - row_index);
row_index = upper;
}
else
{
size_t rows = block.rows();
for (size_t row_index = 0; row_index < rows;)
{
if (current_records_num >= records_per_chunk)
{
encodeChunkToDAGResponse();
}
const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows);
chunk_codec_stream->encode(block, row_index, upper);
current_records_num += (upper - row_index);
row_index = upper;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DAGBlockOutputStream : public IBlockOutputStream
tipb::SelectResponse & dag_response;
std::vector<tipb::FieldType> result_field_types;
Block header;
const Int64 records_per_chunk;
Int64 records_per_chunk;
std::unique_ptr<ChunkCodecStream> chunk_codec_stream;
Int64 current_records_num;
};
Expand Down
23 changes: 13 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,19 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, Re
"Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED);
}
}
analyzeResultFieldTypes();
analyzeDAGEncodeType();
}

void DAGQuerySource::analyzeDAGEncodeType()
{
encode_type = dag_request.encode_type();
if (encode_type == tipb::EncodeType::TypeChunk && hasUnsupportedTypeForArrowEncode(getResultFieldTypes()))
{
if (isUnsupportedEncodeType(getResultFieldTypes(), encode_type))
encode_type = tipb::EncodeType::TypeDefault;
}
if (encode_type == tipb::EncodeType::TypeChunk && dag_request.has_chunk_memory_layout()
&& dag_request.chunk_memory_layout().has_endian() && dag_request.chunk_memory_layout().endian() == tipb::Endian::BigEndian)
// todo support BigEndian encode for chunk encode type
throw Exception("BigEndian encode for chunk encode type is not supported yet.", ErrorCodes::NOT_IMPLEMENTED);
encode_type = tipb::EncodeType::TypeDefault;
}

std::tuple<std::string, ASTPtr> DAGQuerySource::parse(size_t max_query_size)
Expand Down Expand Up @@ -135,7 +139,7 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<t
}
}

std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
void DAGQuerySource::analyzeResultFieldTypes()
{
std::vector<tipb::FieldType> executor_output;
for (int i = dag_request.executors_size() - 1; i >= 0; i--)
Expand All @@ -152,14 +156,13 @@ std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
// todo should always use output offset to re-construct the output field types
if (hasAggregation())
{
return executor_output;
result_field_types = std::move(executor_output);
}
std::vector<tipb::FieldType> ret;
for (int i : dag_request.output_offsets())
else
{
ret.push_back(executor_output[i]);
for (UInt32 i : dag_request.output_offsets())
result_field_types.push_back(executor_output[i]);
}
return ret;
}

} // namespace DB
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQuerySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class DAGQuerySource : public IQuerySource
};
const tipb::DAGRequest & getDAGRequest() const { return dag_request; };

std::vector<tipb::FieldType> getResultFieldTypes() const;
std::vector<tipb::FieldType> getResultFieldTypes() const { return result_field_types; };

ASTPtr getAST() const { return ast; };

Expand All @@ -95,6 +95,9 @@ class DAGQuerySource : public IQuerySource
}
}

void analyzeResultFieldTypes();
void analyzeDAGEncodeType();

protected:
Context & context;
DAGContext & dag_context;
Expand All @@ -112,6 +115,7 @@ class DAGQuerySource : public IQuerySource
Int32 order_index = -1;
Int32 limit_index = -1;

std::vector<tipb::FieldType> result_field_types;
tipb::EncodeType encode_type;

ASTPtr ast;
Expand Down
Loading