Skip to content

Commit

Permalink
Merge pull request ClickHouse#49660 from CurtizJ/fix-sparse-columns-r…
Browse files Browse the repository at this point in the history
…eload

Fix reading from sparse columns after restart
  • Loading branch information
alexey-milovidov authored and Enmk committed Oct 4, 2023
1 parent fcba523 commit 0eff70f
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 12 deletions.
23 changes: 19 additions & 4 deletions src/DataTypes/Serializations/SerializationInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ void SerializationInfoByName::writeJSON(WriteBuffer & out) const
return writeString(oss.str(), out);
}

void SerializationInfoByName::readJSON(ReadBuffer & in)
SerializationInfoByName SerializationInfoByName::readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in)
{
String json_str;
readString(json_str, in);
Expand All @@ -262,22 +263,36 @@ void SerializationInfoByName::readJSON(ReadBuffer & in)
"Unknown version of serialization infos ({}). Should be less or equal than {}",
object->getValue<size_t>(KEY_VERSION), SERIALIZATION_INFO_VERSION);

SerializationInfoByName infos;
if (object->has(KEY_COLUMNS))
{
std::unordered_map<std::string_view, const IDataType *> column_type_by_name;
for (const auto & [name, type] : columns)
column_type_by_name.emplace(name, type.get());

auto array = object->getArray(KEY_COLUMNS);
for (const auto & elem : *array)
{
auto elem_object = elem.extract<Poco::JSON::Object::Ptr>();

if (!elem_object->has(KEY_NAME))
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Missed field '{}' in SerializationInfo of columns", KEY_NAME);
"Missed field '{}' in serialization infos", KEY_NAME);

auto name = elem_object->getValue<String>(KEY_NAME);
if (auto it = find(name); it != end())
it->second->fromJSON(*elem_object);
auto it = column_type_by_name.find(name);

if (it == column_type_by_name.end())
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Found unexpected column '{}' in serialization infos", name);

auto info = it->second->createSerializationInfo(settings);
info->fromJSON(*elem_object);
infos.emplace(name, std::move(info));
}
}

return infos;
}

}
8 changes: 6 additions & 2 deletions src/DataTypes/Serializations/SerializationInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ using MutableSerializationInfos = std::vector<MutableSerializationInfoPtr>;
class SerializationInfoByName : public std::map<String, MutableSerializationInfoPtr>
{
public:
using Settings = SerializationInfo::Settings;

SerializationInfoByName() = default;
SerializationInfoByName(const NamesAndTypesList & columns, const SerializationInfo::Settings & settings);
SerializationInfoByName(const NamesAndTypesList & columns, const Settings & settings);

void add(const Block & block);
void add(const SerializationInfoByName & other);
Expand All @@ -108,7 +110,9 @@ class SerializationInfoByName : public std::map<String, MutableSerializationInfo
void replaceData(const SerializationInfoByName & other);

void writeJSON(WriteBuffer & out) const;
void readJSON(ReadBuffer & in);

static SerializationInfoByName readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in);
};

}
7 changes: 3 additions & 4 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1333,12 +1333,11 @@ void IMergeTreeDataPart::loadColumns(bool require)
.choose_kind = false,
};

SerializationInfoByName infos(loaded_columns, settings);
exists = metadata_manager->exists(SERIALIZATION_FILE_NAME);
if (exists)
SerializationInfoByName infos;
if (metadata_manager->exists(SERIALIZATION_FILE_NAME))
{
auto in = metadata_manager->read(SERIALIZATION_FILE_NAME);
infos.readJSON(*in);
infos = SerializationInfoByName::readJSON(loaded_columns, settings, *in);
}

setColumns(loaded_columns, infos);
Expand Down
8 changes: 8 additions & 0 deletions src/Storages/MergeTree/MergeTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (!ctx->need_remove_expired_values)
{
size_t expired_columns = 0;
auto part_serialization_infos = global_ctx->new_data_part->getSerializationInfos();

for (auto & [column_name, ttl] : global_ctx->new_data_part->ttl_infos.columns_ttl)
{
Expand All @@ -335,6 +336,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
LOG_TRACE(ctx->log, "Adding expired column {} for part {}", column_name, global_ctx->new_data_part->name);
std::erase(global_ctx->gathering_column_names, column_name);
std::erase(global_ctx->merging_column_names, column_name);
std::erase(global_ctx->all_column_names, column_name);
part_serialization_infos.erase(column_name);
++expired_columns;
}
}
Expand All @@ -343,6 +346,11 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
{
global_ctx->gathering_columns = global_ctx->gathering_columns.filter(global_ctx->gathering_column_names);
global_ctx->merging_columns = global_ctx->merging_columns.filter(global_ctx->merging_column_names);
global_ctx->storage_columns = global_ctx->storage_columns.filter(global_ctx->all_column_names);

global_ctx->new_data_part->setColumns(
global_ctx->storage_columns,
part_serialization_infos);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/Storages/MergeTree/checkDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ IMergeTreeDataPart::Checksums checkDataPart(
};

auto ratio_of_defaults = data_part->storage.getSettings()->ratio_of_defaults_for_sparse_serialization;
SerializationInfoByName serialization_infos(columns_txt, SerializationInfo::Settings{ratio_of_defaults, false});
SerializationInfoByName serialization_infos;

if (data_part_storage.exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME))
{
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, {}, std::nullopt, std::nullopt);
serialization_infos.readJSON(*serialization_file);
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
}

auto get_serialization = [&serialization_infos](const auto & column)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
100000
100000
18 changes: 18 additions & 0 deletions tests/queries/0_stateless/02733_sparse_columns_reload.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
DROP TABLE IF EXISTS t_sparse_reload;

CREATE TABLE t_sparse_reload (id UInt64, v UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.95;

INSERT INTO t_sparse_reload SELECT number, 0 FROM numbers(100000);

SELECT count() FROM t_sparse_reload WHERE NOT ignore(*);

ALTER TABLE t_sparse_reload MODIFY SETTING ratio_of_defaults_for_sparse_serialization = 1.0;

DETACH TABLE t_sparse_reload;
ATTACH TABLE t_sparse_reload;

SELECT count() FROM t_sparse_reload WHERE NOT ignore(*);

DROP TABLE t_sparse_reload;

0 comments on commit 0eff70f

Please sign in to comment.