Skip to content

Commit

Permalink
[FLASH-524] Optimize tmt read for single stream (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
solotzg authored Sep 26, 2019
1 parent 0a46dfa commit dfec883
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 2 deletions.
129 changes: 129 additions & 0 deletions dbms/src/DataStreams/TMTSingleSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/TMTSingleSortedBlockInputStream.h>
#include <DataStreams/dedupUtils.h>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

static constexpr size_t SIMD_BYTES = 16;
static constexpr Int32 STEP = SIMD_BYTES;
#if __SSE2__
static const __m128i zero16 = _mm_setzero_si128();
#endif

// pos in columns is made const in MergeTreeDataSelectExecutor::read.
static const size_t pk_column_index = 0;
static const size_t delmark_column_index = 2;

/// tol_size > 0
static bool isPKDiffEachInSortedColumn(const UInt64 * data_start, size_t tol_size)
{
tol_size -= 1;
const UInt64 * data_pos = data_start;
const UInt64 * data_end = data_start + tol_size;
const UInt64 * data_end_sse = data_start + tol_size / STEP * STEP;

#if __SSE2__

alignas(STEP) std::array<UInt8, STEP> step_data{};

for (; data_pos != data_end_sse; data_pos += STEP)
{
for (size_t i = 0; i < STEP; ++i)
step_data[i] = data_pos[i] == data_pos[i + 1];
int mask = _mm_movemask_epi8(_mm_cmpgt_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(step_data.data())), zero16));
if (mask)
return false;
}

#endif

for (; data_pos != data_end; ++data_pos)
{
if (data_pos[0] == data_pos[1])
return false;
}

return true;
}

inline UInt64 getPkInBlockAt(const Block & block, size_t pos)
{
return static_cast<const ColumnUInt64 *>(block.getByPosition(pk_column_index).column.get())->getData()[pos];
}

void TMTSingleSortedBlockInputStream::updateNextBlock()
{
if (!next_block)
finish = true;
cur_block = std::move(next_block);
}

Block TMTSingleSortedBlockInputStream::readImpl()
{
while (true)
{
if (finish)
return Block();

{
Block tmp_block = input->read();
if (first)
{
if (!tmp_block)
return tmp_block;
first = false;
cur_block = std::move(tmp_block);
continue;
}
next_block = std::move(tmp_block);
}

// cur_block is never empty.
const size_t rows = cur_block.rows();

// if not exist, just make it diff from the last element.
UInt64 next_block_first_pk = next_block ? getPkInBlockAt(next_block, 0) : getPkInBlockAt(cur_block, rows - 1) + 1;

const UInt64 * pk_column
= static_cast<const ColumnUInt64 *>(cur_block.getByPosition(pk_column_index).column.get())->getData().data();
const UInt8 * delmark_column
= static_cast<const ColumnUInt8 *>(cur_block.getByPosition(delmark_column_index).column.get())->getData().data();

if (isPKDiffEachInSortedColumn(pk_column, rows) && pk_column[rows - 1] != next_block_first_pk && memoryIsZero(delmark_column, rows))
{
auto tmp_block = std::move(cur_block);
updateNextBlock();
return tmp_block;
}

IColumn::Filter filter(rows, 0);
for (size_t pos = 1; pos < rows; ++pos)
{
if (pk_column[pos] != pk_column[pos - 1])
filter[pos - 1] = delmark_column[pos - 1] ^ (UInt8)1;
}
if (next_block_first_pk != pk_column[rows - 1])
filter[rows - 1] = delmark_column[rows - 1] ^ (UInt8)1;

if (memoryIsZero(filter.data(), rows))
{
updateNextBlock();
continue;
}
else
{
Block res = filterBlock(cur_block, filter);
updateNextBlock();
return res;
}
}
}

} // namespace DB
41 changes: 41 additions & 0 deletions dbms/src/DataStreams/TMTSingleSortedBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <common/logger_useful.h>

namespace DB
{

/// make it only can be used when pk is int64 or uint64
class TMTSingleSortedBlockInputStream : public IProfilingBlockInputStream
{
public:
TMTSingleSortedBlockInputStream(const BlockInputStreamPtr & input_) : input(input_) {}

protected:
Block getHeader() const override { return input->getHeader(); }

bool isGroupedOutput() const override { return input->isGroupedOutput(); }

bool isSortedOutput() const override { return input->isSortedOutput(); }

const SortDescription & getSortDescription() const override { return input->getSortDescription(); }

String getName() const override { return "TMTSingleSortedBlockInputStream"; }

Block readImpl() override;

private:
void updateNextBlock();

private:
BlockInputStreamPtr input;

bool first = true;
bool finish = false;
Block cur_block;
Block next_block;
Logger * log = &Logger::get("TMTSingleSortedBlockInputStream");
};

} // namespace DB
3 changes: 1 addition & 2 deletions dbms/src/DataStreams/VersionFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ Block VersionFilterBlockInputStream::readImpl()
if (filter_start == data_start && memoryIsZero(col_filter.data(), rows))
continue;

deleteRows(block, col_filter);
return block;
return filterBlock(block, col_filter);
}
}

Expand Down
12 changes: 12 additions & 0 deletions dbms/src/DataStreams/dedupUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ inline void deleteRows(Block & block, const IColumn::Filter & filter)
}
}

inline Block filterBlock(const Block & block, const IColumn::Filter & filter)
{
Block res;
for (size_t i = 0, num_columns = block.columns(); i < num_columns; ++i)
{
ColumnWithTypeAndName column = block.getByPosition(i);
column.column = column.column->filter(filter, 0);
res.insert(std::move(column));
}
return res;
}


inline size_t setFilterByDelMarkColumn(const Block & block, IColumn::Filter & filter)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct numeric_limits<__uint128_t>
#include <DataStreams/ReplacingDeletingSortedBlockInputStream.h>
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <DataStreams/SummingSortedBlockInputStream.h>
#include <DataStreams/TMTSingleSortedBlockInputStream.h>
#include <DataStreams/TMTSortedBlockInputStream.h>
#include <DataStreams/TMTUnionBlockInputStream.h>
#include <DataStreams/VersionFilterBlockInputStream.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ template <TMTPKType pk_type>
BlockInputStreamPtr makeMultiWayMergeSortInput(const BlockInputStreams & inputs, const SortDescription & description,
const size_t version_column_index, const size_t delmark_column_index, size_t max_block_size)
{
if (pk_type != TMTPKType::UNSPECIFIED && inputs.size() == 1)
return std::make_shared<TMTSingleSortedBlockInputStream>(inputs[0]);
return std::make_shared<TMTSortedBlockInputStream<pk_type>>(
inputs, description, version_column_index, delmark_column_index, max_block_size);
};
Expand Down

0 comments on commit dfec883

Please sign in to comment.