Skip to content

Commit

Permalink
Exchange receiver decode optimization to do squashing work at the sam…
Browse files Browse the repository at this point in the history
…e time (#6202)

close #6157
  • Loading branch information
yibin87 authored Nov 2, 2022
1 parent bbe697c commit dc28b51
Show file tree
Hide file tree
Showing 17 changed files with 1,022 additions and 84 deletions.
30 changes: 3 additions & 27 deletions dbms/src/DataStreams/NativeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,8 @@ namespace ErrorCodes
extern const int INCORRECT_INDEX;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int NOT_IMPLEMENTED;
} // namespace ErrorCodes

namespace
{
void checkColumnSize(size_t expected, size_t actual)
{
if (expected != actual)
throw Exception(
fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual),
ErrorCodes::LOGICAL_ERROR);
}

void checkDataTypeName(size_t column_index, const String & expected, const String & actual)
{
if (expected != actual)
throw Exception(
fmt::format(
"NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}",
column_index,
expected,
actual),
ErrorCodes::LOGICAL_ERROR);
}
} // namespace

NativeBlockInputStream::NativeBlockInputStream(
ReadBuffer & istr_,
UInt64 server_revision_,
Expand Down Expand Up @@ -179,9 +155,9 @@ Block NativeBlockInputStream::readImpl()
}

if (header)
checkColumnSize(header.columns(), columns);
CodecUtils::checkColumnSize(header.columns(), columns);
else if (!output_names.empty())
checkColumnSize(output_names.size(), columns);
CodecUtils::checkColumnSize(output_names.size(), columns);

for (size_t i = 0; i < columns; ++i)
{
Expand All @@ -208,7 +184,7 @@ Block NativeBlockInputStream::readImpl()
readBinary(type_name, istr);
if (header)
{
checkDataTypeName(i, header_datatypes[i].name, type_name);
CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name);
column.type = header_datatypes[i].type;
}
else
Expand Down
17 changes: 3 additions & 14 deletions dbms/src/DataStreams/NativeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/PODArray.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <Flash/Coprocessor/CodecUtils.h>
#include <IO/CompressedReadBufferFromFile.h>

namespace DB
Expand Down Expand Up @@ -116,27 +117,15 @@ class NativeBlockInputStream : public IProfilingBlockInputStream
Block header;
UInt64 server_revision;
bool align_column_name_with_header = false;

struct DataTypeWithTypeName
{
DataTypeWithTypeName(const DataTypePtr & t, const String & n)
: type(t)
, name(n)
{
}

DataTypePtr type;
String name;
};
std::vector<DataTypeWithTypeName> header_datatypes;
std::vector<CodecUtils::DataTypeWithTypeName> header_datatypes;

bool use_index = false;
IndexForNativeFormat::Blocks::const_iterator index_block_it;
IndexForNativeFormat::Blocks::const_iterator index_block_end;
IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it;

/// If an index is specified, then `istr` must be CompressedReadBufferFromFile.
CompressedReadBufferFromFile<> * istr_concrete;
CompressedReadBufferFromFile<> * istr_concrete = nullptr;

PODArray<double> avg_value_size_hints;

Expand Down
16 changes: 12 additions & 4 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/IChunkDecodeAndSquash.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -64,6 +65,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
// CoprocessorBlockInputStream doesn't take care of this.
size_t stream_id;

std::unique_ptr<IChunkDecodeAndSquash> decoder_ptr;

void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
for (const auto & execution_summary : resp.execution_summaries())
Expand All @@ -84,6 +87,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
{
if (resp.execution_summaries_size() == 0)
return;

if (!execution_summaries_inited[index].load())
{
initRemoteExecutionSummaries(resp, index);
Expand Down Expand Up @@ -128,7 +132,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
{
while (true)
{
auto result = remote_reader->nextResult(block_queue, sample_block, stream_id);
auto result = remote_reader->nextResult(block_queue, sample_block, stream_id, decoder_ptr);
if (result.meet_error)
{
LOG_WARNING(log, "remote reader meets error: {}", result.error_msg);
Expand All @@ -155,21 +159,22 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
}

const auto & decode_detail = result.decode_detail;
total_rows += decode_detail.rows;

size_t index = 0;
if constexpr (is_streaming_reader)
index = result.call_index;

++connection_profile_infos[index].packets;
connection_profile_infos[index].packets += decode_detail.packets;
connection_profile_infos[index].bytes += decode_detail.packet_bytes;

total_rows += decode_detail.rows;
LOG_TRACE(
log,
"recv {} rows from remote for {}, total recv row num: {}",
decode_detail.rows,
result.req_info,
total_rows);

if (decode_detail.rows > 0)
return true;
// else continue
Expand All @@ -193,6 +198,9 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
constexpr size_t squash_rows_limit = 8192;
if constexpr (is_streaming_reader)
decoder_ptr = std::make_unique<CHBlockChunkDecodeAndSquash>(sample_block, squash_rows_limit);
}

Block getHeader() const override { return sample_block; }
Expand All @@ -211,7 +219,6 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
if (!fetchRemoteResult())
return {};
}
// todo should merge some blocks to make sure the output block is big enough
Block block = block_queue.front();
block_queue.pop();
return block;
Expand All @@ -222,6 +229,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr;
}

size_t getTotalRows() const { return total_rows; }
size_t getSourceNum() const { return source_num; }
bool isStreamingCall() const { return is_streaming_reader; }
const std::vector<ConnectionProfileInfo> & getConnectionProfileInfos() const { return connection_profile_infos; }
Expand Down
101 changes: 93 additions & 8 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/TiFlashException.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <IO/ReadBufferFromString.h>
Expand Down Expand Up @@ -48,6 +49,20 @@ class CHBlockChunkCodecStream : public ChunkCodecStream
DataTypes expected_types;
};

CHBlockChunkCodec::CHBlockChunkCodec(
const Block & header_)
: header(header_)
{
for (const auto & column : header)
header_datatypes.emplace_back(column.type, column.type->getName());
}

CHBlockChunkCodec::CHBlockChunkCodec(const DAGSchema & schema)
{
for (const auto & c : schema)
output_names.push_back(c.first);
}

size_t getExtraInfoSize(const Block & block)
{
size_t size = 64; /// to hold some length of structures, such as column number, row number...
Expand Down Expand Up @@ -83,6 +98,14 @@ void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & o
type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {});
}

void CHBlockChunkCodec::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows)
{
IDataType::InputStreamGetter input_stream_getter = [&](const IDataType::SubstreamPath &) {
return &istr;
};
type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, 0, false, {});
}

void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end)
{
/// only check block schema in CHBlock codec because for both
Expand Down Expand Up @@ -120,21 +143,83 @@ std::unique_ptr<ChunkCodecStream> CHBlockChunkCodec::newCodecStream(const std::v
return std::make_unique<CHBlockChunkCodecStream>(field_types);
}

Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr, size_t reserve_size)
{
Block res;
if (istr.eof())
{
return res;
}

/// Dimensions
size_t columns = 0;
size_t rows = 0;
readBlockMeta(istr, columns, rows);

for (size_t i = 0; i < columns; ++i)
{
ColumnWithTypeAndName column;
readColumnMeta(i, istr, column);

/// Data
MutableColumnPtr read_column = column.type->createColumn();
if (reserve_size > 0)
read_column->reserve(std::max(rows, reserve_size));
else if (rows)
read_column->reserve(rows);

if (rows) /// If no rows, nothing to read.
readData(*column.type, *read_column, istr, rows);

column.column = std::move(read_column);
res.insert(std::move(column));
}
return res;
}
void CHBlockChunkCodec::readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const
{
readVarUInt(columns, istr);
readVarUInt(rows, istr);

if (header)
CodecUtils::checkColumnSize(header.columns(), columns);
else if (!output_names.empty())
CodecUtils::checkColumnSize(output_names.size(), columns);
}

void CHBlockChunkCodec::readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column)
{
/// Name
readBinary(column.name, istr);
if (header)
column.name = header.getByPosition(i).name;
else if (!output_names.empty())
column.name = output_names[i];

/// Type
String type_name;
readBinary(type_name, istr);
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
if (header)
{
CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name);
column.type = header_datatypes[i].type;
}
else
{
column.type = data_type_factory.get(type_name);
}
}

Block CHBlockChunkCodec::decode(const String & str, const DAGSchema & schema)
{
ReadBufferFromString read_buffer(str);
std::vector<String> output_names;
for (const auto & c : schema)
output_names.push_back(c.first);
NativeBlockInputStream block_in(read_buffer, 0, std::move(output_names));
return block_in.read();
return CHBlockChunkCodec(schema).decodeImpl(read_buffer);
}

Block CHBlockChunkCodec::decode(const String & str, const Block & header)
{
ReadBufferFromString read_buffer(str);
NativeBlockInputStream block_in(read_buffer, header, 0, /*align_column_name_with_header=*/true);
return block_in.read();
return CHBlockChunkCodec(header).decodeImpl(read_buffer);
}

} // namespace DB
20 changes: 19 additions & 1 deletion dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,34 @@
#pragma once

#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/CodecUtils.h>

namespace DB
{
class CHBlockChunkCodec : public ChunkCodec
class CHBlockChunkDecodeAndSquash;

class CHBlockChunkCodec final : public ChunkCodec
{
public:
CHBlockChunkCodec() = default;
CHBlockChunkCodec(const Block & header_);
CHBlockChunkCodec(const DAGSchema & schema);

Block decode(const String &, const DAGSchema & schema) override;
static Block decode(const String &, const Block & header);
std::unique_ptr<ChunkCodecStream> newCodecStream(const std::vector<tipb::FieldType> & field_types) override;

private:
friend class CHBlockChunkDecodeAndSquash;
void readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column);
void readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const;
static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows);
/// 'reserve_size' used for Squash usage, and takes effect when 'reserve_size' > 0
Block decodeImpl(ReadBuffer & istr, size_t reserve_size = 0);

Block header;
std::vector<CodecUtils::DataTypeWithTypeName> header_datatypes;
std::vector<String> output_names;
};

} // namespace DB
49 changes: 49 additions & 0 deletions dbms/src/Flash/Coprocessor/CodecUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/CodecUtils.h>
#include <Flash/Coprocessor/DAGUtils.h>

namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

namespace CodecUtils
{
void checkColumnSize(size_t expected, size_t actual)
{
if unlikely (expected != actual)
throw Exception(
fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual),
ErrorCodes::LOGICAL_ERROR);
}

void checkDataTypeName(size_t column_index, const String & expected, const String & actual)
{
if unlikely (expected != actual)
throw Exception(
fmt::format(
"NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}",
column_index,
expected,
actual),
ErrorCodes::LOGICAL_ERROR);
}

} // namespace CodecUtils
} // namespace DB
Loading

0 comments on commit dc28b51

Please sign in to comment.