Skip to content

Commit

Permalink
Merge branch 'virtual-column-prewhere' of https://github.com/amosbird…
Browse files Browse the repository at this point in the history
…/ClickHouse into refactor-virtual-columns
  • Loading branch information
CurtizJ committed Feb 15, 2024
2 parents 39dbb33 + c0eeeb2 commit ef6b827
Show file tree
Hide file tree
Showing 43 changed files with 352 additions and 294 deletions.
5 changes: 5 additions & 0 deletions src/Processors/QueryPlan/ReadFromMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);

Expand Down Expand Up @@ -462,6 +463,7 @@ Pipe ReadFromMergeTree::readFromPool(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);
}
Expand All @@ -475,6 +477,7 @@ Pipe ReadFromMergeTree::readFromPool(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);
}
Expand Down Expand Up @@ -551,6 +554,7 @@ Pipe ReadFromMergeTree::readInOrder(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);
}
Expand All @@ -566,6 +570,7 @@ Pipe ReadFromMergeTree::readInOrder(
reader_settings,
required_columns,
virt_column_names,
data.getPartitionValueType(),
pool_settings,
context);
}
Expand Down
7 changes: 6 additions & 1 deletion src/Storages/ColumnsDescription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ NamesAndTypesList ColumnsDescription::get(const GetColumnsOptions & options) con
NamesAndTypesList res;
switch (options.kind)
{
case GetColumnsOptions::None:
{
break;
}
case GetColumnsOptions::All:
{
res = getAll();
Expand Down Expand Up @@ -572,7 +576,8 @@ static GetColumnsOptions::Kind defaultKindToGetKind(ColumnDefaultKind kind)
case ColumnDefaultKind::Ephemeral:
return GetColumnsOptions::Ephemeral;
}
UNREACHABLE();

return GetColumnsOptions::None;
}

NamesAndTypesList ColumnsDescription::getByNames(const GetColumnsOptions & options, const Names & names) const
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ColumnsDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct GetColumnsOptions
{
enum Kind : UInt8
{
None = 0,
Ordinary = 1,
Materialized = 2,
Aliases = 4,
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class MarkCache;
class UncompressedCache;
class MergeTreeTransaction;

struct MergeTreeReadTaskInfo;
using MergeTreeReadTaskInfoPtr = std::shared_ptr<const MergeTreeReadTaskInfo>;

enum class DataPartRemovalState
{
Expand Down Expand Up @@ -93,6 +95,7 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
const NamesAndTypesList & columns_,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,
Expand Down
78 changes: 78 additions & 0 deletions src/Storages/MergeTree/IMergeTreeReader.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeReadTask.h>
#include <Storages/BlockNumberColumn.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNested.h>
#include <DataTypes/DataTypeUUID.h>
#include <Common/escapeForFileName.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Columns/ColumnArray.h>
Expand All @@ -25,6 +28,7 @@ namespace ErrorCodes
IMergeTreeReader::IMergeTreeReader(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
Expand All @@ -47,14 +51,21 @@ IMergeTreeReader::IMergeTreeReader(
, part_columns(data_part_info_for_read->isWidePart()
? data_part_info_for_read->getColumnsDescriptionWithCollectedNested()
: data_part_info_for_read->getColumnsDescription())
, read_task_info(read_task_info_)
{
columns_to_read.reserve(requested_columns.size());
serializations.reserve(requested_columns.size());

size_t pos = 0;
for (const auto & column : requested_columns)
{
columns_to_read.emplace_back(getColumnInPart(column));
serializations.emplace_back(getSerializationInPart(column));

if (read_task_info && read_task_info->virt_column_names.contains(column.name))
virt_column_pos_to_name.emplace(pos, column.name);

++pos;
}
}

Expand All @@ -63,6 +74,73 @@ const IMergeTreeReader::ValueSizeMap & IMergeTreeReader::getAvgValueSizeHints()
return avg_value_size_hints;
}

void IMergeTreeReader::fillVirtualColumns(Columns & columns, size_t rows) const
{
if (std::all_of(
virt_column_pos_to_name.begin(),
virt_column_pos_to_name.end(),
[&columns](auto & elem)
{
chassert(elem.first < columns.size());
return columns[elem.first] != nullptr;
}))
return;

chassert(read_task_info != nullptr);

const IMergeTreeDataPart * part = read_task_info->data_part.get();
if (part->isProjectionPart())
part = part->getParentPart();

for (auto [pos, name] : virt_column_pos_to_name)
{
auto & column = columns[pos];

if (column != nullptr)
continue;

if (name == "_part_offset")
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column {} must have been filled by part reader", name);
}
else if (name == LightweightDeleteDescription::FILTER_COLUMN.name)
{
/// If _row_exists column isn't present in the part then fill it here with 1s
column = LightweightDeleteDescription::FILTER_COLUMN.type->createColumnConst(rows, 1)->convertToFullColumnIfConst();
}
else if (name == BlockNumberColumn::name)
{
column = BlockNumberColumn::type->createColumnConst(rows, part->info.min_block)->convertToFullColumnIfConst();
}
else if (name == "_part")
{
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(rows, part->name)
->convertToFullColumnIfConst();
}
else if (name == "_part_index")
{
column = DataTypeUInt64().createColumnConst(rows, read_task_info->part_index_in_query)->convertToFullColumnIfConst();
}
else if (name == "_part_uuid")
{
column = DataTypeUUID().createColumnConst(rows, part->uuid)->convertToFullColumnIfConst();
}
else if (name == "_partition_id")
{
column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
.createColumnConst(rows, part->info.partition_id)
->convertToFullColumnIfConst();
}
else if (name == "_partition_value")
{
column = read_task_info->partition_value_type
->createColumnConst(rows, Tuple(part->partition.value.begin(), part->partition.value.end()))
->convertToFullColumnIfConst();
}
}
}

void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows, size_t block_number) const
{
try
Expand Down
10 changes: 10 additions & 0 deletions src/Storages/MergeTree/IMergeTreeReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class IMergeTreeReader : private boost::noncopyable
IMergeTreeReader(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const MergeTreeReadTaskInfoPtr & read_task_info_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
Expand All @@ -42,6 +43,9 @@ class IMergeTreeReader : private boost::noncopyable

const ValueSizeMap & getAvgValueSizeHints() const;

/// Add virtual columns that are not present in the block.
void fillVirtualColumns(Columns & columns, size_t rows) const;

/// Add columns from ordered_names that are not present in the block.
/// Missing columns are added in the order specified by ordered_names.
/// num_rows is needed in case if all res_columns are nullptr.
Expand Down Expand Up @@ -113,6 +117,12 @@ class IMergeTreeReader : private boost::noncopyable

/// Actual columns description in part.
const ColumnsDescription & part_columns;

/// Shared information required for reading.
MergeTreeReadTaskInfoPtr read_task_info;

/// Map of positions in requested_columns which are virtual columns to their names.
std::map<size_t, String> virt_column_pos_to_name;
};

}
31 changes: 21 additions & 10 deletions src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,18 @@ NameSet injectRequiredColumns(
if (with_subcolumns)
options.withSubcolumns();

auto virtuals_options = GetColumnsOptions(GetColumnsOptions::None).withVirtuals();

for (size_t i = 0; i < columns.size(); ++i)
{
/// We are going to fetch only physical columns and system columns
/// We are going to fetch physical columns and system columns first
if (!storage_snapshot->tryGetColumn(options, columns[i]))
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "There is no physical column or subcolumn {} in table", columns[i]);
{
if (storage_snapshot->tryGetColumn(virtuals_options, columns[i]))
continue;
else
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "There is no column or subcolumn {} in table", columns[i]);
}

have_at_least_one_physical_column |= injectRequiredColumnsRecursively(
columns[i], storage_snapshot, alter_conversions,
Expand Down Expand Up @@ -258,35 +265,32 @@ void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Colum
}


MergeTreeReadTask::Columns getReadTaskColumns(
MergeTreeReadTaskColumns getReadTaskColumns(
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const Names & required_columns,
const Names & system_columns,
const PrewhereInfoPtr & prewhere_info,
const ExpressionActionsSettings & actions_settings,
const MergeTreeReaderSettings & reader_settings,
bool with_subcolumns)
{
Names column_to_read_after_prewhere = required_columns;

/// Read system columns such as lightweight delete mask "_row_exists" if it is persisted in the part
for (const auto & name : system_columns)
if (data_part_info_for_reader.getColumns().contains(name))
column_to_read_after_prewhere.push_back(name);

/// Inject columns required for defaults evaluation
injectRequiredColumns(
data_part_info_for_reader, storage_snapshot, with_subcolumns, column_to_read_after_prewhere);

MergeTreeReadTask::Columns result;
MergeTreeReadTaskColumns result;
auto options = GetColumnsOptions(GetColumnsOptions::All)
.withExtendedObjects()
.withSystemColumns();

if (with_subcolumns)
options.withSubcolumns();

options.withVirtuals();

bool has_part_offset = std::find(required_columns.begin(), required_columns.end(), "_part_offset") != required_columns.end();
NameSet columns_from_previous_steps;
auto add_step = [&](const PrewhereExprStep & step)
{
Expand All @@ -302,6 +306,13 @@ MergeTreeReadTask::Columns getReadTaskColumns(
if (!columns_from_previous_steps.contains(name))
step_column_names.push_back(name);

/// Make sure _part_offset is read in STEP 0
if (columns_from_previous_steps.empty() && has_part_offset)
{
if (std::find(step_column_names.begin(), step_column_names.end(), "_part_offset") == step_column_names.end())
step_column_names.push_back("_part_offset");
}

if (!step_column_names.empty())
injectRequiredColumns(
data_part_info_for_reader, storage_snapshot,
Expand Down
4 changes: 1 addition & 3 deletions src/Storages/MergeTree/MergeTreeBlockReadUtils.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include <optional>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/MergeTreeReadTask.h>

Expand All @@ -22,11 +21,10 @@ NameSet injectRequiredColumns(
bool with_subcolumns,
Names & columns);

MergeTreeReadTask::Columns getReadTaskColumns(
MergeTreeReadTaskColumns getReadTaskColumns(
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const Names & required_columns,
const Names & system_columns,
const PrewhereInfoPtr & prewhere_info,
const ExpressionActionsSettings & actions_settings,
const MergeTreeReaderSettings & reader_settings,
Expand Down
18 changes: 14 additions & 4 deletions src/Storages/MergeTree/MergeTreeDataPartCompact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const NamesAndTypesList & columns_to_read,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,
Expand All @@ -41,12 +42,21 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
{
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), alter_conversions);
auto * load_marks_threadpool = reader_settings.read_settings.load_marks_asynchronously ? &read_info->getContext()->getLoadMarksThreadpool() : nullptr;
auto * load_marks_threadpool
= reader_settings.read_settings.load_marks_asynchronously ? &read_info->getContext()->getLoadMarksThreadpool() : nullptr;

return std::make_unique<MergeTreeReaderCompact>(
read_info, columns_to_read, storage_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings, load_marks_threadpool,
avg_value_size_hints, profile_callback);
read_info,
columns_to_read,
read_task_info_,
storage_snapshot,
uncompressed_cache,
mark_cache,
mark_ranges,
reader_settings,
load_marks_threadpool,
avg_value_size_hints,
profile_callback);
}

IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeDataPartCompact.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class MergeTreeDataPartCompact : public IMergeTreeDataPart
const NamesAndTypesList & columns,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,
Expand Down
9 changes: 8 additions & 1 deletion src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
const NamesAndTypesList & columns_to_read,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const MergeTreeReadTaskInfoPtr & read_task_info_,
UncompressedCache * /* uncompressed_cache */,
MarkCache * /* mark_cache */,
const AlterConversionsPtr & alter_conversions,
Expand All @@ -44,7 +45,13 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
auto ptr = std::static_pointer_cast<const MergeTreeDataPartInMemory>(shared_from_this());

return std::make_unique<MergeTreeReaderInMemory>(
read_info, ptr, columns_to_read, storage_snapshot, mark_ranges, reader_settings);
read_info,
ptr,
columns_to_read,
read_task_info_,
storage_snapshot,
mark_ranges,
reader_settings);
}

IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(
Expand Down
Loading

0 comments on commit ef6b827

Please sign in to comment.