Skip to content

Commit

Permalink
Storage: Support multiple vec indexes on the same column (pingcap#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored and Lloyd-Pottiger committed Sep 25, 2024
1 parent 1a781c7 commit 25a3c6b
Show file tree
Hide file tree
Showing 37 changed files with 1,129 additions and 267 deletions.
12 changes: 4 additions & 8 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2020,24 +2020,20 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info)
original_table_columns.swap(new_original_table_columns);
store_columns.swap(new_store_columns);

// copy the local_index_infos to check whether any new index is created
LocalIndexInfosPtr local_index_infos_copy = nullptr;
{
std::shared_lock index_read_lock(mtx_local_index_infos);
local_index_infos_copy = std::shared_ptr<LocalIndexInfos>(local_index_infos);
}
// Get a snapshot on the local_index_infos to check whether any new index is created
LocalIndexInfosSnapshot local_index_infos_snap = getLocalIndexInfosSnapshot();

std::atomic_store(&original_table_header, std::make_shared<Block>(toEmptyBlock(original_table_columns)));

// release the lock because `checkAllSegmentsLocalIndex` will try to acquire the lock
// and generate tasks on segments
lock.unlock();

auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_copy, table_info, log);
auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_snap, table_info, log);
if (new_local_index_infos)
{
{
// new index created, update the info in-memory
// new index created, update the info in-memory thread safety between `getLocalIndexInfosSnapshot`
std::unique_lock index_write_lock(mtx_local_index_infos);
local_index_infos.swap(new_local_index_infos);
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ struct StoreStats

struct LocalIndexStats
{
String column_name{};
UInt64 column_id{};
UInt64 index_id{};
String index_kind{};
Expand Down Expand Up @@ -864,13 +863,16 @@ class DeltaMergeStore
const SegmentPtr & segment,
const DMFiles & new_dm_files);

// Get a snap of local_index_infos to check whether any new index is created.
LocalIndexInfosPtr getLocalIndexInfosSnapshot() const
// Get a snap of local_index_infos for checking.
// Note that this is just a shallow copy of `local_index_infos`, do not
// modify the local indexes inside the snapshot.
LocalIndexInfosSnapshot getLocalIndexInfosSnapshot() const
{
std::shared_lock index_read_lock(mtx_local_index_infos);
if (!local_index_infos || local_index_infos->empty())
return nullptr;
return std::make_shared<LocalIndexInfos>(*local_index_infos);
// only make a shallow copy on the shared_ptr is OK
return local_index_infos;
}

/**
Expand Down
13 changes: 5 additions & 8 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,6 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment)
{
RUNTIME_CHECK(segment != nullptr);

// TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
if (!local_index_infos_snap)
return false;
Expand Down Expand Up @@ -569,7 +568,6 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co
{
RUNTIME_CHECK(segment != nullptr);

// TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
if (!local_index_infos_snap)
return true;
Expand Down Expand Up @@ -597,12 +595,11 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co
bool all_indexes_built = true;
for (const auto & index : *build_info.indexes_to_build)
{
auto col_id = index.column_id;
// The dmfile may be built before col_id is added. Skip build indexes for it
if (!dmfile->isColumnExist(col_id))
continue;

all_indexes_built = all_indexes_built && dmfile->getColumnStat(col_id).index_bytes > 0;
const auto [state, bytes] = dmfile->getLocalIndexState(index.column_id, index.index_id);
UNUSED(bytes);
all_indexes_built = all_indexes_built
// dmfile built before the column_id added or index already built
&& (state == DMFileMeta::LocalIndexState::NoNeed || state == DMFileMeta::LocalIndexState::IndexBuilt);
}
if (all_indexes_built)
return true;
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
LocalIndexStats index_stats;
index_stats.column_id = index_info.column_id;
index_stats.index_id = index_info.index_id;
index_stats.column_name = index_info.column_name;
index_stats.index_kind = "HNSW"; // TODO: Support more.

for (const auto & [handle, segment] : segments)
Expand All @@ -221,13 +220,14 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
bool is_stable_indexed = true;
for (const auto & dmfile : stable->getDMFiles())
{
if (!dmfile->isColumnExist(index_info.column_id))
continue; // Regard as indexed, because column does not need any index

auto column_stat = dmfile->getColumnStat(index_info.column_id);

if (column_stat.index_bytes == 0 && column_stat.data_bytes > 0)
const auto [state, bytes] = dmfile->getLocalIndexState(index_info.column_id, index_info.index_id);
UNUSED(bytes);
switch (state)
{
case DMFileMeta::LocalIndexState::NoNeed: // Regard as indexed, because column does not need any index
case DMFileMeta::LocalIndexState::IndexBuilt:
break;
case DMFileMeta::LocalIndexState::IndexPending:
is_stable_indexed = false;
break;
}
Expand Down
66 changes: 49 additions & 17 deletions dbms/src/Storages/DeltaMerge/File/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@
#include <IO/WriteHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>
#include <Storages/KVStore/Types.h>

namespace DB
{
namespace DM
namespace DB::DM
{
struct ColumnStat
{
ColId col_id;
DataTypePtr type;
// The average size of values. A hint for speeding up deserialize.
double avg_size;
// The serialized size of the column data on disk.
// The serialized size of the column data on disk. (including column data and nullmap)
size_t serialized_bytes = 0;

// These members are only useful when using metav2
Expand All @@ -41,7 +40,7 @@ struct ColumnStat
size_t array_sizes_bytes = 0;
size_t array_sizes_mark_bytes = 0;

std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;
std::vector<dtpb::VectorIndexFileProps> vector_index;

#ifndef NDEBUG
// This field is only used for testing
Expand All @@ -63,8 +62,11 @@ struct ColumnStat
stat.set_array_sizes_bytes(array_sizes_bytes);
stat.set_array_sizes_mark_bytes(array_sizes_mark_bytes);

if (vector_index.has_value())
stat.mutable_vector_index()->CopyFrom(vector_index.value());
for (const auto & vec_idx : vector_index)
{
auto * pb_idx = stat.add_vector_indexes();
pb_idx->CopyFrom(vec_idx);
}

#ifndef NDEBUG
stat.set_additional_data_for_test(additional_data_for_test);
Expand All @@ -88,14 +90,29 @@ struct ColumnStat
array_sizes_mark_bytes = proto.array_sizes_mark_bytes();

if (proto.has_vector_index())
vector_index = proto.vector_index();
{
// For backward compatibility, loaded `vector_index` into `vector_indexes`
// with index_id == EmptyIndexID
vector_index.emplace_back(proto.vector_index());
auto & idx = vector_index.back();
idx.set_index_id(EmptyIndexID);
idx.set_index_bytes(index_bytes);
}
vector_index.reserve(vector_index.size() + proto.vector_indexes_size());
for (const auto & pb_idx : proto.vector_indexes())
{
vector_index.emplace_back(pb_idx);
}

#ifndef NDEBUG
additional_data_for_test = proto.additional_data_for_test();
#endif
}

// @deprecated. New fields should be added via protobuf. Use `toProto` instead
void serializeToBuffer(WriteBuffer & buf) const
// New fields should be added via protobuf. Use `toProto` instead
[[deprecated("Use ColumnStat::toProto instead")]] //
void
serializeToBuffer(WriteBuffer & buf) const
{
writeIntBinary(col_id, buf);
writeStringBinary(type->getName(), buf);
Expand All @@ -108,8 +125,10 @@ struct ColumnStat
writeIntBinary(index_bytes, buf);
}

// @deprecated. This only presents for reading with old data. Use `mergeFromProto` instead
void parseFromBuffer(ReadBuffer & buf)
// This only presents for reading with old data. Use `mergeFromProto` instead
[[deprecated("Use ColumnStat::mergeFromProto instead")]] //
void
parseFromBuffer(ReadBuffer & buf)
{
readIntBinary(col_id, buf);
String type_name;
Expand All @@ -127,7 +146,9 @@ struct ColumnStat

using ColumnStats = std::unordered_map<ColId, ColumnStat>;

inline void readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadBuffer & buf)
[[deprecated("Used by DMFileMeta v1. Use ColumnStat::mergeFromProto instead")]] //
inline void
readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadBuffer & buf)
{
DataTypeFactory & data_type_factory = DataTypeFactory::instance();

Expand Down Expand Up @@ -155,11 +176,23 @@ inline void readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadB
DB::assertChar('\n', buf);

auto type = data_type_factory.getOrSet(type_name);
column_sats.emplace(id, ColumnStat{id, type, avg_size, serialized_bytes});
column_sats.emplace(
id,
ColumnStat{
.col_id = id,
.type = type,
.avg_size = avg_size,
.serialized_bytes = serialized_bytes,
// ... here ignore some fields with default initializers
.vector_index = {},
.additional_data_for_test = {},
});
}
}

inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver, WriteBuffer & buf)
[[deprecated("Used by DMFileMeta v1. Use ColumnStat::toProto instead")]] //
inline void
writeText(const ColumnStats & column_sats, DMFileFormat::Version ver, WriteBuffer & buf)
{
DB::writeString("Columns: ", buf);
DB::writeText(column_sats.size(), buf);
Expand All @@ -182,5 +215,4 @@ inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver
}
}

} // namespace DM
} // namespace DB
} // namespace DB::DM
30 changes: 29 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

#pragma once

#include <DataTypes/IDataType.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFileMetaV2.h>
#include <Storages/DeltaMerge/File/DMFileUtil.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/FormatVersion.h>
Expand Down Expand Up @@ -144,6 +147,29 @@ class DMFile : private boost::noncopyable
}
bool isColumnExist(ColId col_id) const { return meta->column_stats.contains(col_id); }

std::tuple<DMFileMeta::LocalIndexState, size_t> getLocalIndexState(ColId col_id, IndexID index_id) const
{
return meta->getLocalIndexState(col_id, index_id);
}

// Check whether the local index of given col_id and index_id has been built on this dmfile.
// Return false if
// - the col_id is not exist in the dmfile
// - the index has not been built
bool isLocalIndexExist(ColId col_id, IndexID index_id) const
{
return std::get<0>(meta->getLocalIndexState(col_id, index_id)) == DMFileMeta::LocalIndexState::IndexBuilt;
}

// Try to get the local index of given col_id and index_id.
// Return std::nullopt if
// - the col_id is not exist in the dmfile
// - the index has not been built
std::optional<dtpb::VectorIndexFileProps> getLocalIndex(ColId col_id, IndexID index_id) const
{
return meta->getLocalIndex(col_id, index_id);
}

/*
* TODO: This function is currently unused. We could use it when:
* 1. The content is polished (e.g. including at least file ID, and use a format easy for grep).
Expand Down Expand Up @@ -183,7 +209,6 @@ class DMFile : private boost::noncopyable
void switchToRemote(const S3::DMFileOID & oid) const;

UInt32 metaVersion() const { return meta->metaVersion(); }
UInt32 bumpMetaVersion() const { return meta->bumpMetaVersion(); }

private:
DMFile(
Expand Down Expand Up @@ -279,6 +304,9 @@ class DMFile : private boost::noncopyable
return IDataType::getFileNameForStream(DB::toString(col_id), substream);
}

static String vectorIndexFileName(IndexID index_id) { return fmt::format("idx_{}.vector", index_id); }
String vectorIndexPath(IndexID index_id) const { return subFilePath(vectorIndexFileName(index_id)); }

void addPack(const DMFileMeta::PackStat & pack_stat) const { meta->pack_stats.push_back(pack_stat); }

DMFileStatus getStatus() const { return meta->status; }
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
bool is_matching_ann_query = false;
for (const auto & cd : read_columns)
{
// Note that it requires ann_query_info->column_id match
if (cd.id == ann_query_info->column_id())
{
is_matching_ann_query = true;
Expand Down Expand Up @@ -191,8 +192,10 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn

RUNTIME_CHECK(rest_columns.size() + 1 == read_columns.size(), rest_columns.size(), read_columns.size());

const auto & vec_column_stat = dmfile->getColumnStat(vec_column->id);
if (vec_column_stat.index_bytes == 0 || !vec_column_stat.vector_index.has_value())
const IndexID ann_query_info_index_id = ann_query_info->index_id() > 0 //
? ann_query_info->index_id()
: EmptyIndexID;
if (!dmfile->isLocalIndexExist(vec_column->id, ann_query_info_index_id))
// Vector index is defined but does not exist on the data file,
// or there is no data at all
return fallback();
Expand Down
Loading

0 comments on commit 25a3c6b

Please sign in to comment.