Skip to content

Commit

Permalink
support selective block for MPPExchangeWriter (#9195)
Browse files Browse the repository at this point in the history
ref #9196

Signed-off-by: guo-shaoge <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
guo-shaoge and ti-chi-bot[bot] authored Jul 19, 2024
1 parent 951e010 commit 9de3fdc
Show file tree
Hide file tree
Showing 31 changed files with 1,606 additions and 297 deletions.
102 changes: 82 additions & 20 deletions dbms/src/Columns/ColumnAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,51 @@ void ColumnAggregateFunction::updateHashWithValues(

/// Non-selective entry point: the last statement delegates to updateWeakHash32Impl<false> with an
/// empty selective argument. The collator and the String argument are unnamed and not used here.
/// NOTE(review): this text is a rendered diff hunk. The explicit size check that precedes the
/// delegating call appears to be the removed pre-change body displayed next to the added line;
/// confirm against the real file before treating this span as the final function body.
void ColumnAggregateFunction::updateWeakHash32(WeakHash32 & hash, const TiDB::TiDBCollatorPtr &, String &) const
{
auto s = data.size();
if (hash.getData().size() != data.size())
throw Exception(
fmt::format(
"Size of WeakHash32 does not match size of column: column size is {}, hash size is {}",
s,
hash.getData().size()),
ErrorCodes::LOGICAL_ERROR);
updateWeakHash32Impl<false>(hash, {});
}

auto & hash_data = hash.getData();
/// Selective entry point: hashes only the rows listed in `selective` by delegating to the shared
/// implementation with the selective path enabled. The collator and the String argument are
/// unnamed and not used here.
void ColumnAggregateFunction::updateWeakHash32(
    WeakHash32 & hash,
    const TiDB::TiDBCollatorPtr &,
    String &,
    const BlockSelective & selective) const
{
    constexpr bool use_selective_block = true;
    updateWeakHash32Impl<use_selective_block>(hash, selective);
}

/// Shared implementation behind both updateWeakHash32 overloads.
/// - selective_block == true: `rows` is selective.size() and the i-th processed row is selective[i].
/// - selective_block == false: `rows` is data.size() and rows are visited in index order.
/// `hash` must hold exactly `rows` entries (enforced by RUNTIME_CHECK_MSG). For every processed row
/// the aggregate state is serialized through func->serialize into a byte buffer, and the buffer is
/// folded into the corresponding hash slot with ::updateWeakHash32.
/// NOTE(review): this text is a rendered diff hunk; a few lines inside the loop appear to be the
/// removed pre-change versions shown next to their replacements (marked below).
template <bool selective_block>
void ColumnAggregateFunction::updateWeakHash32Impl(WeakHash32 & hash, const BlockSelective & selective) const
{
size_t rows;
if constexpr (selective_block)
{
rows = selective.size();
}
else
{
rows = data.size();
}

RUNTIME_CHECK_MSG(
hash.getData().size() == rows,
"size of WeakHash32({}) doesn't match size of column({})",
hash.getData().size(),
rows);

/// Byte buffer backing the WriteBufferFromVector created for each row.
std::vector<UInt8> v;
/// NOTE(review): `s` is not declared in this function as shown; this loop header looks like the removed line.
for (size_t i = 0; i < s; ++i)
/// Output cursor: advanced once per processed row, so slot i receives the hash of the i-th processed row.
UInt32 * hash_data = hash.getData().data();

for (size_t i = 0; i < rows; ++i)
{
/// Map the output position to the source row (identity unless selective_block).
size_t row = i;
if constexpr (selective_block)
row = selective[i];

WriteBufferFromVector<std::vector<UInt8>> wbuf(v);
/// NOTE(review): the data[i] call looks like the removed line; data[row] is its replacement.
func->serialize(data[i], wbuf);
func->serialize(data[row], wbuf);
wbuf.finalize();
/// NOTE(review): the indexed assignment looks like the removed line; the pointer form is its replacement.
hash_data[i] = ::updateWeakHash32(v.data(), v.size(), hash_data[i]);
*hash_data = ::updateWeakHash32(v.data(), v.size(), *hash_data);
++hash_data;
}
}

Expand Down Expand Up @@ -398,30 +425,65 @@ ColumnPtr ColumnAggregateFunction::replicateRange(size_t start_row, size_t end_r
/// Non-selective scatter: distributes every row of this column into `num_columns` output columns
/// according to `selector`, by delegating to scatterImpl with the selective path disabled and an
/// empty selective argument.
MutableColumns ColumnAggregateFunction::scatter(
    IColumn::ColumnIndex num_columns,
    const IColumn::Selector & selector) const
{
    constexpr bool use_selective_block = false;
    return scatterImpl<use_selective_block>(num_columns, selector, {});
}

/// Selective scatter: distributes only the rows listed in `selective` into `num_columns` output
/// columns according to `selector`, by delegating to scatterImpl with the selective path enabled.
MutableColumns ColumnAggregateFunction::scatter(
    IColumn::ColumnIndex num_columns,
    const IColumn::Selector & selector,
    const BlockSelective & selective) const
{
    constexpr bool use_selective_block = true;
    return scatterImpl<use_selective_block>(num_columns, selector, selective);
}

/// Shared implementation behind both scatter overloads.
/// `rows` is size() by default and selective.size() when selective_block is true; `selector` must
/// hold exactly `rows` entries (enforced by RUNTIME_CHECK_MSG). The i-th processed row is
/// selective[i] in selective mode and i otherwise, and its value is appended to the output column
/// numbered selector[i]. Returns the `num_columns` output columns, each created with createView().
/// NOTE(review): this text is a rendered diff hunk; several lines appear to be the removed
/// pre-change versions shown next to their replacements (marked below).
template <bool selective_block>
MutableColumns ColumnAggregateFunction::scatterImpl(
IColumn::ColumnIndex num_columns,
const IColumn::Selector & selector,
const BlockSelective & selective) const
{
size_t rows = size();
if constexpr (selective_block)
{
rows = selective.size();
}

RUNTIME_CHECK_MSG(
selector.size() == rows,
"size of selector({}) doesn't match size of column({})",
selector.size(),
rows);

/// Columns with scattered values will point to this column as the owner of values.
MutableColumns columns(num_columns);
for (auto & column : columns)
column = createView();

/// NOTE(review): `num_rows` looks like the removed pre-change variable; `rows` above replaces it.
size_t num_rows = size();

{
/// NOTE(review): the first reserve_size line looks like the removed version of the second one.
size_t reserve_size = 1.1 * num_rows / num_columns; /// 1.1 is just a guess. Better to use n-sigma rule.
size_t reserve_size = 1.1 * rows / num_columns; /// 1.1 is just a guess. Better to use n-sigma rule.

if (reserve_size > 1)
for (auto & column : columns)
column->reserve(reserve_size);
}

/// NOTE(review): this two-line loop looks like the removed pre-change loop; the braced loop after it replaces it.
for (size_t i = 0; i < num_rows; ++i)
static_cast<ColumnAggregateFunction &>(*columns[selector[i]]).data.push_back(data[i]);
for (size_t i = 0; i < rows; ++i)
{
/// Map the selector position to the source row (identity unless selective_block).
size_t row = i;
if constexpr (selective_block)
row = selective[i];

static_cast<ColumnAggregateFunction &>(*columns[selector[i]]).data.push_back(data[row]);
}
return columns;
}

/// scatterTo is not implemented for this column type: the body unconditionally throws
/// TiFlashException with Errors::Coprocessor::Unimplemented.
/// NOTE(review): this text is a rendered diff hunk; the three-line signature with [[maybe_unused]]
/// parameters appears to be the removed version, and the single-line signature with unnamed
/// parameters its replacement.
void ColumnAggregateFunction::scatterTo(
ScatterColumns & columns [[maybe_unused]],
const Selector & selector [[maybe_unused]]) const
void ColumnAggregateFunction::scatterTo(ScatterColumns &, const Selector &) const
{
throw TiFlashException("ColumnAggregateFunction does not support scatterTo", Errors::Coprocessor::Unimplemented);
}

/// Selective scatterTo is not implemented for this column type either: all three arguments are
/// ignored and the call always throws TiFlashException with Errors::Coprocessor::Unimplemented.
void ColumnAggregateFunction::scatterTo(
    ScatterColumns &,
    const Selector &,
    const BlockSelective &) const
{
    throw TiFlashException(
        "ColumnAggregateFunction does not support scatterTo",
        Errors::Coprocessor::Unimplemented);
}
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Columns/ColumnAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega
const override;

/// Folds the serialized aggregate state of every row into the matching entry of `hash`.
/// The collator and String parameters are unnamed in this class's definitions.
void updateWeakHash32(WeakHash32 & hash, const TiDB::TiDBCollatorPtr &, String &) const override;
/// Selective overload: only the rows listed in `selective` are hashed, and `hash` must hold
/// selective.size() entries (entry i corresponds to row selective[i]).
void updateWeakHash32(WeakHash32 & hash, const TiDB::TiDBCollatorPtr &, String &, const BlockSelective & selective)
const override;

size_t byteSize() const override;

Expand All @@ -189,8 +191,12 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega
ColumnPtr replicateRange(size_t start_row, size_t end_row, const IColumn::Offsets & offsets) const override;

/// Distributes all rows into `num_columns` output columns; row i goes to column selector[i].
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
/// Selective overload: distributes only the rows listed in `selective`; row selective[i] goes to
/// column selector[i], so `selector` must hold selective.size() entries.
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector, const BlockSelective & selective)
const override;

/// Not supported by this column type: the definition throws TiFlashException (Unimplemented).
void scatterTo(ScatterColumns & columns, const Selector & selector) const override;
/// Selective overload; likewise not supported, its definition throws TiFlashException (Unimplemented).
void scatterTo(ScatterColumns & columns, const Selector & selector, const BlockSelective & selective)
const override;

void gather(ColumnGathererStream & gatherer_stream) override;

Expand All @@ -204,6 +210,15 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega
/// Read-only access to the column's `data` container.
const Container & getData() const
{
    return data;
}

void getExtremes(Field & min, Field & max) const override;

/// Shared implementation of the updateWeakHash32 overloads. With selective_block == true only
/// the rows listed in `selective` are hashed; with false, `selective` is not read and all rows are hashed.
template <bool selective_block>
void updateWeakHash32Impl(WeakHash32 & hash, const BlockSelective & selective) const;

/// Shared implementation of the scatter overloads. With selective_block == true only the rows
/// listed in `selective` are scattered; with false, `selective` is not read and all rows are scattered.
template <bool selective_block>
MutableColumns scatterImpl(
IColumn::ColumnIndex num_columns,
const IColumn::Selector & selector,
const BlockSelective & selective) const;
};


Expand Down
65 changes: 51 additions & 14 deletions dbms/src/Columns/ColumnArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,34 +253,71 @@ void ColumnArray::updateWeakHash32(
const TiDB::TiDBCollatorPtr & collator,
String & sort_key_container) const
{
auto s = offsets->size();
if (hash.getData().size() != s)
throw Exception(
"Size of WeakHash32 does not match size of column: column size is " + std::to_string(s) + ", hash size is "
+ std::to_string(hash.getData().size()),
ErrorCodes::LOGICAL_ERROR);
updateWeakHash32Impl<false>(hash, collator, sort_key_container, {});
}

/// Selective entry point: hashes only the array rows listed in `selective` by delegating to the
/// shared implementation with the selective path enabled. `collator` and `sort_key_container`
/// are passed through unchanged.
void ColumnArray::updateWeakHash32(
    WeakHash32 & hash,
    const TiDB::TiDBCollatorPtr & collator,
    String & sort_key_container,
    const BlockSelective & selective) const
{
    constexpr bool use_selective_block = true;
    updateWeakHash32Impl<use_selective_block>(hash, collator, sort_key_container, selective);
}

/// Shared implementation behind both ColumnArray::updateWeakHash32 overloads.
/// - selective_block == true: `rows` is selective.size() and the i-th processed row is selective[i].
/// - selective_block == false: `rows` is offsets->size() and rows are visited in index order.
/// `hash` must hold exactly `rows` entries (enforced by RUNTIME_CHECK_MSG). A hash is first computed
/// for every element of the nested `data` column (all elements, in both modes); then, for each
/// processed row, the output slot is remixed with intHashCRC32 and combined with the element hashes
/// in the range [offsets[row - 1], offsets[row]) (starting at 0 for row 0).
/// NOTE(review): this text is a rendered diff hunk; several lines appear to be the removed
/// pre-change versions shown next to their replacements (marked below).
template <bool selective_block>
void ColumnArray::updateWeakHash32Impl(
WeakHash32 & hash,
const TiDB::TiDBCollatorPtr & collator,
String & sort_key_container,
const BlockSelective & selective) const
{
size_t rows;
if constexpr (selective_block)
{
rows = selective.size();
}
else
{
rows = offsets->size();
}

RUNTIME_CHECK_MSG(
rows == hash.getData().size(),
"size of WeakHash32({}) doesn't match size of column({})",
hash.getData().size(),
rows);

/// Per-element hashes of the nested column, indexed by element position.
WeakHash32 internal_hash(data->size());
data->updateWeakHash32(internal_hash, collator, sort_key_container);

/// NOTE(review): this running prev_offset looks like the removed line; the loop below recomputes it per row.
Offset prev_offset = 0;
const auto & offsets_data = getOffsets();
/// NOTE(review): the two reference declarations look like the removed versions of the two lines after them.
auto & hash_data = hash.getData();
auto & internal_hash_data = internal_hash.getData();
UInt32 * hash_data = hash.getData().data();
const auto & internal_hash_data = internal_hash.getData();

/// NOTE(review): `s` is not declared in this function as shown; this loop header looks like the removed line.
for (size_t i = 0; i < s; ++i)
for (size_t i = 0; i < rows; ++i)
{
/// This row improves hash a little bit according to integration tests.
/// It is the same as to use previous hash value as the first element of array.
/// NOTE(review): the indexed assignment looks like the removed line; the pointer form is its replacement.
hash_data[i] = intHashCRC32(hash_data[i]);
*hash_data = intHashCRC32(*hash_data);

/// Map the output position to the source row (identity unless selective_block).
size_t row = i;
if constexpr (selective_block)
row = selective[i];

/// Start of this row's elements: end offset of the previous row, or 0 for the first row.
Offset prev_offset = 0;
if likely (row > 0)
prev_offset = offsets_data[row - 1];

/// NOTE(review): the loop header using `row` as its index looks like the removed line; `sub_row` replaces it.
for (size_t row = prev_offset; row < offsets_data[i]; ++row)
for (size_t sub_row = prev_offset; sub_row < offsets_data[row]; ++sub_row)
{
/// It is probably not the best way to combine hashes.
/// But much better than xor, which leads to similar hashes for arrays like [1], [1, 1, 1], [1, 1, 1, 1, 1], ...
/// Much better implementation - to add offsets as an optional argument to updateWeakHash32.
hash_data[i] = intHashCRC32(internal_hash_data[row], hash_data[i]);
*hash_data = intHashCRC32(internal_hash_data[sub_row], *hash_data);
}

/// NOTE(review): this trailing update of prev_offset looks like the removed line.
prev_offset = offsets_data[i];
++hash_data;
}
}

Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
void updateHashWithValues(IColumn::HashValues & hash_values, const TiDB::TiDBCollatorPtr &, String &)
const override;
/// Folds, for every array row, the hashes of that row's elements into the matching entry of `hash`.
void updateWeakHash32(WeakHash32 & hash, const TiDB::TiDBCollatorPtr &, String &) const override;
/// Selective overload: only the rows listed in `selective` are hashed, and `hash` must hold
/// selective.size() entries (entry i corresponds to row selective[i]).
void updateWeakHash32(WeakHash32 & hash, const TiDB::TiDBCollatorPtr &, String &, const BlockSelective & selective)
const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void insert(const Field & x) override;
void insertFrom(const IColumn & src_, size_t n) override;
Expand Down Expand Up @@ -144,10 +146,19 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
{
return scatterImpl<ColumnArray>(num_columns, selector);
}
/// Selective scatter: forwards to scatterImpl<ColumnArray>, passing `selective` along with the
/// column count and selector.
MutableColumns scatter(
    ColumnIndex num_columns,
    const Selector & selector,
    const BlockSelective & selective) const override
{
    return scatterImpl<ColumnArray>(
        num_columns,
        selector,
        selective);
}
/// Scatters rows into the existing `columns` by forwarding to scatterToImpl<ColumnArray>.
void scatterTo(
    ScatterColumns & columns,
    const Selector & selector) const override
{
    scatterToImpl<ColumnArray>(
        columns,
        selector);
}
/// Selective scatterTo: forwards to scatterToImpl<ColumnArray>, passing `selective` along with
/// the target columns and selector.
void scatterTo(
    ScatterColumns & columns,
    const Selector & selector,
    const BlockSelective & selective) const override
{
    scatterToImpl<ColumnArray>(
        columns,
        selector,
        selective);
}
void gather(ColumnGathererStream & gatherer_stream) override;

void forEachSubcolumn(ColumnCallback callback) override
Expand Down Expand Up @@ -196,6 +207,13 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
ColumnPtr filterTuple(const Filter & filt, ssize_t result_size_hint) const;
ColumnPtr filterNullable(const Filter & filt, ssize_t result_size_hint) const;
ColumnPtr filterGeneric(const Filter & filt, ssize_t result_size_hint) const;

/// Shared implementation of the updateWeakHash32 overloads. With selective_block == true only
/// the rows listed in `selective` are hashed; with false, `selective` is not read and all rows are hashed.
template <bool selective_block>
void updateWeakHash32Impl(
WeakHash32 & hash,
const TiDB::TiDBCollatorPtr & collator,
String & sort_key_container,
const BlockSelective & selective) const;
};


Expand Down
Loading

0 comments on commit 9de3fdc

Please sign in to comment.