From 27427203c31a19bf2075058be15983a95def5643 Mon Sep 17 00:00:00 2001 From: JaySon Date: Fri, 6 Sep 2024 10:52:10 +0800 Subject: [PATCH] Storage: Support multiple vec indexes on the same column (#279) --- contrib/tipb | 2 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 12 +- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 10 +- .../DeltaMergeStore_InternalSegment.cpp | 13 +- .../DeltaMerge/DeltaMergeStore_Statistics.cpp | 14 +- .../src/Storages/DeltaMerge/File/ColumnStat.h | 66 ++- dbms/src/Storages/DeltaMerge/File/DMFile.h | 30 +- .../File/DMFileBlockInputStream.cpp | 7 +- .../DeltaMerge/File/DMFileIndexWriter.cpp | 138 +++-- .../DeltaMerge/File/DMFileIndexWriter.h | 4 +- .../src/Storages/DeltaMerge/File/DMFileMeta.h | 32 +- .../Storages/DeltaMerge/File/DMFileMetaV2.cpp | 55 ++ .../Storages/DeltaMerge/File/DMFileMetaV2.h | 14 +- .../DMFileWithVectorIndexBlockInputStream.cpp | 30 +- .../Storages/DeltaMerge/File/DMFileWriter.cpp | 11 +- .../Storages/DeltaMerge/Index/IndexInfo.cpp | 7 +- .../src/Storages/DeltaMerge/Index/IndexInfo.h | 11 +- .../Storages/DeltaMerge/Index/VectorIndex.cpp | 12 +- .../Storages/DeltaMerge/Index/VectorIndex.h | 9 +- .../Index/VectorIndexHNSW/Index.cpp | 25 +- .../DeltaMerge/Index/VectorIndexHNSW/Index.h | 2 +- .../DeltaMerge/LocalIndexerScheduler.h | 2 +- .../src/Storages/DeltaMerge/dtpb/dmfile.proto | 4 +- .../tests/gtest_dm_meta_version.cpp | 18 +- .../tests/gtest_dm_vector_index.cpp | 519 +++++++++++++++++- .../tests/gtest_dm_vector_index_utils.h | 2 +- .../tests/gtest_local_index_info.cpp | 38 +- .../gtest_segment_replace_stable_data.cpp | 4 +- .../tests/gtest_segment_test_basic.cpp | 2 +- .../tests/gtest_segment_test_basic.h | 2 +- dbms/src/Storages/S3/FileCache.cpp | 4 + dbms/src/Storages/S3/FileCache.h | 6 + .../src/Storages/S3/tests/gtest_filecache.cpp | 118 ++-- .../System/StorageSystemDTLocalIndexes.cpp | 2 +- .../TiDB/Schema/tests/gtest_schema_sync.cpp | 5 + tests/fullstack-test2/vector/distance.test | 61 ++ 
.../fullstack-test2/vector/vector-index.test | 105 ++++ 37 files changed, 1129 insertions(+), 267 deletions(-) create mode 100644 tests/fullstack-test2/vector/distance.test create mode 100644 tests/fullstack-test2/vector/vector-index.test diff --git a/contrib/tipb b/contrib/tipb index e46e4632bd2..e9fcadb2a31 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit e46e4632bd2b8c28a1a5f0986513bec8e25984e9 +Subproject commit e9fcadb2a31289d82c2ce3c07f8c60ca43d7f93a diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index b67c78ce102..df122ee2489 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -2020,12 +2020,8 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info) original_table_columns.swap(new_original_table_columns); store_columns.swap(new_store_columns); - // copy the local_index_infos to check whether any new index is created - LocalIndexInfosPtr local_index_infos_copy = nullptr; - { - std::shared_lock index_read_lock(mtx_local_index_infos); - local_index_infos_copy = std::shared_ptr(local_index_infos); - } + // Get a snapshot on the local_index_infos to check whether any new index is created + LocalIndexInfosSnapshot local_index_infos_snap = getLocalIndexInfosSnapshot(); std::atomic_store(&original_table_header, std::make_shared(toEmptyBlock(original_table_columns))); @@ -2033,11 +2029,11 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info) // and generate tasks on segments lock.unlock(); - auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_copy, table_info, log); + auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_snap, table_info, log); if (new_local_index_infos) { { - // new index created, update the info in-memory + // new index created, update the info in-memory thread safety between `getLocalIndexInfosSnapshot` 
std::unique_lock index_write_lock(mtx_local_index_infos); local_index_infos.swap(new_local_index_infos); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 194315eb905..b71d913cb8a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -176,7 +176,6 @@ struct StoreStats struct LocalIndexStats { - String column_name{}; UInt64 column_id{}; UInt64 index_id{}; String index_kind{}; @@ -864,13 +863,16 @@ class DeltaMergeStore const SegmentPtr & segment, const DMFiles & new_dm_files); - // Get a snap of local_index_infos to check whether any new index is created. - LocalIndexInfosPtr getLocalIndexInfosSnapshot() const + // Get a snap of local_index_infos for checking. + // Note that this is just a shallow copy of `local_index_infos`, do not + // modify the local indexes inside the snapshot. + LocalIndexInfosSnapshot getLocalIndexInfosSnapshot() const { std::shared_lock index_read_lock(mtx_local_index_infos); if (!local_index_infos || local_index_infos->empty()) return nullptr; - return std::make_shared(*local_index_infos); + // only make a shallow copy on the shared_ptr is OK + return local_index_infos; } /** diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index d20acabfa2e..f63e7f79788 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -528,7 +528,6 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment) { RUNTIME_CHECK(segment != nullptr); - // TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL auto local_index_infos_snap = getLocalIndexInfosSnapshot(); if (!local_index_infos_snap) return false; @@ -569,7 +568,6 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const 
SegmentPtr & segment) co { RUNTIME_CHECK(segment != nullptr); - // TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL auto local_index_infos_snap = getLocalIndexInfosSnapshot(); if (!local_index_infos_snap) return true; @@ -597,12 +595,11 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co bool all_indexes_built = true; for (const auto & index : *build_info.indexes_to_build) { - auto col_id = index.column_id; - // The dmfile may be built before col_id is added. Skip build indexes for it - if (!dmfile->isColumnExist(col_id)) - continue; - - all_indexes_built = all_indexes_built && dmfile->getColumnStat(col_id).index_bytes > 0; + const auto [state, bytes] = dmfile->getLocalIndexState(index.column_id, index.index_id); + UNUSED(bytes); + all_indexes_built = all_indexes_built + // dmfile built before the column_id added or index already built + && (state == DMFileMeta::LocalIndexState::NoNeed || state == DMFileMeta::LocalIndexState::IndexBuilt); } if (all_indexes_built) return true; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp index 90e899e53f7..7a0a553fbc9 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -206,7 +206,6 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats() LocalIndexStats index_stats; index_stats.column_id = index_info.column_id; index_stats.index_id = index_info.index_id; - index_stats.column_name = index_info.column_name; index_stats.index_kind = "HNSW"; // TODO: Support more. 
for (const auto & [handle, segment] : segments) @@ -221,13 +220,14 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats() bool is_stable_indexed = true; for (const auto & dmfile : stable->getDMFiles()) { - if (!dmfile->isColumnExist(index_info.column_id)) - continue; // Regard as indexed, because column does not need any index - - auto column_stat = dmfile->getColumnStat(index_info.column_id); - - if (column_stat.index_bytes == 0 && column_stat.data_bytes > 0) + const auto [state, bytes] = dmfile->getLocalIndexState(index_info.column_id, index_info.index_id); + UNUSED(bytes); + switch (state) { + case DMFileMeta::LocalIndexState::NoNeed: // Regard as indexed, because column does not need any index + case DMFileMeta::LocalIndexState::IndexBuilt: + break; + case DMFileMeta::LocalIndexState::IndexPending: is_stable_indexed = false; break; } diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStat.h b/dbms/src/Storages/DeltaMerge/File/ColumnStat.h index 4f44acb7b25..d213a5038c2 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStat.h +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStat.h @@ -18,10 +18,9 @@ #include #include #include +#include -namespace DB -{ -namespace DM +namespace DB::DM { struct ColumnStat { @@ -29,7 +28,7 @@ struct ColumnStat DataTypePtr type; // The average size of values. A hint for speeding up deserialize. double avg_size; - // The serialized size of the column data on disk. + // The serialized size of the column data on disk. 
(including column data and nullmap) size_t serialized_bytes = 0; // These members are only useful when using metav2 @@ -41,7 +40,7 @@ struct ColumnStat size_t array_sizes_bytes = 0; size_t array_sizes_mark_bytes = 0; - std::optional vector_index = std::nullopt; + std::vector vector_index; #ifndef NDEBUG // This field is only used for testing @@ -63,8 +62,11 @@ struct ColumnStat stat.set_array_sizes_bytes(array_sizes_bytes); stat.set_array_sizes_mark_bytes(array_sizes_mark_bytes); - if (vector_index.has_value()) - stat.mutable_vector_index()->CopyFrom(vector_index.value()); + for (const auto & vec_idx : vector_index) + { + auto * pb_idx = stat.add_vector_indexes(); + pb_idx->CopyFrom(vec_idx); + } #ifndef NDEBUG stat.set_additional_data_for_test(additional_data_for_test); @@ -88,14 +90,29 @@ struct ColumnStat array_sizes_mark_bytes = proto.array_sizes_mark_bytes(); if (proto.has_vector_index()) - vector_index = proto.vector_index(); + { + // For backward compatibility, loaded `vector_index` into `vector_indexes` + // with index_id == EmptyIndexID + vector_index.emplace_back(proto.vector_index()); + auto & idx = vector_index.back(); + idx.set_index_id(EmptyIndexID); + idx.set_index_bytes(index_bytes); + } + vector_index.reserve(vector_index.size() + proto.vector_indexes_size()); + for (const auto & pb_idx : proto.vector_indexes()) + { + vector_index.emplace_back(pb_idx); + } + #ifndef NDEBUG additional_data_for_test = proto.additional_data_for_test(); #endif } - // @deprecated. New fields should be added via protobuf. Use `toProto` instead - void serializeToBuffer(WriteBuffer & buf) const + // New fields should be added via protobuf. Use `toProto` instead + [[deprecated("Use ColumnStat::toProto instead")]] // + void + serializeToBuffer(WriteBuffer & buf) const { writeIntBinary(col_id, buf); writeStringBinary(type->getName(), buf); @@ -108,8 +125,10 @@ struct ColumnStat writeIntBinary(index_bytes, buf); } - // @deprecated. This only presents for reading with old data. 
Use `mergeFromProto` instead - void parseFromBuffer(ReadBuffer & buf) + // This only presents for reading with old data. Use `mergeFromProto` instead + [[deprecated("Use ColumnStat::mergeFromProto instead")]] // + void + parseFromBuffer(ReadBuffer & buf) { readIntBinary(col_id, buf); String type_name; @@ -127,7 +146,9 @@ struct ColumnStat using ColumnStats = std::unordered_map; -inline void readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadBuffer & buf) +[[deprecated("Used by DMFileMeta v1. Use ColumnStat::mergeFromProto instead")]] // +inline void +readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadBuffer & buf) { DataTypeFactory & data_type_factory = DataTypeFactory::instance(); @@ -155,11 +176,23 @@ inline void readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadB DB::assertChar('\n', buf); auto type = data_type_factory.getOrSet(type_name); - column_sats.emplace(id, ColumnStat{id, type, avg_size, serialized_bytes}); + column_sats.emplace( + id, + ColumnStat{ + .col_id = id, + .type = type, + .avg_size = avg_size, + .serialized_bytes = serialized_bytes, + // ... here ignore some fields with default initializers + .vector_index = {}, + .additional_data_for_test = {}, + }); } } -inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver, WriteBuffer & buf) +[[deprecated("Used by DMFileMeta v1. 
Use ColumnStat::toProto instead")]] // +inline void +writeText(const ColumnStats & column_sats, DMFileFormat::Version ver, WriteBuffer & buf) { DB::writeString("Columns: ", buf); DB::writeText(column_sats.size(), buf); @@ -182,5 +215,4 @@ inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver } } -} // namespace DM -} // namespace DB +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index c59a69ac140..b5d569445ed 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -14,9 +14,12 @@ #pragma once +#include +#include #include #include #include +#include #include #include #include @@ -144,6 +147,29 @@ class DMFile : private boost::noncopyable } bool isColumnExist(ColId col_id) const { return meta->column_stats.contains(col_id); } + std::tuple getLocalIndexState(ColId col_id, IndexID index_id) const + { + return meta->getLocalIndexState(col_id, index_id); + } + + // Check whether the local index of given col_id and index_id has been built on this dmfile. + // Return false if + // - the col_id is not exist in the dmfile + // - the index has not been built + bool isLocalIndexExist(ColId col_id, IndexID index_id) const + { + return std::get<0>(meta->getLocalIndexState(col_id, index_id)) == DMFileMeta::LocalIndexState::IndexBuilt; + } + + // Try to get the local index of given col_id and index_id. + // Return std::nullopt if + // - the col_id is not exist in the dmfile + // - the index has not been built + std::optional getLocalIndex(ColId col_id, IndexID index_id) const + { + return meta->getLocalIndex(col_id, index_id); + } + /* * TODO: This function is currently unused. We could use it when: * 1. The content is polished (e.g. including at least file ID, and use a format easy for grep). 
@@ -183,7 +209,6 @@ class DMFile : private boost::noncopyable void switchToRemote(const S3::DMFileOID & oid) const; UInt32 metaVersion() const { return meta->metaVersion(); } - UInt32 bumpMetaVersion() const { return meta->bumpMetaVersion(); } private: DMFile( @@ -279,6 +304,9 @@ class DMFile : private boost::noncopyable return IDataType::getFileNameForStream(DB::toString(col_id), substream); } + static String vectorIndexFileName(IndexID index_id) { return fmt::format("idx_{}.vector", index_id); } + String vectorIndexPath(IndexID index_id) const { return subFilePath(vectorIndexFileName(index_id)); } + void addPack(const DMFileMeta::PackStat & pack_stat) const { meta->pack_stats.push_back(pack_stat); } DMFileStatus getStatus() const { return meta->status; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index ec90d605feb..fd8cf0f107c 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -162,6 +162,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn bool is_matching_ann_query = false; for (const auto & cd : read_columns) { + // Note that it requires ann_query_info->column_id match if (cd.id == ann_query_info->column_id()) { is_matching_ann_query = true; @@ -191,8 +192,10 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn RUNTIME_CHECK(rest_columns.size() + 1 == read_columns.size(), rest_columns.size(), read_columns.size()); - const auto & vec_column_stat = dmfile->getColumnStat(vec_column->id); - if (vec_column_stat.index_bytes == 0 || !vec_column_stat.vector_index.has_value()) + const IndexID ann_query_info_index_id = ann_query_info->index_id() > 0 // + ? 
ann_query_info->index_id() + : EmptyIndexID; + if (!dmfile->isLocalIndexExist(vec_column->id, ann_query_info_index_id)) // Vector index is defined but does not exist on the data file, // or there is no data at all return fallback(); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp index 01410a57970..e0485dfdfb4 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp @@ -22,7 +22,11 @@ #include #include #include +#include #include +#include + +#include namespace DB::ErrorCodes { @@ -33,7 +37,7 @@ namespace DB::DM { DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo( - const LocalIndexInfosPtr & index_infos, + const LocalIndexInfosSnapshot & index_infos, const DMFiles & dm_files) { assert(index_infos != nullptr); @@ -48,18 +52,22 @@ DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo for (const auto & index : *index_infos) { auto col_id = index.column_id; - // The dmfile may be built before col_id is added. Skip build indexes for it - if (!dmfile->isColumnExist(col_id)) - continue; - - if (dmfile->getColumnStat(col_id).index_bytes > 0) - continue; - - any_new_index_build = true; - - auto col_stat = dmfile->getColumnStat(col_id); - build.indexes_to_build->emplace_back(index); - build.estimated_memory_bytes += col_stat.serialized_bytes * VECTOR_INDEX_SIZE_FACTOR; + const auto [state, data_bytes] = dmfile->getLocalIndexState(col_id, index.index_id); + switch (state) + { + case DMFileMeta::LocalIndexState::NoNeed: + case DMFileMeta::LocalIndexState::IndexBuilt: + // The dmfile may be built before col_id is added, or has been built. 
Skip build indexes for it + break; + case DMFileMeta::LocalIndexState::IndexPending: + { + any_new_index_build = true; + + build.indexes_to_build->emplace_back(index); + build.estimated_memory_bytes += data_bytes * VECTOR_INDEX_SIZE_FACTOR; + break; + } + } } if (any_new_index_build) @@ -82,36 +90,50 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, P dm_file_mutable->path()); // read_columns are: DEL_MARK, COL_A, COL_B, ... - // index_builders are: COL_A, COL_B, ... + // index_builders are: COL_A -> {idx_M, idx_N}, COL_B -> {idx_O}, ... ColumnDefines read_columns{*del_cd_iter}; read_columns.reserve(options.index_infos->size() + 1); - std::vector index_builders; - index_builders.reserve(options.index_infos->size()); + std::unordered_map> index_builders; - // The caller should avoid building index for the same column multiple times. + std::unordered_map> col_indexes; for (const auto & index_info : *options.index_infos) + { + if (index_info.type != IndexType::Vector) + continue; + col_indexes[index_info.column_id].emplace_back(index_info); + } + + for (const auto & [col_id, index_infos] : col_indexes) { const auto cd_iter = std::find_if(column_defines.cbegin(), column_defines.cend(), [&](const auto & cd) { - return cd.id == index_info.column_id; + return cd.id == col_id; }); RUNTIME_CHECK_MSG( cd_iter != column_defines.cend(), "Cannot find column_id={} in file={}", - index_info.column_id, + col_id, dm_file_mutable->path()); - // Index already built. We don't allow. The caller should filter away. - RUNTIME_CHECK(dm_file_mutable->getColumnStat(index_info.column_id).index_bytes == 0, index_info.column_id); - + for (const auto & idx_info : index_infos) + { + // Index already built. We don't allow. 
The caller should filter away, + RUNTIME_CHECK( + !dm_file_mutable->isLocalIndexExist(idx_info.column_id, idx_info.index_id), + idx_info.column_id, + idx_info.index_id); + index_builders[col_id].emplace_back( + VectorIndexBuilder::create(idx_info.index_id, idx_info.index_definition)); + } read_columns.push_back(*cd_iter); - index_builders.push_back(VectorIndexBuilder::create(index_info.index_definition)); } - // If no index to build. - if (index_builders.empty()) + if (read_columns.size() == 1 || index_builders.empty()) + { + // No index to build. return 0; + } DMFileV3IncrementWriter::Options iw_options{ .dm_file = dm_file_mutable, @@ -133,6 +155,7 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, P scan_context); // Read all blocks and build index + const size_t num_cols = read_columns.size(); while (true) { if (!should_proceed()) @@ -142,7 +165,7 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, P if (!block) break; - RUNTIME_CHECK(block.columns() == read_columns.size()); + RUNTIME_CHECK(block.columns() == num_cols); RUNTIME_CHECK(block.getByPosition(0).column_id == TAG_COLUMN_ID); auto del_mark_col = block.safeGetByPosition(0).column; @@ -150,49 +173,60 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, P const auto * del_mark = static_cast *>(del_mark_col.get()); RUNTIME_CHECK(del_mark != nullptr); - for (size_t col_idx = 0, col_idx_max = index_builders.size(); col_idx < col_idx_max; ++col_idx) + for (size_t col_idx = 1; col_idx < num_cols; ++col_idx) { - const auto & index_builder = index_builders[col_idx]; - const auto & col_with_type_and_name = block.safeGetByPosition(col_idx + 1); - RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns[col_idx + 1].id); + const auto & col_with_type_and_name = block.safeGetByPosition(col_idx); + RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns[col_idx].id); const auto & col = col_with_type_and_name.column; - 
index_builder->addBlock(*col, del_mark, should_proceed); + for (const auto & index_builder : index_builders[read_columns[col_idx].id]) + { + index_builder->addBlock(*col, del_mark, should_proceed); + } } } // Write down the index size_t total_built_index_bytes = 0; - for (size_t col_idx = 0, col_idx_max = index_builders.size(); col_idx < col_idx_max; col_idx++) + std::unordered_map> new_indexes_on_cols; + for (size_t col_idx = 1; col_idx < num_cols; ++col_idx) { - const auto & index_builder = index_builders[col_idx]; - const auto & cd = read_columns[col_idx + 1]; - + const auto & cd = read_columns[col_idx]; // Save index and update column stats auto callback = [&](const IDataType::SubstreamPath & substream_path) -> void { if (IDataType::isNullMap(substream_path) || IDataType::isArraySizes(substream_path)) return; - const auto stream_name = DMFile::getFileNameBase(cd.id, substream_path); - const auto index_file_name = colIndexFileName(stream_name); - const auto index_path = iw->localPath() + "/" + index_file_name; - index_builder->save(index_path); - - auto & col_stat = dm_file_mutable->meta->getColumnStats().at(cd.id); - col_stat.index_bytes = Poco::File(index_path).getSize(); - total_built_index_bytes += col_stat.index_bytes; - // Memorize what kind of vector index it is, so that we can correctly restore it when reading. - col_stat.vector_index.emplace(); - col_stat.vector_index->set_index_kind(tipb::VectorIndexKind_Name(index_builder->definition->kind)); - col_stat.vector_index->set_distance_metric( - tipb::VectorDistanceMetric_Name(index_builder->definition->distance_metric)); - col_stat.vector_index->set_dimensions(index_builder->definition->dimension); - - iw->include(index_file_name); + std::vector new_indexes; + for (const auto & index_builder : index_builders[cd.id]) + { + const IndexID index_id = index_builder->index_id; + const auto index_file_name = index_id > 0 + ? 
dm_file_mutable->vectorIndexFileName(index_id) + : colIndexFileName(DMFile::getFileNameBase(cd.id, substream_path)); + const auto index_path = iw->localPath() + "/" + index_file_name; + index_builder->save(index_path); + + // Memorize what kind of vector index it is, so that we can correctly restore it when reading. + dtpb::VectorIndexFileProps pb_idx; + pb_idx.set_index_kind(tipb::VectorIndexKind_Name(index_builder->definition->kind)); + pb_idx.set_distance_metric(tipb::VectorDistanceMetric_Name(index_builder->definition->distance_metric)); + pb_idx.set_dimensions(index_builder->definition->dimension); + pb_idx.set_index_id(index_id); + pb_idx.set_index_bytes(Poco::File(index_path).getSize()); + new_indexes.emplace_back(std::move(pb_idx)); + + total_built_index_bytes += new_indexes.back().index_bytes(); // pb_idx is moved-from; read the stored copy + iw->include(index_file_name); + } + // In order to avoid concurrent reads on ColumnStat, the newly added indexes + // will be inserted into the DMFile instance in `bumpMetaVersion`. + new_indexes_on_cols.emplace(cd.id, std::move(new_indexes)); }; + cd.type->enumerateStreams(callback); } - dm_file_mutable->meta->bumpMetaVersion(); + dm_file_mutable->meta->bumpMetaVersion(DMFileMetaChangeset{std::move(new_indexes_on_cols)}); iw->finalize(); // Note: There may be S3 uploads here. 
return total_built_index_bytes; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h index 3b4e13868bd..2747a40d260 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h @@ -47,7 +47,9 @@ class DMFileIndexWriter LocalIndexInfosPtr indexes_to_build; }; - static LocalIndexBuildInfo getLocalIndexBuildInfo(const LocalIndexInfosPtr & index_infos, const DMFiles & dm_files); + static LocalIndexBuildInfo getLocalIndexBuildInfo( + const LocalIndexInfosSnapshot & index_infos, + const DMFiles & dm_files); struct Options { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h index 744e278652a..8566f144980 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h @@ -40,6 +40,11 @@ class DMFile; class DMFileWriter; class DMFileV3IncrementWriter; +struct DMFileMetaChangeset +{ + std::unordered_map> new_indexes_on_cols; +}; + class DMFileMeta { public: @@ -187,10 +192,35 @@ class DMFileMeta * @brief metaVersion += 1. Returns the new meta version. * This is only supported in MetaV2. */ - virtual UInt32 bumpMetaVersion() { RUNTIME_CHECK_MSG(false, "MetaV1 cannot bump meta version"); } + virtual UInt32 bumpMetaVersion(DMFileMetaChangeset &&) + { + RUNTIME_CHECK_MSG(false, "MetaV1 cannot bump meta version"); + } virtual EncryptionPath encryptionMetaPath() const; virtual UInt64 getReadFileSize(ColId col_id, const String & filename) const; + +public: + enum LocalIndexState + { + NoNeed, + IndexPending, + IndexBuilt + }; + virtual std::tuple getLocalIndexState(ColId, IndexID) const + { + RUNTIME_CHECK_MSG(false, "MetaV1 does not support getLocalIndexState"); + } + + // Try to get the local index of given col_id and index_id. 
+ // Return std::nullopt if + // - the col_id does not exist in the dmfile + // - the index has not been built + virtual std::optional getLocalIndex(ColId, IndexID) const + { + RUNTIME_CHECK_MSG(false, "MetaV1 does not support getLocalIndex"); + } + protected: PackStats pack_stats; PackProperties pack_properties; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp index e37080ce4c4..2e6ca7276c4 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp @@ -412,4 +412,59 @@ UInt64 DMFileMetaV2::getMergedFileSizeOfColumn(const MergedSubFileInfo & file_in return itr->size; } +UInt32 DMFileMetaV2::bumpMetaVersion(DMFileMetaChangeset && changeset) +{ + std::scoped_lock lock(mtx_bump); + + for (auto & [col_id, col_stat] : column_stats) + { + auto changed_col_iter = changeset.new_indexes_on_cols.find(col_id); + if (changed_col_iter == changeset.new_indexes_on_cols.end()) + continue; + col_stat.vector_index.insert( + col_stat.vector_index.end(), + changed_col_iter->second.begin(), + changed_col_iter->second.end()); + } + + // bump the version + ++meta_version; + return meta_version; +} + +std::tuple DMFileMetaV2::getLocalIndexState(ColId col_id, IndexID index_id) const +{ + // acquire the lock on meta so that col_stat.vector_index is read atomically + std::scoped_lock lock(mtx_bump); + auto it = column_stats.find(col_id); + if (unlikely(it == column_stats.end())) + return {LocalIndexState::NoNeed, 0}; + + const auto & col_stat = it->second; + bool built = std::any_of( // + col_stat.vector_index.cbegin(), + col_stat.vector_index.cend(), + [index_id](const auto & idx) { return idx.index_id() == index_id; }); + if (built) + return {LocalIndexState::IndexBuilt, 0}; + // index is pending for build, return the column data bytes + return {LocalIndexState::IndexPending, col_stat.data_bytes}; +} + +std::optional DMFileMetaV2::getLocalIndex(ColId col_id, 
IndexID index_id) const +{ + // acquire a lock on meta to ensure the atomically on col_stat.vector_index + std::scoped_lock lock(mtx_bump); + auto it = column_stats.find(col_id); + if (unlikely(it == column_stats.end())) + return std::nullopt; + + const auto & col_stat = it->second; + for (const auto & vec_idx : col_stat.vector_index) + { + if (vec_idx.index_id() == index_id) + return vec_idx; + } + return std::nullopt; +} } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h index 7a8d61b4264..2fadf6d9f72 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h @@ -102,17 +102,21 @@ class DMFileMetaV2 : public DMFileMeta EncryptionPath encryptionMergedPath(UInt32 number) const; static String mergedFilename(UInt32 number) { return fmt::format("{}.merged", number); } +public: + std::tuple getLocalIndexState(ColId col_id, IndexID index_id) const override; + + std::optional getLocalIndex(ColId col_id, IndexID index_id) const override; + +public: UInt32 metaVersion() const override { return meta_version; } - UInt32 bumpMetaVersion() override - { - ++meta_version; - return meta_version; - } + UInt32 bumpMetaVersion(DMFileMetaChangeset && changeset) override; UInt64 small_file_size_threshold; UInt64 merged_file_max_size; UInt64 meta_version = 0; // Note: meta_version affects the output file name. 
+ mutable std::mutex mtx_bump; + private: UInt64 getMergedFileSizeOfColumn(const MergedSubFileInfo & file_info) const; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp index 015ae523ee6..b2a1dab4266 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp @@ -92,10 +92,11 @@ DMFileWithVectorIndexBlockInputStream::~DMFileWithVectorIndexBlockInputStream() LOG_DEBUG( // log, "Finished read DMFile with vector index for column dmf_{}/{}(id={}), " - "query_top_k={} load_index+result={:.2f}s read_from_index={:.2f}s read_from_others={:.2f}s", + "index_id={} query_top_k={} load_index+result={:.2f}s read_from_index={:.2f}s read_from_others={:.2f}s", dmfile->fileId(), vec_cd.name, vec_cd.id, + ann_query_info->index_id(), ann_query_info->top_k(), duration_load_vec_index_and_results_seconds, duration_read_from_vec_index_seconds, @@ -273,19 +274,20 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorIndex() double duration_load_index = 0; // include download from s3 and load from fs - auto col_id = ann_query_info->column_id(); + const auto col_id = ann_query_info->column_id(); + const auto index_id = ann_query_info->index_id() > 0 ? ann_query_info->index_id() : EmptyIndexID; RUNTIME_CHECK(dmfile->useMetaV2()); // v3 // Check vector index exists on the column - const auto & column_stat = dmfile->getColumnStat(col_id); - RUNTIME_CHECK(column_stat.index_bytes > 0); - RUNTIME_CHECK(column_stat.vector_index.has_value()); + auto vector_index = dmfile->getLocalIndex(col_id, index_id); + RUNTIME_CHECK(vector_index.has_value(), col_id, index_id); // If local file is invalidated, cache is not valid anymore. So we // need to ensure file exists on local fs first. 
- const auto file_name_base = DMFile::getFileNameBase(col_id); - const auto index_file_path = dmfile->colIndexPath(file_name_base); + const auto index_file_path = index_id > 0 // + ? dmfile->vectorIndexPath(index_id) // + : dmfile->colIndexPath(DMFile::getFileNameBase(col_id)); String local_index_file_path; FileSegmentPtr file_guard = nullptr; if (auto s3_file_name = S3::S3FilenameView::fromKeyWithPrefix(index_file_path); s3_file_name.isValid()) @@ -305,7 +307,7 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorIndex() { file_guard = file_cache->downloadFileForLocalRead( // s3_file_name, - column_stat.index_bytes); + vector_index->index_bytes()); if (file_guard) { local_index_file_path = file_guard->getLocalFileName(); @@ -342,7 +344,7 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorIndex() auto load_from_file = [&]() { has_load_from_file = true; - return VectorIndexViewer::view(*column_stat.vector_index, local_index_file_path); + return VectorIndexViewer::view(*vector_index, local_index_file_path); }; Stopwatch watch; @@ -370,12 +372,13 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorIndex() LOG_DEBUG( // log, - "Loaded vector index for column dmf_{}/{}(id={}), index_size={} kind={} cost={:.2f}s {} {}", + "Loaded vector index for column dmf_{}/{}(id={}), index_id={} index_size={} kind={} cost={:.2f}s {} {}", dmfile->fileId(), vec_cd.name, vec_cd.id, - column_stat.index_bytes, - column_stat.vector_index->index_kind(), + vector_index->index_id(), + vector_index->index_bytes(), + vector_index->index_kind(), duration_load_index, has_s3_download ? "(S3)" : "", has_load_from_file ? 
"(LoadFile)" : ""); @@ -446,7 +449,7 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() LOG_DEBUG( // log, - "Finished vector search over column dmf_{}/{}(id={}), cost={:.2f}s " + "Finished vector search over column dmf_{}/{}(id={}), index_id={} cost={:.3f}s " "top_k_[query/visited/discarded/result]={}/{}/{}/{} " "rows_[file/after_search]={}/{} " "pack_[total/before_search/after_search]={}/{}/{}", @@ -454,6 +457,7 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() dmfile->fileId(), vec_cd.name, vec_cd.id, + ann_query_info->index_id(), search_duration, ann_query_info->top_k(), diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 4b82aaf4ce3..9c37eb6e86c 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -65,7 +65,16 @@ DMFileWriter::DMFileWriter( bool do_index = cd.id == EXTRA_HANDLE_COLUMN_ID || type->isInteger() || type->isDateOrDateTime(); addStreams(cd.id, cd.type, do_index); - dmfile->meta->getColumnStats().emplace(cd.id, ColumnStat{cd.id, cd.type, /*avg_size=*/0}); + dmfile->meta->getColumnStats().emplace( + cd.id, + ColumnStat{ + .col_id = cd.id, + .type = cd.type, + .avg_size = 0, + // ... 
here ignore some fields with default initializers + .vector_index = {}, + .additional_data_for_test = {}, + }); } } diff --git a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp index afeaf90a02f..8eafd257ba4 100644 --- a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp @@ -95,7 +95,6 @@ LocalIndexInfosPtr initLocalIndexInfos(const TiDB::TableInfo & table_info, const index_infos->emplace_back(LocalIndexInfo{ .type = IndexType::Vector, .column_id = col.id, - .column_name = col.name, .index_definition = col.vector_index, }); LOG_INFO(logger, "Add a new index by column comments, column_id={}, table_id={}.", col.id, table_info.id); @@ -121,7 +120,6 @@ LocalIndexInfosPtr initLocalIndexInfos(const TiDB::TableInfo & table_info, const .type = IndexType::Vector, .index_id = idx.id, .column_id = column.id, - .column_name = column.name, .index_definition = idx.vector_index, }); } @@ -131,7 +129,7 @@ LocalIndexInfosPtr initLocalIndexInfos(const TiDB::TableInfo & table_info, const } LocalIndexInfosPtr generateLocalIndexInfos( - const LocalIndexInfosPtr & existing_indexes, + const LocalIndexInfosSnapshot & existing_indexes, const TiDB::TableInfo & new_table_info, const LoggerPtr & logger) { @@ -172,7 +170,6 @@ LocalIndexInfosPtr generateLocalIndexInfos( .type = IndexType::Vector, .index_id = idx.id, .column_id = column.id, - .column_name = column.name, .index_definition = idx.vector_index, }; new_index_infos->emplace_back(std::move(index_info)); @@ -189,7 +186,7 @@ LocalIndexInfosPtr generateLocalIndexInfos( } } - // drop nonexistent indices + // drop nonexistent indexes for (auto & iter : original_local_index_id_map) { // It means this index is create by column comments which we don't support drop index. 
diff --git a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h index 34a5db7d1b6..711ed349646 100644 --- a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h +++ b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h @@ -39,9 +39,10 @@ enum class IndexType struct LocalIndexInfo { IndexType type; + // If the index is defined on TiDB::ColumnInfo, use EmptyIndexID as index_id IndexID index_id = DB::EmptyIndexID; + // Which column_id the index is built on ColumnID column_id = DB::EmptyColumnID; - String column_name; // Now we only support vector index. // In the future, we may support more types of indexes, using std::variant. TiDB::VectorIndexDefinitionPtr index_definition; @@ -49,15 +50,11 @@ struct LocalIndexInfo using LocalIndexInfos = std::vector; using LocalIndexInfosPtr = std::shared_ptr; +using LocalIndexInfosSnapshot = std::shared_ptr; -bool isVectorIndexSupported(const LoggerPtr & logger); LocalIndexInfosPtr initLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger); -TiDB::ColumnInfo getVectorIndxColumnInfo( - const TiDB::TableInfo & table_info, - const TiDB::IndexInfo & idx_info, - const LoggerPtr & logger); LocalIndexInfosPtr generateLocalIndexInfos( - const LocalIndexInfosPtr & existing_indexes, + const LocalIndexInfosSnapshot & existing_indexes, const TiDB::TableInfo & new_table_info, const LoggerPtr & logger); diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp index 525d2192748..652d595281e 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp @@ -24,6 +24,7 @@ namespace DB::ErrorCodes { extern const int BAD_ARGUMENTS; +extern const int INCORRECT_QUERY; } // namespace DB::ErrorCodes namespace DB::DM @@ -38,7 +39,7 @@ bool VectorIndexBuilder::isSupportedType(const IDataType & type) return checkDataTypeArray(&type); } -VectorIndexBuilderPtr 
VectorIndexBuilder::create(const TiDB::VectorIndexDefinitionPtr & definition) +VectorIndexBuilderPtr VectorIndexBuilder::create(IndexID index_id, const TiDB::VectorIndexDefinitionPtr & definition) { RUNTIME_CHECK(definition->dimension > 0); RUNTIME_CHECK(definition->dimension <= TiDB::MAX_VECTOR_DIMENSION); @@ -46,11 +47,12 @@ VectorIndexBuilderPtr VectorIndexBuilder::create(const TiDB::VectorIndexDefiniti switch (definition->kind) { case tipb::VectorIndexKind::HNSW: - return std::make_shared(definition); + return std::make_shared(index_id, definition); default: - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Unsupported vector index {}", + throw Exception( // + ErrorCodes::INCORRECT_QUERY, + "Unsupported vector index, index_id={} def={}", + index_id, tipb::VectorIndexKind_Name(definition->kind)); } } diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h index 5302cdd3788..b86d9ea6a3b 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -38,13 +39,14 @@ class VectorIndexBuilder using ProceedCheckFn = std::function; public: - static VectorIndexBuilderPtr create(const TiDB::VectorIndexDefinitionPtr & definition); + static VectorIndexBuilderPtr create(IndexID index_id, const TiDB::VectorIndexDefinitionPtr & definition); static bool isSupportedType(const IDataType & type); public: - explicit VectorIndexBuilder(const TiDB::VectorIndexDefinitionPtr & definition_) - : definition(definition_) + explicit VectorIndexBuilder(IndexID index_id_, const TiDB::VectorIndexDefinitionPtr & definition_) + : index_id(index_id_) + , definition(definition_) {} virtual ~VectorIndexBuilder() = default; @@ -58,6 +60,7 @@ class VectorIndexBuilder virtual void save(std::string_view path) const = 0; public: + const IndexID index_id; const TiDB::VectorIndexDefinitionPtr definition; }; diff --git 
a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp index 5b803075d5c..79841675e01 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp @@ -57,8 +57,8 @@ unum::usearch::metric_kind_t getUSearchMetricKind(tipb::VectorDistanceMetric d) } } -VectorIndexHNSWBuilder::VectorIndexHNSWBuilder(const TiDB::VectorIndexDefinitionPtr & definition_) - : VectorIndexBuilder(definition_) +VectorIndexHNSWBuilder::VectorIndexHNSWBuilder(IndexID index_id_, const TiDB::VectorIndexDefinitionPtr & definition_) + : VectorIndexBuilder(index_id_, definition_) , index(USearchImplType::make(unum::usearch::metric_punned_t( // definition_->dimension, getUSearchMetricKind(definition->distance_metric)))) @@ -139,7 +139,7 @@ void VectorIndexHNSWBuilder::save(std::string_view path) const SCOPE_EXIT({ total_duration += w.elapsedSeconds(); }); auto result = index.save(unum::usearch::output_file_t(path.data())); - RUNTIME_CHECK_MSG(result, "Failed to save vector index: {}", result.error.what()); + RUNTIME_CHECK_MSG(result, "Failed to save vector index: {} path={}", result.error.what(), path); } VectorIndexHNSWBuilder::~VectorIndexHNSWBuilder() @@ -182,7 +182,12 @@ VectorIndexViewerPtr VectorIndexHNSWViewer::view(const dtpb::VectorIndexFileProp vi->index.reserve(limit); auto result = vi->index.view(unum::usearch::memory_mapped_file_t(path.data())); - RUNTIME_CHECK_MSG(result, "Failed to load vector index: {}", result.error.what()); + RUNTIME_CHECK_MSG( + result, + "Failed to load vector index: {} props={} path={}", + result.error.what(), + file_props.ShortDebugString(), + path); auto current_memory_usage = vi->index.memory_usage(); GET_METRIC(tiflash_vector_index_memory_usage, type_view).Increment(static_cast(current_memory_usage)); @@ -200,18 +205,22 @@ std::vector VectorIndexHNSWViewer::search( if (query_vec_size != file_props.dimensions()) 
throw Exception( ErrorCodes::INCORRECT_QUERY, - "Query vector size {} does not match index dimensions {}", + "Query vector size {} does not match index dimensions {}, index_id={} column_id={}", query_vec_size, - file_props.dimensions()); + file_props.dimensions(), + query_info->index_id(), + query_info->column_id()); RUNTIME_CHECK(query_info->ref_vec_f32().size() >= sizeof(UInt32) + query_vec_size * sizeof(Float32)); if (tipb::VectorDistanceMetric_Name(query_info->distance_metric()) != file_props.distance_metric()) throw Exception( ErrorCodes::INCORRECT_QUERY, - "Query distance metric {} does not match index distance metric {}", + "Query distance metric {} does not match index distance metric {}, index_id={} column_id={}", tipb::VectorDistanceMetric_Name(query_info->distance_metric()), - file_props.distance_metric()); + file_props.distance_metric(), + query_info->index_id(), + query_info->column_id()); std::atomic visited_nodes = 0; std::atomic discarded_nodes = 0; diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h index 1804fbe80ea..59161db7cc3 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h @@ -29,7 +29,7 @@ class VectorIndexHNSWBuilder : public VectorIndexBuilder public: static tipb::VectorIndexKind kind(); - explicit VectorIndexHNSWBuilder(const TiDB::VectorIndexDefinitionPtr & definition_); + explicit VectorIndexHNSWBuilder(IndexID index_id_, const TiDB::VectorIndexDefinitionPtr & definition_); ~VectorIndexHNSWBuilder() override; diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h index 0b7221deeb9..0f64f68cddb 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h @@ -180,7 +180,7 @@ class LocalIndexerScheduler const LoggerPtr logger; - /// The thread 
pool for creating indices in the background. + /// The thread pool for creating indexes in the background. std::unique_ptr pool; /// The current memory usage of the pool. It is not accurate and the memory /// is determined when task is adding to the pool. diff --git a/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto b/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto index 7ed7a142405..59aff511a2b 100644 --- a/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto +++ b/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto @@ -64,8 +64,8 @@ message ColumnStat { // Only used in tests. Modifying other fields of ColumnStat is hard. optional string additional_data_for_test = 101; - // TODO(vector-index) Support multiple vector index on the same column optional VectorIndexFileProps vector_index = 102; + repeated VectorIndexFileProps vector_indexes = 103; } message ColumnStats { @@ -93,4 +93,6 @@ message VectorIndexFileProps { optional string index_kind = 1; // The value is tipb.VectorIndexKind optional string distance_metric = 2; // The value is tipb.VectorDistanceMetric optional uint64 dimensions = 3; + optional int64 index_id = 4; + optional uint64 index_bytes = 5; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp index 7d11d41c8ca..92db6f9daeb 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp @@ -159,7 +159,7 @@ try .disagg_ctx = db_context->getSharedContextDisagg(), }); dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; - ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion({})); iw->finalize(); // Read out meta version = 0 @@ -203,7 +203,7 @@ try .disagg_ctx = db_context->getSharedContextDisagg(), }); dm_file_for_write->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; - ASSERT_EQ(1, 
dm_file_for_write->meta->bumpMetaVersion()); + ASSERT_EQ(1, dm_file_for_write->meta->bumpMetaVersion({})); iw->finalize(); auto dm_file_for_read_v1 = DMFile::restore( @@ -226,7 +226,7 @@ try .disagg_ctx = db_context->getSharedContextDisagg(), }); dm_file_for_write->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test2"; - ASSERT_EQ(2, dm_file_for_write->meta->bumpMetaVersion()); + ASSERT_EQ(2, dm_file_for_write->meta->bumpMetaVersion({})); iw->finalize(); // Current DMFile instance does not affect @@ -262,7 +262,7 @@ try .disagg_ctx = db_context->getSharedContextDisagg(), }); dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; - ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion({})); iw->finalize(); // Overwrite meta v1. @@ -282,7 +282,7 @@ try .disagg_ctx = db_context->getSharedContextDisagg(), }); dm_file_2->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test_overwrite"; - ASSERT_EQ(1, dm_file_2->meta->bumpMetaVersion()); + ASSERT_EQ(1, dm_file_2->meta->bumpMetaVersion({})); ASSERT_NO_THROW({ iw->finalize(); }); // No exception should be thrown because it may be a file left by previous writes but segment failed to update meta version. 
@@ -318,12 +318,12 @@ try .disagg_ctx = db_context->getSharedContextDisagg(), }); dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; - dm_file->meta->bumpMetaVersion(); + dm_file->meta->bumpMetaVersion({}); iw->finalize(); ASSERT_THROW({ iw->finalize(); }, DB::Exception); - dm_file->meta->bumpMetaVersion(); + dm_file->meta->bumpMetaVersion({}); ASSERT_THROW({ iw->finalize(); }, DB::Exception); } CATCH @@ -443,7 +443,7 @@ try .disagg_ctx = db_context->getSharedContextDisagg(), }); dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; - ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion({})); iw->finalize(); // Read out meta version = 0 @@ -491,7 +491,7 @@ try .disagg_ctx = db_context->getSharedContextDisagg(), }); dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; - ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion({})); iw->finalize(); { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index e722d2cb821..ab37b728f4c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -30,18 +31,21 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include #include #include +#include namespace CurrentMetrics @@ -135,6 +139,21 @@ class VectorIndexDMFileTest return new_dmfiles[0]; } + DMFilePtr buildMultiIndex(const LocalIndexInfosPtr & index_infos) + { + assert(index_infos != nullptr); + auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(index_infos, {dm_file}); + DMFileIndexWriter iw(DMFileIndexWriter::Options{ + .path_pool = path_pool, + .index_infos 
= build_info.indexes_to_build, + .dm_files = {dm_file}, + .dm_context = *dm_context, + }); + auto new_dmfiles = iw.build(); + assert(new_dmfiles.size() == 1); + return new_dmfiles[0]; + } + Context & dbContext() { return *db_context; } protected: @@ -390,7 +409,8 @@ try } catch (const DB::Exception & ex) { - ASSERT_STREQ("Query vector size 1 does not match index dimensions 3", ex.message().c_str()); + EXPECT_TRUE(ex.message().find("Query vector size 1 does not match index dimensions 3") != std::string::npos) + << ex.message(); } catch (...) { @@ -449,7 +469,10 @@ try } catch (const DB::Exception & ex) { - ASSERT_STREQ("Query distance metric COSINE does not match index distance metric L2", ex.message().c_str()); + EXPECT_TRUE( + ex.message().find("Query distance metric COSINE does not match index distance metric L2") + != std::string::npos) + << ex.message(); } catch (...) { @@ -486,6 +509,328 @@ try } CATCH +TEST_P(VectorIndexDMFileTest, OnePackWithMultipleVecIndexes) +try +{ + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto vec_cd = ColumnDefine(vec_column_id, vec_column_name, tests::typeFromString("Array(Float32)")); + cols->emplace_back(vec_cd); + + ColumnDefines read_cols = *cols; + if (test_only_vec_column) + read_cols = {vec_cd}; + + // Prepare DMFile + { + Block block = DMTestEnv::prepareSimpleWriteBlockWithNullable(0, 3); + block.insert( + createVecFloat32Column({{1.0, 2.0, 3.0}, {0.0, 0.0, 0.0}, {1.0, 2.0, 3.5}}, vec_cd.name, vec_cd.id)); + auto stream = std::make_shared(dbContext(), dm_file, *cols); + stream->writePrefix(); + stream->write(block, DMFileBlockOutputStream::BlockProperty{0, 0, 0, 0}); + stream->writeSuffix(); + } + + // Generate vec indexes + dm_file = restoreDMFile(); + auto index_infos = std::make_shared(LocalIndexInfos{ + // index with index_id == 3 + LocalIndexInfo{ + .type = IndexType::Vector, + .index_id = 3, + .column_id = vec_column_id, + .index_definition = 
std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }), + }, + // index with index_id == 4 + LocalIndexInfo{ + .type = IndexType::Vector, + .index_id = 4, + .column_id = vec_column_id, + .index_definition = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::COSINE, + }), + }, + // index with index_id == EmptyIndexID, column_id = vec_column_id + LocalIndexInfo{ + .type = IndexType::Vector, + .index_id = EmptyIndexID, + .column_id = vec_column_id, + .index_definition = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }), + }, + }); + dm_file = buildMultiIndex(index_infos); + + { + EXPECT_TRUE(dm_file->isLocalIndexExist(vec_column_id, EmptyIndexID)); + EXPECT_TRUE(dm_file->isLocalIndexExist(vec_column_id, 3)); + EXPECT_TRUE(dm_file->isLocalIndexExist(vec_column_id, 4)); + } + + { + /// ===== index_id=3 ==== /// + + // Read with approximate match + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_index_id(3); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); + + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView::createWithFilter(3, true)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + createColumnData({ + createColumn({2}), + createVecFloat32Column({{1.0, 2.0, 3.5}}), + })); + } + + // Read multiple rows + { + auto 
ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_index_id(3); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + ann_query_info->set_top_k(2); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); + + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView::createWithFilter(3, true)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + createColumnData({ + createColumn({0, 2}), + createVecFloat32Column({{1.0, 2.0, 3.0}, {1.0, 2.0, 3.5}}), + })); + } + + // Read with MVCC filter + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_index_id(3); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); + + auto bitmap_filter = std::make_shared(3, true); + bitmap_filter->set(/* start */ 2, /* limit */ 1, false); + + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView(bitmap_filter, 0, 3)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + createColumnData({ + createColumn({0}), + createVecFloat32Column({{1.0, 2.0, 3.0}}), + })); + } + } + + { + /// ===== index_id=4 ==== /// + + // Read with approximate match + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_index_id(4); + 
ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::COSINE); + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); + + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView::createWithFilter(3, true)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + createColumnData({ + createColumn({2}), + createVecFloat32Column({{1.0, 2.0, 3.5}}), + })); + } + + // Read multiple rows + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_index_id(4); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::COSINE); + ann_query_info->set_top_k(2); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); + + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView::createWithFilter(3, true)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + createColumnData({ + createColumn({0, 2}), + createVecFloat32Column({{1.0, 2.0, 3.0}, {1.0, 2.0, 3.5}}), + })); + } + + // Read with MVCC filter + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_index_id(4); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::COSINE); + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); + + auto bitmap_filter = std::make_shared(3, true); + bitmap_filter->set(/* start */ 2, /* limit */ 1, false); + + DMFileBlockInputStreamBuilder 
builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView(bitmap_filter, 0, 3)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + createColumnData({ + createColumn({0}), + createVecFloat32Column({{1.0, 2.0, 3.0}}), + })); + } + } + + + { + /// ===== column_id=100, index_id not set ==== /// + + // Read with approximate match + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); + + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView::createWithFilter(3, true)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + createColumnData({ + createColumn({2}), + createVecFloat32Column({{1.0, 2.0, 3.5}}), + })); + } + + // Read multiple rows + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + ann_query_info->set_top_k(2); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); + + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView::createWithFilter(3, true)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + 
createColumnData({ + createColumn({0, 2}), + createVecFloat32Column({{1.0, 2.0, 3.0}, {1.0, 2.0, 3.5}}), + })); + } + + // Read with MVCC filter + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); + + auto bitmap_filter = std::make_shared(3, true); + bitmap_filter->set(/* start */ 2, /* limit */ 1, false); + + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView(bitmap_filter, 0, 3)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + createColumnData({ + createColumn({0}), + createVecFloat32Column({{1.0, 2.0, 3.0}}), + })); + } + } +} +CATCH + TEST_P(VectorIndexDMFileTest, OnePackWithDuplicateVectors) try { @@ -1519,9 +1864,44 @@ class VectorIndexSegmentOnS3Test RUNTIME_CHECK(file_cache->getAll().empty()); auto dm_files = segment->getStable()->getDMFiles(); - auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(indexInfo(index_info), dm_files); + auto index_infos = std::make_shared(LocalIndexInfos{ + // index with index_id == 3 + LocalIndexInfo{ + .type = IndexType::Vector, + .index_id = 3, + .column_id = vec_column_id, + .index_definition = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 1, + .distance_metric = tipb::VectorDistanceMetric::L2, + }), + }, + // index with index_id == 4 + LocalIndexInfo{ + .type = IndexType::Vector, + .index_id = 4, + .column_id = vec_column_id, + .index_definition = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 1, + .distance_metric = 
tipb::VectorDistanceMetric::COSINE, + }), + }, + // index with index_id == EmptyIndexID, column_id = vec_column_id + LocalIndexInfo{ + .type = IndexType::Vector, + .index_id = EmptyIndexID, + .column_id = vec_column_id, + .index_definition = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 1, + .distance_metric = tipb::VectorDistanceMetric::L2, + }), + }, + }); + auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(index_infos, dm_files); - // Build index + // Build multiple index DMFileIndexWriter iw(DMFileIndexWriter::Options{ .path_pool = storage_path_pool, .index_infos = build_info.indexes_to_build, @@ -1548,10 +1928,12 @@ class VectorIndexSegmentOnS3Test BlockInputStreamPtr computeNodeANNQuery( const std::vector ref_vec, + IndexID index_id, UInt32 top_k = 1, const ScanContextPtr & read_scan_context = nullptr) { auto ann_query_info = std::make_shared(); + ann_query_info->set_index_id(index_id); ann_query_info->set_column_id(vec_column_id); ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(top_k); @@ -1618,7 +2000,7 @@ try prepareWriteNodeStable(); FileCache::shutdown(); - auto stream = computeNodeANNQuery({5.0}); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID); try { @@ -1673,7 +2055,7 @@ try } { auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1695,7 +2077,7 @@ try // Read again, we should be reading from memory cache. 
auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1711,6 +2093,99 @@ try } CATCH +TEST_F(VectorIndexSegmentOnS3Test, ReadFromIndexWithMultipleVecIndexes) +try +{ + prepareWriteNodeStable(); + { + auto * file_cache = FileCache::instance(); + ASSERT_EQ(0, file_cache->getAll().size()); + } + { + // index_id == EmptyIndexID + IndexID query_index_id = EmptyIndexID; + { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, query_index_id, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + ASSERT_FALSE(std::filesystem::is_empty(file_cache->cache_dir)); + } + { + // Read again, we should be reading from memory cache. 
+ + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, query_index_id, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 1); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 0); + } + } + { + // index_id == 3 + IndexID query_index_id = 3; + { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, query_index_id, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + ASSERT_FALSE(std::filesystem::is_empty(file_cache->cache_dir)); + } + { + // Read again, we should be reading from memory cache. 
+ + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, query_index_id, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 1); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 0); + } + } +} +CATCH + TEST_F(VectorIndexSegmentOnS3Test, FileCacheEvict) try { @@ -1721,7 +2196,7 @@ try } { auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1752,7 +2227,7 @@ try { // When cache is evicted (but memory cache exists), the query should be fine. auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1769,7 +2244,7 @@ try // Read again, we should be reading from memory cache. auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1795,7 +2270,7 @@ try } { auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1831,7 +2306,7 @@ try { // When cache is evicted (and memory cache is dropped), the query should be fine. 
auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1848,7 +2323,7 @@ try // Read again, we should be reading from memory cache. auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1874,7 +2349,7 @@ try } { auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1898,7 +2373,7 @@ try { // Query should be fine. auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1915,7 +2390,7 @@ try // Read again, we should be reading from memory cache. auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1941,7 +2416,7 @@ try } { auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1971,7 +2446,7 @@ try { // Query should be fine. 
auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -1988,7 +2463,7 @@ try // Read again, we should be reading from memory cache. auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -2018,7 +2493,7 @@ try auto th_1 = std::async([&]() { auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -2040,7 +2515,7 @@ try auto th_2 = std::async([&]() { auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({7.0}, 1, scan_context); + auto stream = computeNodeANNQuery({7.0}, EmptyIndexID, 1, scan_context); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name, vec_column_name}), @@ -2083,7 +2558,7 @@ try } { auto scan_context = std::make_shared(); - auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + auto stream = computeNodeANNQuery({5.0}, EmptyIndexID, 1, scan_context); ASSERT_THROW( { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h index b28c6d91239..f01d08899a8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h @@ -86,8 +86,8 @@ class VectorIndexTestUtils const LocalIndexInfos index_infos = LocalIndexInfos{ LocalIndexInfo{ .type = IndexType::Vector, + .index_id = EmptyIndexID, .column_id = vec_column_id, - .column_name = "", 
.index_definition = std::make_shared(definition), }, }; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp index 1f767c45ac6..8caf59b0eb4 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp @@ -19,10 +19,10 @@ #include #include -namespace DB::tests +namespace DB::DM::tests { -TEST(LocalIndexInfo, CheckIndexChanged) +TEST(LocalIndexInfoTest, CheckIndexChanged) try { TiDB::TableInfo table_info; @@ -34,7 +34,7 @@ try } auto logger = Logger::get(); - DM::LocalIndexInfosPtr index_info = nullptr; + LocalIndexInfosPtr index_info = nullptr; // check the same { auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); @@ -69,7 +69,7 @@ try ASSERT_NE(new_index_info, nullptr); ASSERT_EQ(new_index_info->size(), 1); const auto & idx = (*new_index_info)[0]; - ASSERT_EQ(DM::IndexType::Vector, idx.type); + ASSERT_EQ(IndexType::Vector, idx.type); ASSERT_EQ(expect_idx.id, idx.index_id); ASSERT_EQ(100, idx.column_id); ASSERT_NE(nullptr, idx.index_definition); @@ -102,7 +102,7 @@ try ASSERT_NE(new_index_info, nullptr); ASSERT_EQ(new_index_info->size(), 2); const auto & idx0 = (*new_index_info)[0]; - ASSERT_EQ(DM::IndexType::Vector, idx0.type); + ASSERT_EQ(IndexType::Vector, idx0.type); ASSERT_EQ(expect_idx.id, idx0.index_id); ASSERT_EQ(100, idx0.column_id); ASSERT_NE(nullptr, idx0.index_definition); @@ -110,7 +110,7 @@ try ASSERT_EQ(expect_idx.vector_index->dimension, idx0.index_definition->dimension); ASSERT_EQ(expect_idx.vector_index->distance_metric, idx0.index_definition->distance_metric); const auto & idx1 = (*new_index_info)[1]; - ASSERT_EQ(DM::IndexType::Vector, idx1.type); + ASSERT_EQ(IndexType::Vector, idx1.type); ASSERT_EQ(expect_idx2.id, idx1.index_id); ASSERT_EQ(100, idx1.column_id); ASSERT_NE(nullptr, idx1.index_definition); @@ -146,7 +146,7 @@ try ASSERT_NE(new_index_info, 
nullptr); ASSERT_EQ(new_index_info->size(), 2); const auto & idx0 = (*new_index_info)[0]; - ASSERT_EQ(DM::IndexType::Vector, idx0.type); + ASSERT_EQ(IndexType::Vector, idx0.type); ASSERT_EQ(expect_idx.id, idx0.index_id); ASSERT_EQ(100, idx0.column_id); ASSERT_NE(nullptr, idx0.index_definition); @@ -154,7 +154,7 @@ try ASSERT_EQ(expect_idx.vector_index->dimension, idx0.index_definition->dimension); ASSERT_EQ(expect_idx.vector_index->distance_metric, idx0.index_definition->distance_metric); const auto & idx1 = (*new_index_info)[1]; - ASSERT_EQ(DM::IndexType::Vector, idx1.type); + ASSERT_EQ(IndexType::Vector, idx1.type); ASSERT_EQ(expect_idx3.id, idx1.index_id); ASSERT_EQ(100, idx1.column_id); ASSERT_NE(nullptr, idx1.index_definition); @@ -168,7 +168,7 @@ try } CATCH -TEST(LocalIndexInfo, CheckIndexAddWithVecIndexOnColumnInfo) +TEST(LocalIndexInfoTest, CheckIndexAddWithVecIndexOnColumnInfo) try { // The serverless branch, vector index may directly defined on the ColumnInfo. @@ -211,15 +211,15 @@ try // check the different auto logger = Logger::get(); - DM::LocalIndexInfosPtr index_info = nullptr; + LocalIndexInfosPtr index_info = nullptr; { auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); ASSERT_NE(new_index_info, nullptr); ASSERT_EQ(new_index_info->size(), 2); const auto & idx0 = (*new_index_info)[0]; - ASSERT_EQ(DM::IndexType::Vector, idx0.type); - ASSERT_EQ(EmptyIndexID, idx0.index_id); + ASSERT_EQ(IndexType::Vector, idx0.type); + ASSERT_EQ(EmptyIndexID, idx0.index_id); // defined on TiDB::ColumnInfo ASSERT_EQ(99, idx0.column_id); ASSERT_NE(nullptr, idx0.index_definition); ASSERT_EQ(col_vector_index->kind, idx0.index_definition->kind); @@ -227,7 +227,7 @@ try ASSERT_EQ(col_vector_index->distance_metric, idx0.index_definition->distance_metric); const auto & idx1 = (*new_index_info)[1]; - ASSERT_EQ(DM::IndexType::Vector, idx1.type); + ASSERT_EQ(IndexType::Vector, idx1.type); ASSERT_EQ(expect_idx.id, idx1.index_id); ASSERT_EQ(98, 
idx1.column_id); ASSERT_NE(nullptr, idx1.index_definition); @@ -235,8 +235,10 @@ try ASSERT_EQ(expect_idx.vector_index->dimension, idx1.index_definition->dimension); ASSERT_EQ(expect_idx.vector_index->distance_metric, idx1.index_definition->distance_metric); // check again, table_info.index_infos doesn't change and return them - DM::LocalIndexInfosPtr empty_index_info = nullptr; + LocalIndexInfosPtr empty_index_info = nullptr; ASSERT_EQ(2, generateLocalIndexInfos(empty_index_info, table_info, logger)->size()); + // check again with the same table_info, nothing changed, return nullptr + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); // update index_info = new_index_info; @@ -264,8 +266,8 @@ try ASSERT_EQ(new_index_info->size(), 2); const auto & idx0 = (*new_index_info)[0]; - ASSERT_EQ(DM::IndexType::Vector, idx0.type); - ASSERT_EQ(EmptyIndexID, idx0.index_id); + ASSERT_EQ(IndexType::Vector, idx0.type); + ASSERT_EQ(EmptyIndexID, idx0.index_id); // defined on TiDB::ColumnInfo ASSERT_EQ(99, idx0.column_id); ASSERT_NE(nullptr, idx0.index_definition); ASSERT_EQ(col_vector_index->kind, idx0.index_definition->kind); @@ -273,7 +275,7 @@ try ASSERT_EQ(col_vector_index->distance_metric, idx0.index_definition->distance_metric); const auto & idx1 = (*new_index_info)[1]; - ASSERT_EQ(DM::IndexType::Vector, idx1.type); + ASSERT_EQ(IndexType::Vector, idx1.type); ASSERT_EQ(expect_idx2.id, idx1.index_id); ASSERT_EQ(98, idx1.column_id); ASSERT_NE(nullptr, idx1.index_definition); @@ -287,4 +289,4 @@ try } CATCH -} // namespace DB::tests +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp index 0ae0248cb3c..189eb26dda4 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp @@ -103,7 +103,7 @@ class 
SegmentReplaceStableData RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); column_stats[::DB::TiDBPkColumnID].additional_data_for_test = pk_additiona_data; - new_dm_file->meta->bumpMetaVersion(); + new_dm_file->meta->bumpMetaVersion({}); iw->finalize(); new_dm_files.emplace_back(new_dm_file); @@ -572,7 +572,7 @@ try RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); column_stats[::DB::TiDBPkColumnID].additional_data_for_test = "tiflash_foo"; - new_dm_file->meta->bumpMetaVersion(); + new_dm_file->meta->bumpMetaVersion({}); iw->finalize(); auto lock = wn_segment->mustGetUpdateLock(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 732607589ec..7e2f4155c2c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -736,7 +736,7 @@ bool SegmentTestBasic::replaceSegmentStableData(PageIdU64 segment_id, const DMFi return success; } -bool SegmentTestBasic::ensureSegmentStableIndex(PageIdU64 segment_id, LocalIndexInfosPtr local_index_infos) +bool SegmentTestBasic::ensureSegmentStableIndex(PageIdU64 segment_id, const LocalIndexInfosPtr & local_index_infos) { LOG_INFO(logger_op, "EnsureSegmentStableIndex, segment_id={}", segment_id); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 6bbecb41593..fbcd1fb0834 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -104,7 +104,7 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic /** * Returns whether segment stable index is created. 
*/ - bool ensureSegmentStableIndex(PageIdU64 segment_id, LocalIndexInfosPtr local_index_infos); + bool ensureSegmentStableIndex(PageIdU64 segment_id, const LocalIndexInfosPtr & local_index_infos); Block prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted = false); Block prepareWriteBlockInSegmentRange( diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index c810cdc7fc0..9ea59004bc5 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -638,6 +638,10 @@ FileType FileCache::getFileType(const String & fname) { return getFileTypeOfColData(p.stem()); } + else if (ext == ".vector") + { + return FileType::VectorIndex; + } else if (ext == ".meta") { // Example: v1.meta diff --git a/dbms/src/Storages/S3/FileCache.h b/dbms/src/Storages/S3/FileCache.h index efb5ae2aff7..971cdc3f10d 100644 --- a/dbms/src/Storages/S3/FileCache.h +++ b/dbms/src/Storages/S3/FileCache.h @@ -44,10 +44,15 @@ class FileSegment Failed, }; + // The smaller the enum value, the higher the cache priority. enum class FileType : UInt64 { Unknow = 0, Meta, + // Vector index is always stored as a separate file and requires to be read through `mmap` + // which must be downloaded to the local disk. + // So the priority of caching is relatively high + VectorIndex, Merged, Index, Mark, // .mkr, .null.mrk @@ -293,6 +298,7 @@ class FileCache static constexpr UInt64 estimated_size_of_file_type[] = { 0, // Unknow type, currently never cache it. 8 * 1024, // Estimated size of meta. + 12 * 1024 * 1024, // Estimated size of vector index 1 * 1024 * 1024, // Estimated size of merged. 8 * 1024, // Estimated size of index. 8 * 1024, // Estimated size of mark. 
diff --git a/dbms/src/Storages/S3/tests/gtest_filecache.cpp b/dbms/src/Storages/S3/tests/gtest_filecache.cpp index 567a1eb4725..e88571e79ab 100644 --- a/dbms/src/Storages/S3/tests/gtest_filecache.cpp +++ b/dbms/src/Storages/S3/tests/gtest_filecache.cpp @@ -432,6 +432,8 @@ try s3_fname, IDataType::getFileNameForStream(std::to_string(EXTRA_HANDLE_COLUMN_ID), {})); ASSERT_EQ(FileCache::getFileType(handle_fname), FileType::HandleColData); + auto vec_index_fname = fmt::format("{}/idx_{}.vector", s3_fname, /*index_id*/ 50); // DMFile::vectorIndexFileName + ASSERT_EQ(FileCache::getFileType(vec_index_fname), FileType::VectorIndex); auto version_fname = fmt::format("{}/{}.dat", s3_fname, IDataType::getFileNameForStream(std::to_string(VERSION_COLUMN_ID), {})); ASSERT_EQ(FileCache::getFileType(version_fname), FileType::VersionColData); @@ -444,12 +446,9 @@ try ASSERT_EQ(FileCache::getFileType(unknow_fname1), FileType::Unknow); { - const UInt64 cache_level = 0; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, - .dtfile_level = cache_level}; + UInt64 level = 0; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; FileCache file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_FALSE(file_cache.canCache(FileType::Meta)); @@ -463,15 +462,13 @@ try ASSERT_FALSE(file_cache.canCache(FileType::ColData)); } { - const UInt64 cache_level = 1; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, - .dtfile_level = cache_level}; + UInt64 level = 1; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, 
.dtfile_level = level}; FileCache file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + ASSERT_FALSE(file_cache.canCache(FileType::VectorIndex)); ASSERT_FALSE(file_cache.canCache(FileType::Merged)); ASSERT_FALSE(file_cache.canCache(FileType::Index)); ASSERT_FALSE(file_cache.canCache(FileType::Mark)); @@ -482,15 +479,30 @@ try ASSERT_FALSE(file_cache.canCache(FileType::ColData)); } { - const UInt64 cache_level = 2; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, - .dtfile_level = cache_level}; + UInt64 level = 2; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; FileCache file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + ASSERT_TRUE(file_cache.canCache(FileType::VectorIndex)); + ASSERT_FALSE(file_cache.canCache(FileType::Merged)); + ASSERT_FALSE(file_cache.canCache(FileType::Index)); + ASSERT_FALSE(file_cache.canCache(FileType::Mark)); + ASSERT_FALSE(file_cache.canCache(FileType::NullMap)); + ASSERT_FALSE(file_cache.canCache(FileType::DeleteMarkColData)); + ASSERT_FALSE(file_cache.canCache(FileType::VersionColData)); + ASSERT_FALSE(file_cache.canCache(FileType::HandleColData)); + ASSERT_FALSE(file_cache.canCache(FileType::ColData)); + } + { + UInt64 level = 3; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; + FileCache file_cache(capacity_metrics, cache_config); + ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); + ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + 
ASSERT_TRUE(file_cache.canCache(FileType::VectorIndex)); ASSERT_TRUE(file_cache.canCache(FileType::Merged)); ASSERT_FALSE(file_cache.canCache(FileType::Index)); ASSERT_FALSE(file_cache.canCache(FileType::Mark)); @@ -501,15 +513,13 @@ try ASSERT_FALSE(file_cache.canCache(FileType::ColData)); } { - const UInt64 cache_level = 3; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, - .dtfile_level = cache_level}; + UInt64 level = 4; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; FileCache file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + ASSERT_TRUE(file_cache.canCache(FileType::VectorIndex)); ASSERT_TRUE(file_cache.canCache(FileType::Merged)); ASSERT_TRUE(file_cache.canCache(FileType::Index)); ASSERT_FALSE(file_cache.canCache(FileType::Mark)); @@ -520,15 +530,13 @@ try ASSERT_FALSE(file_cache.canCache(FileType::ColData)); } { - const UInt64 cache_level = 4; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, - .dtfile_level = cache_level}; + UInt64 level = 5; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; FileCache file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + ASSERT_TRUE(file_cache.canCache(FileType::VectorIndex)); ASSERT_TRUE(file_cache.canCache(FileType::Merged)); ASSERT_TRUE(file_cache.canCache(FileType::Index)); ASSERT_TRUE(file_cache.canCache(FileType::Mark)); @@ -539,15 +547,13 @@ try 
ASSERT_FALSE(file_cache.canCache(FileType::ColData)); } { - const UInt64 cache_level = 5; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, - .dtfile_level = cache_level}; + UInt64 level = 6; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; FileCache file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + ASSERT_TRUE(file_cache.canCache(FileType::VectorIndex)); ASSERT_TRUE(file_cache.canCache(FileType::Merged)); ASSERT_TRUE(file_cache.canCache(FileType::Index)); ASSERT_TRUE(file_cache.canCache(FileType::Mark)); @@ -558,15 +564,13 @@ try ASSERT_FALSE(file_cache.canCache(FileType::ColData)); } { - const UInt64 cache_level = 6; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, - .dtfile_level = cache_level}; + UInt64 level = 7; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; FileCache file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + ASSERT_TRUE(file_cache.canCache(FileType::VectorIndex)); ASSERT_TRUE(file_cache.canCache(FileType::Merged)); ASSERT_TRUE(file_cache.canCache(FileType::Index)); ASSERT_TRUE(file_cache.canCache(FileType::Mark)); @@ -577,15 +581,13 @@ try ASSERT_FALSE(file_cache.canCache(FileType::ColData)); } { - const UInt64 cache_level = 7; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, 
- .dtfile_level = cache_level}; + UInt64 level = 8; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; FileCache file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + ASSERT_TRUE(file_cache.canCache(FileType::VectorIndex)); ASSERT_TRUE(file_cache.canCache(FileType::Merged)); ASSERT_TRUE(file_cache.canCache(FileType::Index)); ASSERT_TRUE(file_cache.canCache(FileType::Mark)); @@ -596,15 +598,13 @@ try ASSERT_FALSE(file_cache.canCache(FileType::ColData)); } { - const UInt64 cache_level = 8; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, - .dtfile_level = cache_level}; + UInt64 level = 9; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; FileCache file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + ASSERT_TRUE(file_cache.canCache(FileType::VectorIndex)); ASSERT_TRUE(file_cache.canCache(FileType::Merged)); ASSERT_TRUE(file_cache.canCache(FileType::Index)); ASSERT_TRUE(file_cache.canCache(FileType::Mark)); @@ -615,15 +615,13 @@ try ASSERT_FALSE(file_cache.canCache(FileType::ColData)); } { - const UInt64 cache_level = 9; - auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, cache_level); - StorageRemoteCacheConfig cache_config{ - .dir = cache_dir, - .capacity = cache_capacity, - .dtfile_level = cache_level}; + UInt64 level = 10; + auto cache_dir = fmt::format("{}/filetype{}", tmp_dir, level); + StorageRemoteCacheConfig cache_config{.dir = cache_dir, .capacity = cache_capacity, .dtfile_level = level}; FileCache 
file_cache(capacity_metrics, cache_config); ASSERT_FALSE(file_cache.canCache(FileType::Unknow)); ASSERT_TRUE(file_cache.canCache(FileType::Meta)); + ASSERT_TRUE(file_cache.canCache(FileType::VectorIndex)); ASSERT_TRUE(file_cache.canCache(FileType::Merged)); ASSERT_TRUE(file_cache.canCache(FileType::Index)); ASSERT_TRUE(file_cache.canCache(FileType::Mark)); diff --git a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp index 54b1a34446a..1a1daf2b447 100644 --- a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp +++ b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp @@ -119,7 +119,7 @@ BlockInputStreams StorageSystemDTLocalIndexes::read( res_columns[j++]->insert(table_id); res_columns[j++]->insert(table_info.belonging_table_id); - res_columns[j++]->insert(stat.column_name); + res_columns[j++]->insert(String("")); // TODO: let tidb set the column_name and index_name by itself res_columns[j++]->insert(stat.column_id); res_columns[j++]->insert(stat.index_id); res_columns[j++]->insert(stat.index_kind); diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index 90675aa00c0..eb41dd6e2af 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -833,6 +834,7 @@ try { // check stable index has built for all segments dmsv.waitStableIndexReady(); + LOG_INFO(Logger::get(), "waitStableIndexReady done"); const auto range = DM::RowKeyRange::newAll(dmsv.store->is_common_handle, dmsv.store->rowkey_column_size); // read from store @@ -847,11 +849,13 @@ try } auto ann_query_info = std::make_shared(); + ann_query_info->set_index_id(idx_id); ann_query_info->set_column_id(dmsv.vec_column_id); ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); // read with ANN query { + 
SCOPED_TRACE(fmt::format("after add vector index, read with ANN query 1")); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(dmsv.encodeVectorFloat32({1.0, 2.0, 3.5})); @@ -862,6 +866,7 @@ try // read with ANN query { + SCOPED_TRACE(fmt::format("after add vector index, read with ANN query 2")); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(dmsv.encodeVectorFloat32({1.0, 2.0, 3.8})); diff --git a/tests/fullstack-test2/vector/distance.test b/tests/fullstack-test2/vector/distance.test new file mode 100644 index 00000000000..e655c472249 --- /dev/null +++ b/tests/fullstack-test2/vector/distance.test @@ -0,0 +1,61 @@ +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Preparation. 
+mysql> drop table if exists test.t; + +mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL); +mysql> INSERT INTO test.t VALUES ('[8.7, 5.7, 7.7, 9.8, 1.5]'),('[3.6, 9.7, 2.4, 6.6, 4.9]'),('[4.7, 4.9, 2.6, 5.2, 7.4]'),('[7.7, 6.7, 8.3, 7.8, 5.7]'),('[1.4, 4.5, 8.5, 7.7, 6.2]'); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t + +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_L2_DISTANCE(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [1.4,4.5,8.5,7.7,6.2] | +| [4.7,4.9,2.6,5.2,7.4] | +| [7.7,6.7,8.3,7.8,5.7] | ++-----------------------+ + +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_COSINE_DISTANCE(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [1.4,4.5,8.5,7.7,6.2] | +| [7.7,6.7,8.3,7.8,5.7] | +| [4.7,4.9,2.6,5.2,7.4] | ++-----------------------+ + +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_NEGATIVE_INNER_PRODUCT(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [7.7,6.7,8.3,7.8,5.7] | +| [1.4,4.5,8.5,7.7,6.2] | +| [8.7,5.7,7.7,9.8,1.5] | ++-----------------------+ + +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_L1_DISTANCE(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [1.4,4.5,8.5,7.7,6.2] | +| [7.7,6.7,8.3,7.8,5.7] | +| [4.7,4.9,2.6,5.2,7.4] | ++-----------------------+ + + +# Cleanup +mysql> drop table if exists test.t diff --git a/tests/fullstack-test2/vector/vector-index.test b/tests/fullstack-test2/vector/vector-index.test new file mode 100644 index 00000000000..2018b4465db --- /dev/null +++ b/tests/fullstack-test2/vector/vector-index.test @@ -0,0 +1,105 @@ +# Copyright 2024 PingCAP, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Preparation. +mysql> drop table if exists test.t; + +mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL); +mysql> INSERT INTO test.t VALUES ('[8.7, 5.7, 7.7, 9.8, 1.5]'),('[3.6, 9.7, 2.4, 6.6, 4.9]'),('[4.7, 4.9, 2.6, 5.2, 7.4]'),('[7.7, 6.7, 8.3, 7.8, 5.7]'),('[1.4, 4.5, 8.5, 7.7, 6.2]'); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t + +# build vector index with "L2" +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_L2_DISTANCE(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [1.4,4.5,8.5,7.7,6.2] | +| [4.7,4.9,2.6,5.2,7.4] | +| [7.7,6.7,8.3,7.8,5.7] | ++-----------------------+ +mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_l2 ((VEC_L2_DISTANCE(v))) USING HNSW; +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_L2_DISTANCE(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [1.4,4.5,8.5,7.7,6.2] | +| [4.7,4.9,2.6,5.2,7.4] | +| [7.7,6.7,8.3,7.8,5.7] | ++-----------------------+ + +# build vector index with "cosine" +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_COSINE_DISTANCE(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [1.4,4.5,8.5,7.7,6.2] | +| [7.7,6.7,8.3,7.8,5.7] | +| [4.7,4.9,2.6,5.2,7.4] | ++-----------------------+ 
+mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_cos ((VEC_COSINE_DISTANCE(v))) USING HNSW; +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_COSINE_DISTANCE(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [1.4,4.5,8.5,7.7,6.2] | +| [7.7,6.7,8.3,7.8,5.7] | +| [4.7,4.9,2.6,5.2,7.4] | ++-----------------------+ + +#TODO: support "negative inner product" and "L1" +#RETURN + +# build vector index with "negative inner product" +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_NEGATIVE_INNER_PRODUCT(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [7.7,6.7,8.3,7.8,5.7] | +| [1.4,4.5,8.5,7.7,6.2] | +| [8.7,5.7,7.7,9.8,1.5] | ++-----------------------+ +## FIXME: not yet supported; the index name idx_v_cos below is already used by the cosine index above, so give this index its own name when enabling this case +mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_cos ((VEC_NEGATIVE_INNER_PRODUCT(v))) USING HNSW; +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_NEGATIVE_INNER_PRODUCT(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [7.7,6.7,8.3,7.8,5.7] | +| [1.4,4.5,8.5,7.7,6.2] | +| [8.7,5.7,7.7,9.8,1.5] | ++-----------------------+ + +# build vector index with "L1" +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_L1_DISTANCE(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [1.4,4.5,8.5,7.7,6.2] | +| [7.7,6.7,8.3,7.8,5.7] | +| [4.7,4.9,2.6,5.2,7.4] | ++-----------------------+ +## FIXME: not yet supported; the index name idx_v_cos below is already used by the cosine index above, so give this index its own name when enabling this case +mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_cos ((VEC_L1_DISTANCE(v))) USING HNSW; +mysql> set tidb_isolation_read_engines='tiflash';SELECT * FROM test.t ORDER BY VEC_L1_DISTANCE(v, '[1.0,4.0,8.0,7.0,6.0]') LIMIT 3; ++-----------------------+ +| v | ++-----------------------+ +| [1.4,4.5,8.5,7.7,6.2] | +| [7.7,6.7,8.3,7.8,5.7] | +| [4.7,4.9,2.6,5.2,7.4] |
++-----------------------+ + +# Cleanup +mysql> drop table if exists test.t