Skip to content

Commit

Permalink
80
Browse files Browse the repository at this point in the history
Signed-off-by: Zhigao Tong <[email protected]>
  • Loading branch information
solotzg committed Jan 30, 2023
1 parent 3c8155f commit 68921ee
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 31 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
}
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = NewMPPExchangeWriter(
std::unique_ptr<DAGResponseWriter> response_writer = newMPPExchangeWriter(
dagContext().tunnel_set,
partition_col_ids,
partition_col_collators,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ struct MockWriter

void broadcastOrPassThroughWrite(Blocks & blocks)
{
auto packet = MPPTunnelSetHelper::ToPacket(blocks, result_field_types, MPPDataPacketV0);
auto && packet = MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types);
++total_packets;
if (!packet)
return;

if (!packet->packet.chunks().empty())
total_bytes += packet->packet.ByteSizeLong();
queue->push(std::move(packet));
Expand Down
16 changes: 8 additions & 8 deletions dbms/src/Flash/Mpp/MPPTunnelSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ template <typename Tunnel>
void MPPTunnelSetBase<Tunnel>::broadcastOrPassThroughWrite(Blocks & blocks)
{
RUNTIME_CHECK(!tunnels.empty());
auto tracked_packet = MPPTunnelSetHelper::ToPacket(blocks, result_field_types, MPPDataPacketV0);
auto && tracked_packet = MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types);
if (!tracked_packet)
return;

auto packet_bytes = tracked_packet->getPacket().ByteSizeLong();
checkPacketSize(packet_bytes);
// TODO avoid copy packet for broadcast.
Expand Down Expand Up @@ -142,11 +145,9 @@ void MPPTunnelSetBase<Tunnel>::broadcastOrPassThroughWrite(Blocks & blocks)
template <typename Tunnel>
void MPPTunnelSetBase<Tunnel>::partitionWrite(Blocks & blocks, int16_t partition_id)
{
auto tracked_packet = MPPTunnelSetHelper::ToPacket(blocks, result_field_types, MPPDataPacketV0);

if unlikely (tracked_packet->getPacket().chunks_size() <= 0)
auto && tracked_packet = MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types);
if (!tracked_packet)
return;

auto packet_bytes = tracked_packet->getPacket().ByteSizeLong();
checkPacketSize(packet_bytes);
tunnels[partition_id]->write(std::move(tracked_packet));
Expand Down Expand Up @@ -223,14 +224,13 @@ void MPPTunnelSetBase<Tunnel>::fineGrainedShuffleWrite(
size_t num_columns,
int16_t partition_id)
{
auto tracked_packet = MPPTunnelSetHelper::ToFineGrainedPacket(
auto tracked_packet = MPPTunnelSetHelper::ToFineGrainedPacketV0(
header,
scattered,
bucket_idx,
fine_grained_shuffle_stream_count,
num_columns,
result_field_types,
MPPDataPacketV0);
result_field_types);

if unlikely (tracked_packet->getPacket().chunks_size() <= 0)
return;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTunnelSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ class MPPTunnelSetBase : private boost::noncopyable

const std::vector<TunnelPtr> & getTunnels() const { return tunnels; }

private:
bool isLocal(size_t index) const;


private:
std::vector<TunnelPtr> tunnels;
std::unordered_map<MPPTaskId, size_t> receiver_task_id_to_index_map;
Expand Down
16 changes: 7 additions & 9 deletions dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ TrackedMppDataPacketPtr ToPacket(
return tracked_packet;
}

TrackedMppDataPacketPtr ToPacket(Blocks & blocks, const std::vector<tipb::FieldType> & field_types, MPPDataPacketVersion version)
TrackedMppDataPacketPtr ToPacketV0(Blocks & blocks, const std::vector<tipb::FieldType> & field_types)
{
assert(version == MPPDataPacketV0);
if (blocks.empty())
return nullptr;

CHBlockChunkCodec codec;
auto codec_stream = codec.newCodecStream(field_types);
auto tracked_packet = std::make_shared<TrackedMppDataPacket>(version);
auto tracked_packet = std::make_shared<TrackedMppDataPacket>(MPPDataPacketV0);
while (!blocks.empty())
{
const auto & block = blocks.back();
Expand Down Expand Up @@ -103,20 +104,17 @@ TrackedMppDataPacketPtr ToFineGrainedPacket(
return tracked_packet;
}

TrackedMppDataPacketPtr ToFineGrainedPacket(
TrackedMppDataPacketPtr ToFineGrainedPacketV0(
const Block & header,
std::vector<IColumn::ScatterColumns> & scattered,
size_t bucket_idx,
UInt64 fine_grained_shuffle_stream_count,
size_t num_columns,
const std::vector<tipb::FieldType> & field_types,
MPPDataPacketVersion version)
const std::vector<tipb::FieldType> & field_types)
{
assert(version == MPPDataPacketV0);

CHBlockChunkCodec codec;
auto codec_stream = codec.newCodecStream(field_types);
auto tracked_packet = std::make_shared<TrackedMppDataPacket>(version);
auto tracked_packet = std::make_shared<TrackedMppDataPacket>(MPPDataPacketV0);
for (uint64_t stream_idx = 0; stream_idx < fine_grained_shuffle_stream_count; ++stream_idx)
{
// assemble scatter columns into a block
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Flash/Mpp/MPPTunnelSetHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum class CompressionMethod;

namespace DB::MPPTunnelSetHelper
{
TrackedMppDataPacketPtr ToPacket(Blocks & blocks, const std::vector<tipb::FieldType> & field_types, MPPDataPacketVersion version);
TrackedMppDataPacketPtr ToPacketV0(Blocks & blocks, const std::vector<tipb::FieldType> & field_types);

TrackedMppDataPacketPtr ToPacket(
const Block & header,
Expand All @@ -34,14 +34,13 @@ TrackedMppDataPacketPtr ToPacket(
CompressionMethod compression_method,
size_t & original_size);

TrackedMppDataPacketPtr ToFineGrainedPacket(
TrackedMppDataPacketPtr ToFineGrainedPacketV0(
const Block & header,
std::vector<IColumn::ScatterColumns> & scattered,
size_t bucket_idx,
UInt64 fine_grained_shuffle_stream_count,
size_t num_columns,
const std::vector<tipb::FieldType> & field_types,
MPPDataPacketVersion version);
const std::vector<tipb::FieldType> & field_types);

TrackedMppDataPacketPtr ToFineGrainedPacket(
const Block & header,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/newMPPExchangeWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
namespace DB
{
template <class ExchangeWriterPtr>
std::unique_ptr<DAGResponseWriter> NewMPPExchangeWriter(
std::unique_ptr<DAGResponseWriter> newMPPExchangeWriter(
const ExchangeWriterPtr & writer,
const std::vector<Int64> & partition_col_ids,
const TiDB::TiDBCollators & partition_col_collators,
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ struct MockExchangeWriter
}
void broadcastOrPassThroughWrite(Blocks & blocks)
{
checker(MPPTunnelSetHelper::ToPacket(blocks, result_field_types, MPPDataPacketV0), 0);
checker(MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types), 0);
}
void partitionWrite(Blocks & blocks, uint16_t part_id)
{
checker(MPPTunnelSetHelper::ToPacket(blocks, result_field_types, MPPDataPacketV0), part_id);
checker(MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types), part_id);
}
void fineGrainedShuffleWrite(
const Block & header,
Expand All @@ -182,14 +182,13 @@ struct MockExchangeWriter
size_t num_columns,
int16_t part_id)
{
auto tracked_packet = MPPTunnelSetHelper::ToFineGrainedPacket(
auto tracked_packet = MPPTunnelSetHelper::ToFineGrainedPacketV0(
header,
scattered,
bucket_idx,
fine_grained_shuffle_stream_count,
num_columns,
result_field_types,
MPPDataPacketV0);
result_field_types);
checker(tracked_packet, part_id);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void PhysicalExchangeSender::transformImpl(DAGPipeline & pipeline, Context & con
}
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = NewMPPExchangeWriter(
std::unique_ptr<DAGResponseWriter> response_writer = newMPPExchangeWriter(
dag_context.tunnel_set,
partition_col_ids,
partition_col_collators,
Expand Down

0 comments on commit 68921ee

Please sign in to comment.