diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp
index 1c909b9b36c..50019e4d2b1 100644
--- a/dbms/src/Common/FailPoint.cpp
+++ b/dbms/src/Common/FailPoint.cpp
@@ -106,6 +106,7 @@ namespace DB
M(proactive_flush_force_set_type) \
M(exception_when_fetch_disagg_pages) \
M(cop_send_failure) \
+ M(file_cache_fg_download_fail) \
M(force_set_parallel_prehandle_threshold) \
M(force_raise_prehandle_exception) \
M(force_agg_on_partial_block) \
diff --git a/dbms/src/Common/LRUCache.h b/dbms/src/Common/LRUCache.h
index 6d32fedde12..3961b46eee7 100644
--- a/dbms/src/Common/LRUCache.h
+++ b/dbms/src/Common/LRUCache.h
@@ -71,6 +71,14 @@ class LRUCache
return res;
}
+ /// Returns whether a specific key is in the LRU cache
+ /// without updating the LRU order.
+ bool contains(const Key & key)
+ {
+ std::lock_guard cache_lock(mutex);
+ return cells.contains(key);
+ }
+
void set(const Key & key, const MappedPtr & mapped)
{
std::scoped_lock cache_lock(mutex);
diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index 09b7d06f1f4..618e0cbe7ce 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -54,7 +54,7 @@
#include
#include
#include
-#include
+#include
#include
#include
#include
@@ -1388,13 +1388,13 @@ void Context::dropMinMaxIndexCache() const
shared->minmax_index_cache->reset();
}
-void Context::setVectorIndexCache(size_t cache_size_in_bytes)
+void Context::setVectorIndexCache(size_t cache_entities)
{
auto lock = getLock();
RUNTIME_CHECK(!shared->vector_index_cache);
- shared->vector_index_cache = std::make_shared(cache_size_in_bytes);
+ shared->vector_index_cache = std::make_shared(cache_entities);
}
DM::VectorIndexCachePtr Context::getVectorIndexCache() const
@@ -1407,7 +1407,7 @@ void Context::dropVectorIndexCache() const
{
auto lock = getLock();
if (shared->vector_index_cache)
- shared->vector_index_cache->reset();
+ shared->vector_index_cache.reset();
}
bool Context::isDeltaIndexLimited() const
diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h
index c75c9b12ff7..1bc9a753c68 100644
--- a/dbms/src/Interpreters/Context.h
+++ b/dbms/src/Interpreters/Context.h
@@ -392,7 +392,7 @@ class Context
std::shared_ptr getMinMaxIndexCache() const;
void dropMinMaxIndexCache() const;
- void setVectorIndexCache(size_t cache_size_in_bytes);
+ void setVectorIndexCache(size_t cache_entities);
std::shared_ptr getVectorIndexCache() const;
void dropVectorIndexCache() const;
diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp
index ec48b5e0661..ca0c5fc711a 100644
--- a/dbms/src/Server/Server.cpp
+++ b/dbms/src/Server/Server.cpp
@@ -1439,10 +1439,10 @@ int Server::main(const std::vector & /*args*/)
if (minmax_index_cache_size)
global_context->setMinMaxIndexCache(minmax_index_cache_size);
- // 1GiB vector index cache.
- size_t vec_index_cache_size = config().getUInt64("vec_index_cache_size", 1ULL * 1024 * 1024 * 1024);
- if (vec_index_cache_size)
- global_context->setVectorIndexCache(vec_index_cache_size);
+ /// The vector index cache is limited by entry count instead of bytes, because it uses `mmap` and lets the operating system decide the actual memory usage.
+ size_t vec_index_cache_entities = config().getUInt64("vec_index_cache_entities", 1000);
+ if (vec_index_cache_entities)
+ global_context->setVectorIndexCache(vec_index_cache_entities);
/// Size of max memory usage of DeltaIndex, used by DeltaMerge engine.
/// - In non-disaggregated mode, its default value is 0, means unlimited, and it diff --git a/dbms/src/Storages/DeltaMerge/DMContext.h b/dbms/src/Storages/DeltaMerge/DMContext.h index fb0c80ad7c6..1a1d5266958 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.h +++ b/dbms/src/Storages/DeltaMerge/DMContext.h @@ -133,7 +133,8 @@ struct DMContext : private boost::noncopyable TableID physical_table_id_, bool is_common_handle_, size_t rowkey_column_size_, - const DB::Settings & settings) + const DB::Settings & settings, + const ScanContextPtr & scan_context = nullptr) { return std::unique_ptr(new DMContext( session_context_, @@ -145,7 +146,7 @@ struct DMContext : private boost::noncopyable is_common_handle_, rowkey_column_size_, settings, - nullptr, + scan_context, "")); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h index 0ebe68b8d28..98f86356d78 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h @@ -92,14 +92,14 @@ struct ColumnDefine /// Note: ColumnDefine is used in both Write path and Read path. /// In the read path, vector_index is usually not available. Use AnnQueryInfo for /// read related vector index information. - TiDB::VectorIndexInfoPtr vector_index; + TiDB::VectorIndexDefinitionPtr vector_index; explicit ColumnDefine( ColId id_ = 0, String name_ = "", DataTypePtr type_ = nullptr, Field default_value_ = Field{}, - TiDB::VectorIndexInfoPtr vector_index_ = nullptr) + TiDB::VectorIndexDefinitionPtr vector_index_ = nullptr) : id(id_) , name(std::move(name_)) , type(std::move(type_)) diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStat.h b/dbms/src/Storages/DeltaMerge/File/ColumnStat.h index 07a251388e7..f23b743ec77 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStat.h +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStat.h @@ -41,7 +41,7 @@ struct ColumnStat size_t array_sizes_bytes = 0; size_t array_sizes_mark_bytes = 0; - std::optional vector_index = std::nullopt; + std::optional vector_index = std::nullopt; dtpb::ColumnStat toProto() const { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp new file mode 100644 index 00000000000..a65a8b123bf --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp @@ -0,0 +1,480 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
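+// Overview: this input stream runs the ANN search over the DMFile's vector index first,
+// and then serves blocks either directly from the index (when no other column is read)
+// or by following the normal DMFileReader and patching the vector column in.
+//
+// A minimal usage sketch based only on the constructor and read() declared in the header;
+// the real call site lives in the block input stream builder, outside this diff:
+//
+//   auto stream = std::make_shared<DMFileWithVectorIndexBlockInputStream>(
+//       ann_query_info, dmfile, std::move(header_layout), std::move(reader), std::move(vec_cd),
+//       file_provider, read_limiter, scan_context, vec_index_cache, valid_rows, tracing_id);
+//   FilterPtr filter = nullptr;
+//   while (Block block = stream->read(filter, /*return_filter=*/true))
+//   {
+//       // When filter != nullptr, only rows with (*filter)[i] == 1 belong to the TopK result.
+//   }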
+ +#include +#include +#include +#include +#include +#include +#include + +#include + + +namespace DB::ErrorCodes +{ +extern const int S3_ERROR; +} // namespace DB::ErrorCodes + +namespace DB::DM +{ + +DMFileWithVectorIndexBlockInputStream::DMFileWithVectorIndexBlockInputStream( + const ANNQueryInfoPtr & ann_query_info_, + const DMFilePtr & dmfile_, + Block && header_layout_, + DMFileReader && reader_, + ColumnDefine && vec_cd_, + const FileProviderPtr & file_provider_, + const ReadLimiterPtr & read_limiter_, + const ScanContextPtr & scan_context_, + const VectorIndexCachePtr & vec_index_cache_, + const BitmapFilterView & valid_rows_, + const String & tracing_id) + : log(Logger::get(tracing_id)) + , ann_query_info(ann_query_info_) + , dmfile(dmfile_) + , header_layout(std::move(header_layout_)) + , reader(std::move(reader_)) + , vec_cd(std::move(vec_cd_)) + , file_provider(file_provider_) + , read_limiter(read_limiter_) + , scan_context(scan_context_) + , vec_index_cache(vec_index_cache_) + , valid_rows(valid_rows_) +{ + RUNTIME_CHECK(ann_query_info); + RUNTIME_CHECK(vec_cd.id == ann_query_info->column_id()); + for (const auto & cd : reader.read_columns) + { + RUNTIME_CHECK(header_layout.has(cd.name), cd.name); + RUNTIME_CHECK(cd.id != vec_cd.id); + } + RUNTIME_CHECK(header_layout.has(vec_cd.name)); + RUNTIME_CHECK(header_layout.columns() == reader.read_columns.size() + 1); + + // Fill start_offset_to_pack_id + const auto & pack_stats = dmfile->getPackStats(); + start_offset_to_pack_id.reserve(pack_stats.size()); + UInt32 start_offset = 0; + for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id) + { + start_offset_to_pack_id[start_offset] = pack_id; + start_offset += pack_stats[pack_id].rows; + } + + // Fill header + header = toEmptyBlock(reader.read_columns); + addColumnToBlock(header, vec_cd.id, vec_cd.name, vec_cd.type, vec_cd.type->createColumn(), vec_cd.default_value); +} + +DMFileWithVectorIndexBlockInputStream::~DMFileWithVectorIndexBlockInputStream() +{ + if (!vec_column_reader) + return; + + scan_context->total_vector_idx_read_vec_time_ms += static_cast(duration_read_from_vec_index_seconds * 1000); + scan_context->total_vector_idx_read_others_time_ms + += static_cast(duration_read_from_other_columns_seconds * 1000); + + LOG_DEBUG( // + log, + "Finished read DMFile with vector index for column dmf_{}/{}(id={}), " + "query_top_k={} load_index+result={:.2f}s read_from_index={:.2f}s read_from_others={:.2f}s", + dmfile->fileId(), + vec_cd.name, + vec_cd.id, + ann_query_info->top_k(), + duration_load_vec_index_and_results_seconds, + duration_read_from_vec_index_seconds, + duration_read_from_other_columns_seconds); +} + + +Block DMFileWithVectorIndexBlockInputStream::read(FilterPtr & res_filter, bool return_filter) +{ + if (return_filter) + return readImpl(res_filter); + + // If return_filter == false, we must filter by ourselves. + + FilterPtr filter = nullptr; + auto res = readImpl(filter); + if (filter != nullptr) + { + for (auto & col : res) + col.column = col.column->filter(*filter, -1); + } + // filter == nullptr means all rows are valid and no need to filter. + + return res; +} + +Block DMFileWithVectorIndexBlockInputStream::readImpl(FilterPtr & res_filter) +{ + load(); + + Block res; + if (!reader.read_columns.empty()) + res = readByFollowingOtherColumns(); + else + res = readByIndexReader(); + + if (!res) + return {}; + + // Assign output filter according to sorted_results. 
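+    // (sorted_results holds the ascending, de-duplicated row ids within this DMFile that
+    // were returned by the vector index search; see loadVectorSearchResult().)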
+ // + // For example, if sorted_results is [3, 10], the complete filter array is: + // [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1] + // And we should only return filter array starting from res.startOffset(), like: + // [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1] + // ^startOffset ^startOffset+rows + // filter: [0, 0, 0, 0, 0] + // + // We will use startOffset as lowerBound (inclusive), ans startOffset+rows + // as upperBound (exclusive) to find whether this range has a match in sorted_results. + + const auto start_offset = res.startOffset(); + const auto max_rowid_exclusive = start_offset + res.rows(); + + filter.clear(); + filter.resize_fill(res.rows(), 0); + + auto it = std::lower_bound(sorted_results.begin(), sorted_results.end(), start_offset); + while (it != sorted_results.end()) + { + auto rowid = *it; + if (rowid >= max_rowid_exclusive) + break; + filter[rowid - start_offset] = 1; + ++it; + } + + res_filter = &filter; + return res; +} + +Block DMFileWithVectorIndexBlockInputStream::readByIndexReader() +{ + const auto & pack_stats = dmfile->getPackStats(); + size_t all_packs = pack_stats.size(); + const auto & pack_res = reader.pack_filter.getPackResConst(); + + RUNTIME_CHECK(pack_res.size() == all_packs); + + // Skip as many packs as possible according to Pack Filter + while (index_reader_next_pack_id < all_packs) + { + if (pack_res[index_reader_next_pack_id].isUse()) + break; + index_reader_next_row_id += pack_stats[index_reader_next_pack_id].rows; + index_reader_next_pack_id++; + } + + if (index_reader_next_pack_id >= all_packs) + // Finished + return {}; + + auto read_pack_id = index_reader_next_pack_id; + auto block_start_row_id = index_reader_next_row_id; + auto read_rows = pack_stats[read_pack_id].rows; + + index_reader_next_pack_id++; + index_reader_next_row_id += read_rows; + + Block block; + block.setStartOffset(block_start_row_id); + + auto vec_column = vec_cd.type->createColumn(); + vec_column->reserve(read_rows); + + Stopwatch w; + vec_column_reader->read(vec_column, read_pack_id, read_rows); + duration_read_from_vec_index_seconds += w.elapsedSeconds(); + + block.insert(ColumnWithTypeAndName{std::move(vec_column), vec_cd.type, vec_cd.name, vec_cd.id}); + + return block; +} + +Block DMFileWithVectorIndexBlockInputStream::readByFollowingOtherColumns() +{ + // First read other columns. + Stopwatch w; + auto block_others = reader.read(); + duration_read_from_other_columns_seconds += w.elapsedSeconds(); + + if (!block_others) + return {}; + + auto read_rows = block_others.rows(); + + // Using vec_cd.type to construct a Column directly instead of using + // the type from dmfile, so that we don't need extra transforms + // (e.g. wrap with a Nullable). vec_column_reader is compatible with + // both Nullable and NotNullable. + auto vec_column = vec_cd.type->createColumn(); + vec_column->reserve(read_rows); + + // Then read from vector index for the same pack. + w.restart(); + + vec_column_reader->read(vec_column, getPackIdFromBlock(block_others), read_rows); + duration_read_from_vec_index_seconds += w.elapsedSeconds(); + + // Re-assemble block using the same layout as header_layout. + Block res = header_layout.cloneEmpty(); + // Note: the start offset counts from the beginning of THIS dmfile. It + // is not a global offset. 
+ res.setStartOffset(block_others.startOffset()); + for (const auto & elem : block_others) + { + RUNTIME_CHECK(res.has(elem.name)); + res.getByName(elem.name).column = std::move(elem.column); + } + RUNTIME_CHECK(res.has(vec_cd.name)); + res.getByName(vec_cd.name).column = std::move(vec_column); + + return res; +} + +void DMFileWithVectorIndexBlockInputStream::load() +{ + if (loaded) + return; + + Stopwatch w; + + loadVectorIndex(); + loadVectorSearchResult(); + + duration_load_vec_index_and_results_seconds = w.elapsedSeconds(); + + loaded = true; +} + +void DMFileWithVectorIndexBlockInputStream::loadVectorIndex() +{ + bool has_s3_download = false; + bool has_load_from_file = false; + + double duration_load_index = 0; // include download from s3 and load from fs + + auto col_id = ann_query_info->column_id(); + + RUNTIME_CHECK(dmfile->useMetaV2()); // v3 + + // Check vector index exists on the column + const auto & column_stat = dmfile->getColumnStat(col_id); + RUNTIME_CHECK(column_stat.index_bytes > 0); + RUNTIME_CHECK(column_stat.vector_index.has_value()); + + // If local file is invalidated, cache is not valid anymore. So we + // need to ensure file exists on local fs first. + const auto file_name_base = DMFile::getFileNameBase(col_id); + const auto index_file_path = dmfile->colIndexPath(file_name_base); + String local_index_file_path; + FileSegmentPtr file_guard = nullptr; + if (auto s3_file_name = S3::S3FilenameView::fromKeyWithPrefix(index_file_path); s3_file_name.isValid()) + { + // Disaggregated mode + auto * file_cache = FileCache::instance(); + RUNTIME_CHECK_MSG(file_cache, "Must enable S3 file cache to use vector index"); + + Stopwatch watch; + + auto perf_begin = PerfContext::file_cache; + + // If download file failed, retry a few times. + for (auto i = 3; i > 0; --i) + { + try + { + file_guard = file_cache->downloadFileForLocalRead( // + s3_file_name, + column_stat.index_bytes); + if (file_guard) + { + local_index_file_path = file_guard->getLocalFileName(); + break; // Successfully downloaded index into local cache + } + + throw Exception( // + ErrorCodes::S3_ERROR, + "Failed to download vector index file {}", + index_file_path); + } + catch (...) + { + if (i <= 1) + throw; + } + } + + if ( // + PerfContext::file_cache.fg_download_from_s3 > perf_begin.fg_download_from_s3 || // + PerfContext::file_cache.fg_wait_download_from_s3 > perf_begin.fg_wait_download_from_s3) + has_s3_download = true; + + auto download_duration = watch.elapsedSeconds(); + duration_load_index += download_duration; + } + else + { + // Not disaggregated mode + local_index_file_path = index_file_path; + } + + auto load_from_file = [&]() { + has_load_from_file = true; + return VectorIndexViewer::view(*column_stat.vector_index, local_index_file_path); + }; + + Stopwatch watch; + if (vec_index_cache) + // Note: must use local_index_file_path as the cache key, because cache + // will check whether file is still valid and try to remove memory references + // when file is dropped. + vec_index = vec_index_cache->getOrSet(local_index_file_path, load_from_file); + else + vec_index = load_from_file(); + + duration_load_index += watch.elapsedSeconds(); + RUNTIME_CHECK(vec_index != nullptr); + + scan_context->total_vector_idx_load_time_ms += static_cast(duration_load_index * 1000); + if (has_s3_download) + // it could be possible that s3=true but load_from_file=false, it means we download a file + // and then reuse the memory cache. The majority time comes from s3 download + // so we still count it as s3 download. 
+ scan_context->total_vector_idx_load_from_s3++; + else if (has_load_from_file) + scan_context->total_vector_idx_load_from_disk++; + else + scan_context->total_vector_idx_load_from_cache++; + + LOG_DEBUG( // + log, + "Loaded vector index for column dmf_{}/{}(id={}), index_size={} kind={} cost={:.2f}s {} {}", + dmfile->fileId(), + vec_cd.name, + vec_cd.id, + column_stat.index_bytes, + column_stat.vector_index->index_kind(), + duration_load_index, + has_s3_download ? "(S3)" : "", + has_load_from_file ? "(LoadFile)" : ""); +} + +void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() +{ + Stopwatch watch; + + auto perf_begin = PerfContext::vector_search; + + RUNTIME_CHECK(valid_rows.size() >= dmfile->getRows(), valid_rows.size(), dmfile->getRows()); + sorted_results = vec_index->search(ann_query_info, valid_rows); + std::sort(sorted_results.begin(), sorted_results.end()); + // results must not contain duplicates. Usually there should be no duplicates. + sorted_results.erase(std::unique(sorted_results.begin(), sorted_results.end()), sorted_results.end()); + + auto discarded_nodes = PerfContext::vector_search.discarded_nodes - perf_begin.discarded_nodes; + auto visited_nodes = PerfContext::vector_search.visited_nodes - perf_begin.visited_nodes; + + double search_duration = watch.elapsedSeconds(); + scan_context->total_vector_idx_search_time_ms += static_cast(search_duration * 1000); + scan_context->total_vector_idx_search_discarded_nodes += discarded_nodes; + scan_context->total_vector_idx_search_visited_nodes += visited_nodes; + + vec_column_reader = std::make_shared(dmfile, vec_index, sorted_results); + + // Vector index is very likely to filter out some packs. For example, + // if we query for Top 1, then only 1 pack will be remained. So we + // update the pack filter used by the DMFileReader to avoid reading + // unnecessary data for other columns. 
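+    // Both the packs and sorted_results are ordered by row id, so a single forward pass
+    // over the packs, consuming sorted_results as we go, is enough to decide which packs
+    // still contain at least one result row.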
+ size_t valid_packs_before_search = 0; + size_t valid_packs_after_search = 0; + const auto & pack_stats = dmfile->getPackStats(); + auto & pack_res = reader.pack_filter.getPackRes(); + + size_t results_it = 0; + const size_t results_it_max = sorted_results.size(); + + UInt32 pack_start = 0; + + for (size_t pack_id = 0, pack_id_max = dmfile->getPacks(); pack_id < pack_id_max; pack_id++) + { + if (pack_res[pack_id].isUse()) + ++valid_packs_before_search; + + bool pack_has_result = false; + + UInt32 pack_end = pack_start + pack_stats[pack_id].rows; + while (results_it < results_it_max // + && sorted_results[results_it] >= pack_start // + && sorted_results[results_it] < pack_end) + { + pack_has_result = true; + results_it++; + } + + if (!pack_has_result) + pack_res[pack_id] = RSResult::None; + + if (pack_res[pack_id].isUse()) + ++valid_packs_after_search; + + pack_start = pack_end; + } + + RUNTIME_CHECK_MSG(results_it == results_it_max, "All packs has been visited but not all results are consumed"); + + LOG_DEBUG( // + log, + "Finished vector search over column dmf_{}/{}(id={}), cost={:.2f}s " + "top_k_[query/visited/discarded/result]={}/{}/{}/{} " + "rows_[file/after_search]={}/{} " + "pack_[total/before_search/after_search]={}/{}/{}", + + dmfile->fileId(), + vec_cd.name, + vec_cd.id, + search_duration, + + ann_query_info->top_k(), + visited_nodes, // Visited nodes will be larger than query_top_k when there are MVCC rows + discarded_nodes, // How many nodes are skipped by MVCC + sorted_results.size(), + + dmfile->getRows(), + sorted_results.size(), + + pack_stats.size(), + valid_packs_before_search, + valid_packs_after_search); +} + +UInt32 DMFileWithVectorIndexBlockInputStream::getPackIdFromBlock(const Block & block) +{ + // The start offset of a block is ensured to be aligned with the pack. + // This is how we know which pack the block comes from. 
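+    // (start_offset_to_pack_id was pre-computed from the pack stats in the constructor.)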
+ auto start_offset = block.startOffset(); + auto it = start_offset_to_pack_id.find(start_offset); + RUNTIME_CHECK(it != start_offset_to_pack_id.end()); + return it->second; +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h index 7839752e0bb..6f82fb293d8 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h @@ -14,12 +14,12 @@ #pragma once -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include #include @@ -91,72 +91,9 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream const ScanContextPtr & scan_context_, const VectorIndexCachePtr & vec_index_cache_, const BitmapFilterView & valid_rows_, - const String & tracing_id) - : log(Logger::get(tracing_id)) - , ann_query_info(ann_query_info_) - , dmfile(dmfile_) - , header_layout(std::move(header_layout_)) - , reader(std::move(reader_)) - , vec_cd(std::move(vec_cd_)) - , file_provider(file_provider_) - , read_limiter(read_limiter_) - , scan_context(scan_context_) - , vec_index_cache(vec_index_cache_) - , valid_rows(valid_rows_) - { - RUNTIME_CHECK(ann_query_info); - RUNTIME_CHECK(vec_cd.id == ann_query_info->column_id()); - for (const auto & cd : reader.read_columns) - { - RUNTIME_CHECK(header_layout.has(cd.name), cd.name); - RUNTIME_CHECK(cd.id != vec_cd.id); - } - RUNTIME_CHECK(header_layout.has(vec_cd.name)); - RUNTIME_CHECK(header_layout.columns() == reader.read_columns.size() + 1); - - // Fill start_offset_to_pack_id - const auto & pack_stats = dmfile->getPackStats(); - start_offset_to_pack_id.reserve(pack_stats.size()); - UInt32 start_offset = 0; - for (size_t pack_id = 0, pack_id_max = pack_stats.size(); pack_id < pack_id_max; pack_id++) - { - start_offset_to_pack_id[start_offset] = pack_id; - start_offset += pack_stats[pack_id].rows; - } - - // Fill header - header = toEmptyBlock(reader.read_columns); - addColumnToBlock( - header, - vec_cd.id, - vec_cd.name, - vec_cd.type, - vec_cd.type->createColumn(), - vec_cd.default_value); - } - - ~DMFileWithVectorIndexBlockInputStream() override - { - if (!vec_column_reader) - return; - - scan_context->total_vector_idx_read_vec_time_ms - += static_cast(duration_read_from_vec_index_seconds * 1000); - scan_context->total_vector_idx_read_others_time_ms - += static_cast(duration_read_from_other_columns_seconds * 1000); + const String & tracing_id); - LOG_DEBUG( // - log, - "Finished read DMFile with vector index for column dmf_{}/{}(id={}), " - "query_top_k={} load_index+result={:.2f}s read_from_index={:.2f}s read_from_others={:.2f}s", - dmfile->fileId(), - vec_cd.name, - vec_cd.id, - ann_query_info->top_k(), - duration_load_vec_index_and_results_seconds, - duration_read_from_vec_index_seconds, - duration_read_from_other_columns_seconds); - } + ~DMFileWithVectorIndexBlockInputStream() override; public: Block read() override @@ -168,48 +105,12 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream // When all rows in block are not filtered out, // `res_filter` will be set to null. // The caller needs to do handle this situation. - Block read(FilterPtr & res_filter, bool return_filter) override - { - if (return_filter) - return readImpl(res_filter); - - // If return_filter == false, we must filter by ourselves. 
- - FilterPtr filter = nullptr; - auto res = readImpl(filter); - if (filter != nullptr) - { - for (auto & col : res) - col.column = col.column->filter(*filter, -1); - } - // filter == nullptr means all rows are valid and no need to filter. - - return res; - } + Block read(FilterPtr & res_filter, bool return_filter) override; // When all rows in block are not filtered out, // `res_filter` will be set to null. // The caller needs to do handle this situation. - Block readImpl(FilterPtr & res_filter) - { - load(); - - Block res; - if (!reader.read_columns.empty()) - res = readByFollowingOtherColumns(); - else - res = readByIndexReader(); - - // Filter the output rows. If no rows need to filter, res_filter is nullptr. - filter.resize(res.rows()); - bool all_match = valid_rows_after_search.get(filter, res.startOffset(), res.rows()); - - if unlikely (all_match) - res_filter = nullptr; - else - res_filter = &filter; - return res; - } + Block readImpl(FilterPtr & res_filter); bool getSkippedRows(size_t &) override { @@ -240,323 +141,20 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream // Read data totally from the VectorColumnFromIndexReader. This is used // when there is no other column to read. - Block readByIndexReader() - { - const auto & pack_stats = dmfile->getPackStats(); - size_t all_packs = pack_stats.size(); - const auto & use_packs = reader.pack_filter.getPackResConst(); - - RUNTIME_CHECK(use_packs.size() == all_packs); - - // Skip as many packs as possible according to Pack Filter - while (index_reader_next_pack_id < all_packs) - { - if (use_packs[index_reader_next_pack_id] != RSResult::None) - break; - index_reader_next_row_id += pack_stats[index_reader_next_pack_id].rows; - index_reader_next_pack_id++; - } - - if (index_reader_next_pack_id >= all_packs) - // Finished - return {}; - - auto read_pack_id = index_reader_next_pack_id; - auto block_start_row_id = index_reader_next_row_id; - auto read_rows = pack_stats[read_pack_id].rows; - - index_reader_next_pack_id++; - index_reader_next_row_id += read_rows; - - Block block; - block.setStartOffset(block_start_row_id); - - auto vec_column = vec_cd.type->createColumn(); - - Stopwatch w; - vec_column_reader->read(vec_column, read_pack_id, read_rows); - duration_read_from_vec_index_seconds += w.elapsedSeconds(); - - block.insert(ColumnWithTypeAndName{// - std::move(vec_column), - vec_cd.type, - vec_cd.name, - vec_cd.id}); - - return block; - } + Block readByIndexReader(); // Read data from other columns first, then read from VectorColumnFromIndexReader. This is used // when there are other columns to read. - Block readByFollowingOtherColumns() - { - // First read other columns. - Stopwatch w; - auto block_others = reader.read(); - duration_read_from_other_columns_seconds += w.elapsedSeconds(); - - if (!block_others) - return {}; - - // Using vec_cd.type to construct a Column directly instead of using - // the type from dmfile, so that we don't need extra transforms - // (e.g. wrap with a Nullable). vec_column_reader is compatible with - // both Nullable and NotNullable. - auto vec_column = vec_cd.type->createColumn(); - - // Then read from vector index for the same pack. - w.restart(); - - vec_column_reader->read(vec_column, getPackIdFromBlock(block_others), block_others.rows()); - duration_read_from_vec_index_seconds += w.elapsedSeconds(); - - // Re-assemble block using the same layout as header_layout. 
- Block res = header_layout.cloneEmpty(); - // Note: the start offset counts from the beginning of THIS dmfile. It - // is not a global offset. - res.setStartOffset(block_others.startOffset()); - for (const auto & elem : block_others) - { - RUNTIME_CHECK(res.has(elem.name)); - res.getByName(elem.name).column = std::move(elem.column); - } - RUNTIME_CHECK(res.has(vec_cd.name)); - res.getByName(vec_cd.name).column = std::move(vec_column); - - return res; - } + Block readByFollowingOtherColumns(); private: - void load() - { - if (loaded) - return; - - Stopwatch w; - - loadVectorIndex(); - loadVectorSearchResult(); - - duration_load_vec_index_and_results_seconds = w.elapsedSeconds(); - - loaded = true; - } + void load(); - void loadVectorIndex() - { - bool is_index_load_from_cache = true; - - auto col_id = ann_query_info->column_id(); - - RUNTIME_CHECK(dmfile->useMetaV2()); // v3 - - // Check vector index exists on the column - const auto & column_stat = dmfile->getColumnStat(col_id); - RUNTIME_CHECK(column_stat.index_bytes > 0); - - const auto & type = column_stat.type; - RUNTIME_CHECK(VectorIndex::isSupportedType(*type)); - RUNTIME_CHECK(column_stat.vector_index.has_value()); - - const auto file_name_base = DMFile::getFileNameBase(col_id); - auto load_vector_index = [&]() { - is_index_load_from_cache = false; - - auto index_guard = S3::S3RandomAccessFile::setReadFileInfo( - {dmfile->getReadFileSize(col_id, colIndexFileName(file_name_base)), scan_context}); - - auto * dmfile_meta = typeid_cast(dmfile->meta.get()); - assert(dmfile_meta != nullptr); - - auto info = dmfile_meta->merged_sub_file_infos.find(colIndexFileName(file_name_base)); - if (info == dmfile_meta->merged_sub_file_infos.end()) - { - throw Exception( - fmt::format("Unknown index file {}", dmfile->colIndexPath(file_name_base)), - ErrorCodes::LOGICAL_ERROR); - } - - auto file_path = dmfile_meta->mergedPath(info->second.number); - auto encryp_path = dmfile_meta->encryptionMergedPath(info->second.number); - auto offset = info->second.offset; - auto data_size = info->second.size; - - auto buffer = ReadBufferFromRandomAccessFileBuilder::build( - file_provider, - file_path, - encryp_path, - dmfile->getConfiguration()->getChecksumFrameLength(), - read_limiter); - buffer.seek(offset); - - // TODO(vector-index): Read from file directly? - String raw_data; - raw_data.resize(data_size); - buffer.read(reinterpret_cast(raw_data.data()), data_size); - - auto buf = ChecksumReadBufferBuilder::build( - std::move(raw_data), - dmfile->colDataPath(file_name_base), - dmfile->getConfiguration()->getChecksumFrameLength(), - dmfile->getConfiguration()->getChecksumAlgorithm(), - dmfile->getConfiguration()->getChecksumFrameLength()); - - auto index_kind = magic_enum::enum_cast(column_stat.vector_index->index_kind()); - RUNTIME_CHECK(index_kind.has_value()); - RUNTIME_CHECK(index_kind.value() != TiDB::VectorIndexKind::INVALID); - - auto index_distance_metric - = magic_enum::enum_cast(column_stat.vector_index->distance_metric()); - RUNTIME_CHECK(index_distance_metric.has_value()); - RUNTIME_CHECK(index_distance_metric.value() != TiDB::DistanceMetric::INVALID); - - auto index = VectorIndex::load(index_kind.value(), index_distance_metric.value(), *buf); - return index; - }; - - Stopwatch watch; - - if (vec_index_cache) - { - // TODO(vector-index): Is cache key valid on Compute Node for different Write Nodes? 
- vec_index = vec_index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load_vector_index); - } - else - { - // try load from the cache first - if (vec_index_cache) - vec_index = vec_index_cache->get(dmfile->colIndexCacheKey(file_name_base)); - if (vec_index == nullptr) - vec_index = load_vector_index(); - } - - double duration_load_index = watch.elapsedSeconds(); - RUNTIME_CHECK(vec_index != nullptr); - scan_context->total_vector_idx_load_time_ms += static_cast(duration_load_index * 1000); - if (is_index_load_from_cache) - scan_context->total_vector_idx_load_from_cache++; - else - scan_context->total_vector_idx_load_from_disk++; - - LOG_DEBUG( // - log, - "Loaded vector index for column dmf_{}/{}(id={}), index_size={} kind={} cost={:.2f}s from_cache={}", - dmfile->fileId(), - vec_cd.name, - vec_cd.id, - column_stat.index_bytes, - column_stat.vector_index->index_kind(), - duration_load_index, - is_index_load_from_cache); - } - - void loadVectorSearchResult() - { - Stopwatch watch; + void loadVectorIndex(); - VectorIndex::SearchStatistics statistics; - auto results_rowid = vec_index->search(ann_query_info, valid_rows, statistics); + void loadVectorSearchResult(); - double search_duration = watch.elapsedSeconds(); - scan_context->total_vector_idx_search_time_ms += static_cast(search_duration * 1000); - scan_context->total_vector_idx_search_discarded_nodes += statistics.discarded_nodes; - scan_context->total_vector_idx_search_visited_nodes += statistics.visited_nodes; - - size_t rows_after_mvcc = valid_rows.count(); - size_t rows_after_vector_search = results_rowid.size(); - - // After searching with the BitmapFilter, we create a bitmap - // to exclude rows that are not in the search result, because these rows - // are produced as [] or NULL, which is not a valid vector for future use. - // The bitmap will be used when returning the output to the caller. - { - valid_rows_after_search = BitmapFilter(valid_rows.size(), false); - for (auto rowid : results_rowid) - valid_rows_after_search.set(rowid, 1, true); - valid_rows_after_search.runOptimize(); - } - - vec_column_reader = std::make_shared( // - dmfile, - vec_index, - std::move(results_rowid)); - - // Vector index is very likely to filter out some packs. For example, - // if we query for Top 1, then only 1 pack will be remained. So we - // update the pack filter used by the DMFileReader to avoid reading - // unnecessary data for other columns. 
- size_t valid_packs_before_search = 0; - size_t valid_packs_after_search = 0; - const auto & pack_stats = dmfile->getPackStats(); - auto & packs_res = reader.pack_filter.getPackRes(); - - size_t results_it = 0; - const size_t results_it_max = results_rowid.size(); - - UInt32 pack_start = 0; - - for (size_t pack_id = 0, pack_id_max = dmfile->getPacks(); pack_id < pack_id_max; pack_id++) - { - if (packs_res[pack_id] != RSResult::None) - valid_packs_before_search++; - - bool pack_has_result = false; - - UInt32 pack_end = pack_start + pack_stats[pack_id].rows; - while (results_it < results_it_max // - && results_rowid[results_it] >= pack_start // - && results_rowid[results_it] < pack_end) - { - pack_has_result = true; - results_it++; - } - - if (!pack_has_result) - packs_res[pack_id] = RSResult::None; - - if (packs_res[pack_id] != RSResult::None) - valid_packs_after_search++; - - pack_start = pack_end; - } - - RUNTIME_CHECK_MSG(results_it == results_it_max, "All packs has been visited but not all results are consumed"); - - LOG_DEBUG( // - log, - "Finished vector search over column dmf_{}/{}(id={}), cost={:.2f}s " - "top_k_[query/visited/discarded/result]={}/{}/{}/{} " - "rows_[file/after_mvcc/after_search]={}/{}/{} " - "pack_[total/before_search/after_search]={}/{}/{}", - - dmfile->fileId(), - vec_cd.name, - vec_cd.id, - search_duration, - - ann_query_info->top_k(), - statistics.visited_nodes, // Visited nodes will be larger than query_top_k when there are MVCC rows - statistics.discarded_nodes, // How many nodes are skipped by MVCC - results_rowid.size(), - - dmfile->getRows(), - rows_after_mvcc, - rows_after_vector_search, - - pack_stats.size(), - valid_packs_before_search, - valid_packs_after_search); - } - - inline UInt32 getPackIdFromBlock(const Block & block) - { - // The start offset of a block is ensured to be aligned with the pack. - // This is how we know which pack the block comes from. - auto start_offset = block.startOffset(); - auto it = start_offset_to_pack_id.find(start_offset); - RUNTIME_CHECK(it != start_offset_to_pack_id.end()); - return it->second; - } + UInt32 getPackIdFromBlock(const Block & block); private: const LoggerPtr log; @@ -581,11 +179,11 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream std::unordered_map start_offset_to_pack_id; // Filled from reader in constructor // Set after load(). - VectorIndexPtr vec_index = nullptr; + VectorIndexViewerPtr vec_index = nullptr; // Set after load(). VectorColumnFromIndexReaderPtr vec_column_reader = nullptr; // Set after load(). Used to filter the output rows. - BitmapFilter valid_rows_after_search{0, false}; + std::vector sorted_results{}; // Key is rowid IColumn::Filter filter; bool loaded = false; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index eabb5b8d287..9b3c5bc36ab 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -18,9 +18,9 @@ #include #include #include +#include #ifndef NDEBUG -#include #include #include #endif @@ -61,7 +61,7 @@ DMFileWriter::DMFileWriter( for (auto & cd : write_columns) { if (cd.vector_index) - RUNTIME_CHECK(VectorIndex::isSupportedType(*cd.type)); + RUNTIME_CHECK(VectorIndexBuilder::isSupportedType(*cd.type)); // TODO: currently we only generate index for Integers, Date, DateTime types, and this should be configurable by user. 
/// for handle column always generate index
@@ -101,7 +101,11 @@ DMFileWriter::WriteBufferFromFileBasePtr DMFileWriter::createMetaFile()
}
}
-void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index, TiDB::VectorIndexInfoPtr do_vector_index)
+void DMFileWriter::addStreams(
+ ColId col_id,
+ DataTypePtr type,
+ bool do_index,
+ TiDB::VectorIndexDefinitionPtr do_vector_index)
{
auto callback = [&](const IDataType::SubstreamPath & substream_path) {
const auto stream_name = DMFile::getFileNameBase(col_id, substream_path);
@@ -335,34 +339,20 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
if (stream->vector_index && !is_empty_file)
{
- dmfile_meta->checkMergedFile(merged_file, file_provider, write_limiter);
-
- auto fname = colIndexFileName(stream_name);
+ // Vector index files are never written into the merged file,
+ // because we want to allow them to be mmapped by usearch.
- auto buffer = ChecksumWriteBufferBuilder::build(
- merged_file.buffer,
- dmfile->getConfiguration()->getChecksumAlgorithm(),
- dmfile->getConfiguration()->getChecksumFrameLength());
-
- stream->vector_index->serializeBinary(*buffer);
-
- col_stat.index_bytes = buffer->getMaterializedBytes();
+ const auto index_name = dmfile->colIndexPath(stream_name);
+ stream->vector_index->save(index_name);
+ col_stat.index_bytes = Poco::File(index_name).getSize();
// Memorize what kind of vector index it is, so that we can correctly restore it when reading.
- col_stat.vector_index = dtpb::ColumnVectorIndexInfo{};
- col_stat.vector_index->set_index_kind(String(magic_enum::enum_name(stream->vector_index->kind)));
+ col_stat.vector_index.emplace();
+ col_stat.vector_index->set_index_kind(
+ tipb::VectorIndexKind_Name(stream->vector_index->definition->kind));
col_stat.vector_index->set_distance_metric(
- String(magic_enum::enum_name(stream->vector_index->distance_metric)));
-
- MergedSubFileInfo info{
- fname,
- merged_file.file_info.number,
- merged_file.file_info.size,
- col_stat.index_bytes};
- dmfile_meta->merged_sub_file_infos[fname] = info;
-
- merged_file.file_info.size += col_stat.index_bytes;
- buffer->next();
+ tipb::VectorDistanceMetric_Name(stream->vector_index->definition->distance_metric));
+ col_stat.vector_index->set_dimensions(stream->vector_index->definition->dimension);
}
// write mark into merged_file_writer
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
index 5cc3cb7a480..2363790374e 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
+++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
@@ -54,7 +54,7 @@ class DMFileWriter
FileProviderPtr & file_provider,
const WriteLimiterPtr & write_limiter_,
bool do_index,
- TiDB::VectorIndexInfoPtr do_vector_index)
+ TiDB::VectorIndexDefinitionPtr do_vector_index)
: plain_file(ChecksumWriteBufferBuilder::build(
dmfile->getConfiguration().has_value(),
file_provider,
@@ -72,7 +72,7 @@
compression_settings,
!dmfile->getConfiguration().has_value()))
, minmaxes(do_index ? std::make_shared(*type) : nullptr)
- , vector_index(do_vector_index ? VectorIndex::create(*do_vector_index) : nullptr)
+ , vector_index(do_vector_index ?
VectorIndexBuilder::create(do_vector_index) : nullptr) { if (!dmfile->useMetaV2()) { @@ -98,7 +98,7 @@ class DMFileWriter WriteBufferPtr compressed_buf; MinMaxIndexPtr minmaxes; - VectorIndexPtr vector_index; + VectorIndexBuilderPtr vector_index; MarksInCompressedFilePtr marks; @@ -160,7 +160,7 @@ class DMFileWriter /// Add streams with specified column id. Since a single column may have more than one Stream, /// for example Nullable column has a NullMap column, we would track them with a mapping /// FileNameBase -> Stream. - void addStreams(ColId col_id, DataTypePtr type, bool do_index, TiDB::VectorIndexInfoPtr do_vector_index); + void addStreams(ColId col_id, DataTypePtr type, bool do_index, TiDB::VectorIndexDefinitionPtr do_vector_index); WriteBufferFromFileBasePtr createMetaFile(); void finalizeMeta(); diff --git a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp index fa2e352ba8c..8f6238bce02 100644 --- a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include + namespace DB::DM { @@ -31,18 +34,22 @@ std::vector VectorColumnFromIndexReader::calcPackStartRowID(const DMFile } MutableColumnPtr VectorColumnFromIndexReader::calcResultsByPack( - std::vector && results, + const std::vector & sorted_results, const DMFileMeta::PackStats & pack_stats, const std::vector & pack_start_rowid) { auto column = ColumnArray::create(ColumnUInt32::create()); - // results must be in ascending order. - std::sort(results.begin(), results.end()); +#ifndef NDEBUG + { + const auto sorted = std::is_sorted(sorted_results.begin(), sorted_results.end()); + RUNTIME_CHECK(sorted); + } +#endif std::vector offsets_in_pack; size_t results_it = 0; - const size_t results_it_max = results.size(); + const size_t results_it_max = sorted_results.size(); for (size_t pack_id = 0, pack_id_max = pack_start_rowid.size(); pack_id < pack_id_max; pack_id++) { offsets_in_pack.clear(); @@ -51,10 +58,10 @@ MutableColumnPtr VectorColumnFromIndexReader::calcResultsByPack( UInt32 pack_end = pack_start + pack_stats[pack_id].rows; while (results_it < results_it_max // - && results[results_it] >= pack_start // - && results[results_it] < pack_end) + && sorted_results[results_it] >= pack_start // + && sorted_results[results_it] < pack_end) { - offsets_in_pack.push_back(results[results_it] - pack_start); + offsets_in_pack.push_back(sorted_results[results_it] - pack_start); results_it++; } @@ -73,7 +80,6 @@ void VectorColumnFromIndexReader::read(MutableColumnPtr & column, size_t start_p { std::vector value; const auto * results_by_pack = checkAndGetColumn(this->results_by_pack.get()); - checkAndGetColumn(column.get()); size_t pack_id = start_pack_id; UInt32 remaining_rows_in_pack = pack_stats[pack_id].rows; @@ -110,7 +116,7 @@ void VectorColumnFromIndexReader::read(MutableColumnPtr & column, size_t start_p RUNTIME_CHECK(filled_result_rows == offset_in_pack); // TODO(vector-index): We could fill multiple rows if rowid is continuous. 
- VectorIndex::Key rowid = pack_start_rowid[pack_id] + offset_in_pack; + VectorIndexViewer::Key rowid = pack_start_rowid[pack_id] + offset_in_pack; index->get(rowid, value); column->insertData(reinterpret_cast(value.data()), value.size() * sizeof(Float32)); filled_result_rows++; diff --git a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h index 9eafb6824cb..5fff067dc72 100644 --- a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h +++ b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -43,7 +44,7 @@ class VectorColumnFromIndexReader const DMFileMeta::PackStats & pack_stats; const std::vector pack_start_rowid; - const VectorIndexPtr index; + const VectorIndexViewerPtr index; /// results_by_pack[i]=[a,b,c...] means pack[i]'s row offset [a,b,c,...] is contained in the result set. /// The rowid of a is pack_start_rowid[i]+a. MutableColumnPtr /* ColumnArray of UInt32 */ results_by_pack; @@ -52,7 +53,7 @@ class VectorColumnFromIndexReader static std::vector calcPackStartRowID(const DMFileMeta::PackStats & pack_stats); static MutableColumnPtr calcResultsByPack( - std::vector && results, + const std::vector & results, const DMFileMeta::PackStats & pack_stats, const std::vector & pack_start_rowid); @@ -61,13 +62,13 @@ class VectorColumnFromIndexReader /// including NULLs and delete marks. explicit VectorColumnFromIndexReader( const DMFilePtr & dmfile_, - const VectorIndexPtr & index_, - std::vector && results_) + const VectorIndexViewerPtr & index_, + const std::vector & sorted_results_) : dmfile(dmfile_) , pack_stats(dmfile_->getPackStats()) , pack_start_rowid(calcPackStartRowID(pack_stats)) , index(index_) - , results_by_pack(calcResultsByPack(std::move(results_), pack_stats, pack_start_rowid)) + , results_by_pack(calcResultsByPack(sorted_results_, pack_stats, pack_start_rowid)) {} void read(MutableColumnPtr & column, size_t start_pack_id, UInt32 read_rows); diff --git a/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto b/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto index 7d14cad0f35..257522f0529 100644 --- a/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto +++ b/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto @@ -49,13 +49,6 @@ message ChecksumConfig { repeated ChecksumDebugInfo debug_info = 5; } -// Note: This message does not contain all fields of VectorIndexInfo, -// because this message is only used for reading the vector index carried with the column. -message ColumnVectorIndexInfo { - optional string index_kind = 1; - optional string distance_metric = 2; -} - message ColumnStat { optional int64 col_id = 1; optional string type_name = 2; @@ -69,8 +62,9 @@ message ColumnStat { optional uint64 array_sizes_bytes = 10; optional uint64 array_sizes_mark_bytes = 11; + reserved 101; // used before // TODO(vector-index) Support multiple vector index on the same column - optional ColumnVectorIndexInfo vector_index = 101; + optional VectorIndexFileProps vector_index = 102; } message ColumnStats { @@ -86,3 +80,15 @@ message StableLayerMeta { optional uint64 valid_bytes = 2; repeated StableFile files = 3; } + +// Note: This message is something different to VectorIndexDefinition. +// VectorIndexDefinition defines an index, comes from table DDL. +// It includes information about how index should be constructed, +// for example, it contains HNSW's 'efConstruct' parameter. 
+// However, VectorIndexFileProps provides information for read out the index, +// for example, very basic information about what the index is, and how it is stored. +message VectorIndexFileProps { + optional string index_kind = 1; // The value is tipb.VectorIndexKind + optional string distance_metric = 2; // The value is tipb.VectorDistanceMetric + optional uint64 dimensions = 3; +} diff --git a/dbms/src/Storages/DeltaMerge/Index/RSIndex.h b/dbms/src/Storages/DeltaMerge/Index/RSIndex.h index 813931e1705..c10b7133eba 100644 --- a/dbms/src/Storages/DeltaMerge/Index/RSIndex.h +++ b/dbms/src/Storages/DeltaMerge/Index/RSIndex.h @@ -15,7 +15,6 @@ #pragma once #include -#include namespace DB::DM { @@ -23,17 +22,11 @@ struct RSIndex { DataTypePtr type; MinMaxIndexPtr minmax; - VectorIndexPtr vector; // TODO(vector-index): Actually this is not a rough index. We put it here for convenience. RSIndex(const DataTypePtr & type_, const MinMaxIndexPtr & minmax_) : type(type_) , minmax(minmax_) {} - - RSIndex(const DataTypePtr & type_, const VectorIndexPtr & vector_) - : type(type_) - , vector(vector_) - {} }; using ColumnIndexes = std::unordered_map; diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp index 179faa3bda8..db99b0e6055 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp @@ -15,8 +15,11 @@ #include #include #include +#include #include #include +#include +#include namespace DB::ErrorCodes { @@ -26,7 +29,7 @@ extern const int BAD_ARGUMENTS; namespace DB::DM { -bool VectorIndex::isSupportedType(const IDataType & type) +bool VectorIndexBuilder::isSupportedType(const IDataType & type) { const auto * nullable = checkAndGetDataType(&type); if (nullable) @@ -35,50 +38,37 @@ bool VectorIndex::isSupportedType(const IDataType & type) return checkDataTypeArray(&type); } -VectorIndexPtr VectorIndex::create(const TiDB::VectorIndexInfo & index_info) +VectorIndexBuilderPtr VectorIndexBuilder::create(const TiDB::VectorIndexDefinitionPtr & definition) { - RUNTIME_CHECK(index_info.dimension > 0); - RUNTIME_CHECK(index_info.dimension <= std::numeric_limits::max()); + RUNTIME_CHECK(definition->dimension > 0); + RUNTIME_CHECK(definition->dimension <= TiDB::MAX_VECTOR_DIMENSION); - switch (index_info.kind) + switch (definition->kind) { - case TiDB::VectorIndexKind::HNSW: - switch (index_info.distance_metric) - { - case TiDB::DistanceMetric::L2: - return std::make_shared>(index_info.dimension); - case TiDB::DistanceMetric::COSINE: - return std::make_shared>(index_info.dimension); - default: - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Unsupported vector index distance metric {}", - index_info.distance_metric); - } + case tipb::VectorIndexKind::HNSW: + return std::make_shared(definition); default: - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported vector index {}", index_info.kind); + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Unsupported vector index {}", + tipb::VectorIndexKind_Name(definition->kind)); } } -VectorIndexPtr VectorIndex::load(TiDB::VectorIndexKind kind, TiDB::DistanceMetric distance_metric, ReadBuffer & istr) +VectorIndexViewerPtr VectorIndexViewer::view(const dtpb::VectorIndexFileProps & file_props, std::string_view path) { - RUNTIME_CHECK(kind != TiDB::VectorIndexKind::INVALID); - RUNTIME_CHECK(distance_metric != TiDB::DistanceMetric::INVALID); + RUNTIME_CHECK(file_props.dimensions() > 0); + RUNTIME_CHECK(file_props.dimensions() <= 
TiDB::MAX_VECTOR_DIMENSION); + + tipb::VectorIndexKind kind; + RUNTIME_CHECK(tipb::VectorIndexKind_Parse(file_props.index_kind(), &kind)); switch (kind) { - case TiDB::VectorIndexKind::HNSW: - switch (distance_metric) - { - case TiDB::DistanceMetric::L2: - return VectorIndexHNSW::deserializeBinary(istr); - case TiDB::DistanceMetric::COSINE: - return VectorIndexHNSW::deserializeBinary(istr); - default: - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported vector index distance metric {}", distance_metric); - } + case tipb::VectorIndexKind::HNSW: + return VectorIndexHNSWViewer::view(file_props, path); default: - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported vector index {}", kind); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported vector index {}", file_props.index_kind()); } } diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h index f17e36b8ca8..bb085685363 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h @@ -16,92 +16,75 @@ #include #include -#include #include #include #include #include +#include #include #include -namespace DB -{ -namespace DM + +namespace DB::DM { -class VectorIndex +/// Builds a VectorIndex in memory. +class VectorIndexBuilder { public: /// The key is the row's offset in the DMFile. using Key = UInt32; - /// True bit means the row is valid and should be kept in the search result. - /// False bit lets the row filtered out and will search for more results. - using RowFilter = BitmapFilterView; - - struct SearchStatistics - { - size_t visited_nodes = 0; - size_t discarded_nodes = 0; // Rows filtered out by MVCC - }; +public: + static VectorIndexBuilderPtr create(const TiDB::VectorIndexDefinitionPtr & definition); static bool isSupportedType(const IDataType & type); - static VectorIndexPtr create(const TiDB::VectorIndexInfo & index_info); - - static VectorIndexPtr load(TiDB::VectorIndexKind kind, TiDB::DistanceMetric distance_metric, ReadBuffer & istr); - - VectorIndex(TiDB::VectorIndexKind kind_, TiDB::DistanceMetric distance_metric_) - : kind(kind_) - , distance_metric(distance_metric_) +public: + explicit VectorIndexBuilder(const TiDB::VectorIndexDefinitionPtr & definition_) + : definition(definition_) {} - virtual ~VectorIndex() = default; + virtual ~VectorIndexBuilder() = default; virtual void addBlock(const IColumn & column, const ColumnVector * del_mark) = 0; - virtual void serializeBinary(WriteBuffer & ostr) const = 0; - - virtual size_t memoryUsage() const = 0; - - // Invalid rows in `valid_rows` will be discared when applying the search - virtual std::vector search( // - const ANNQueryInfoPtr & query_info, - const RowFilter & valid_rows, - SearchStatistics & statistics) const - = 0; - - // Get the value (i.e. vector content) of a Key. - virtual void get(Key key, std::vector & out) const = 0; + virtual void save(std::string_view path) const = 0; public: - const TiDB::VectorIndexKind kind; - const TiDB::DistanceMetric distance_metric; + const TiDB::VectorIndexDefinitionPtr definition; }; -struct VectorIndexWeightFunction +/// Views a VectorIndex file. +/// It may nor may not read the whole content of the file into memory. +class VectorIndexViewer { - size_t operator()(const String &, const VectorIndex & index) const { return index.memoryUsage(); } -}; +public: + /// The key is the row's offset in the DMFile. 
+ using Key = VectorIndexBuilder::Key; -class VectorIndexCache : public LRUCache, VectorIndexWeightFunction> -{ -private: - using Base = LRUCache, VectorIndexWeightFunction>; + /// True bit means the row is valid and should be kept in the search result. + /// False bit lets the row filtered out and will search for more results. + using RowFilter = BitmapFilterView; + +public: + static VectorIndexViewerPtr view(const dtpb::VectorIndexFileProps & file_props, std::string_view path); public: - explicit VectorIndexCache(size_t max_size_in_bytes) - : Base(max_size_in_bytes) + explicit VectorIndexViewer(const dtpb::VectorIndexFileProps & file_props_) + : file_props(file_props_) {} - template - MappedPtr getOrSet(const Key & key, LoadFunc && load) - { - auto result = Base::getOrSet(key, load); - return result.first; - } -}; + virtual ~VectorIndexViewer() = default; + + // Invalid rows in `valid_rows` will be discared when applying the search + virtual std::vector search(const ANNQueryInfoPtr & queryInfo, const RowFilter & valid_rows) const = 0; + + // Get the value (i.e. vector content) of a Key. + virtual void get(Key key, std::vector & out) const = 0; -} // namespace DM +public: + const dtpb::VectorIndexFileProps file_props; +}; -} // namespace DB +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.cpp new file mode 100644 index 00000000000..55350df8642 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.cpp @@ -0,0 +1,100 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +namespace DB::DM +{ + +size_t VectorIndexCache::cleanOutdatedCacheEntries() +{ + size_t cleaned = 0; + + std::unordered_set files; + { + // Copy out the list to avoid occupying lock for too long. + // The complexity is just O(N) which is fine. + std::shared_lock lock(mu); + files = files_to_check; + } + + for (const auto & file_path : files) + { + if (is_shutting_down) + break; + + if (!cache.contains(file_path)) + { + // It is evicted from LRU cache + std::unique_lock lock(mu); + files_to_check.erase(file_path); + } + else if (!Poco::File(file_path).exists()) + { + LOG_INFO(log, "Dropping in-memory Vector Index cache because on-disk file is dropped, file={}", file_path); + { + std::unique_lock lock(mu); + files_to_check.erase(file_path); + } + cache.remove(file_path); + cleaned++; + } + } + + LOG_DEBUG(log, "Cleaned {} outdated Vector Index cache entries", cleaned); + + return cleaned; +} + +void VectorIndexCache::cleanOutdatedLoop() +{ + while (true) + { + { + std::unique_lock lock(shutdown_mu); + shutdown_cv.wait_for(lock, std::chrono::minutes(1), [this] { return is_shutting_down.load(); }); + } + + if (is_shutting_down) + break; + + try + { + cleanOutdatedCacheEntries(); + } + catch (...) 
+ { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + +VectorIndexCache::VectorIndexCache(size_t max_entities) + : cache(max_entities) + , log(Logger::get()) +{ + cleaner_thread = std::thread([this] { cleanOutdatedLoop(); }); +} + +VectorIndexCache::~VectorIndexCache() +{ + is_shutting_down = true; + shutdown_cv.notify_all(); + cleaner_thread.join(); +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h new file mode 100644 index 00000000000..013631ca1f0 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h @@ -0,0 +1,82 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace DB::DM::tests +{ +class VectorIndexTestUtils; +} + +namespace DB::DM +{ + +class VectorIndexCache +{ +private: + using Cache = LRUCache; + + Cache cache; + LoggerPtr log; + + // Note: Key exists if cache does internal eviction. However it is fine, because + // we will remove them periodically. + std::unordered_set files_to_check; + std::shared_mutex mu; + + std::atomic is_shutting_down = false; + std::condition_variable shutdown_cv; + std::mutex shutdown_mu; + +private: + friend class ::DB::DM::tests::VectorIndexTestUtils; + + // Drop the in-memory Vector Index if the on-disk file is deleted. + // mmaped file could be unmmaped so that disk space can be reclaimed. 
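    // Illustration (not part of this patch; the surrounding reader code is assumed): the expected
    // call pattern is
    //
    //     auto viewer = vector_index_cache->getOrSet(index_file_path, [&] {
    //         return VectorIndexViewer::view(file_props, index_file_path);
    //     });
    //
    // getOrSet() records `index_file_path` into `files_to_check` before loading, and the cleaner
    // thread periodically walks that set, dropping entries whose mmap-ed file no longer exists on
    // disk. The two helpers declared below implement that sweep.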
+ size_t cleanOutdatedCacheEntries(); + + void cleanOutdatedLoop(); + + // TODO(vector-index): Use task on BackgroundProcessingPool instead of a raw thread + std::thread cleaner_thread; + +public: + explicit VectorIndexCache(size_t max_entities); + + ~VectorIndexCache(); + + template + Cache::MappedPtr getOrSet(const Cache::Key & file_path, LoadFunc && load) + { + { + std::scoped_lock lock(mu); + files_to_check.insert(file_path); + } + + auto result = cache.getOrSet(file_path, load); + return result.first; + } +}; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp index 53a42da06b4..b5ee8adb0b8 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp @@ -16,9 +16,14 @@ #include #include #include +#include #include +#include +#include #include +#include +#include namespace DB::ErrorCodes { @@ -30,69 +35,36 @@ extern const int CANNOT_ALLOCATE_MEMORY; namespace DB::DM { -template -USearchIndexWithSerialization::USearchIndexWithSerialization(size_t dimensions) - : Base(Base::make(unum::usearch::metric_punned_t(dimensions, Metric))) -{} - -template -void USearchIndexWithSerialization::serialize(WriteBuffer & ostr) const -{ - auto callback = [&ostr](void * from, size_t n) { - ostr.write(reinterpret_cast(from), n); - return true; - }; - Base::save_to_stream(callback); -} - -template -void USearchIndexWithSerialization::deserialize(ReadBuffer & istr) -{ - auto callback = [&istr](void * from, size_t n) { - istr.readStrict(reinterpret_cast(from), n); - return true; - }; - Base::load_from_stream(callback); -} - -template class USearchIndexWithSerialization; -template class USearchIndexWithSerialization; - -constexpr TiDB::DistanceMetric toTiDBDistanceMetric(unum::usearch::metric_kind_t metric) +unum::usearch::metric_kind_t getUSearchMetricKind(tipb::VectorDistanceMetric d) { - switch (metric) + switch (d) { - case unum::usearch::metric_kind_t::l2sq_k: - return TiDB::DistanceMetric::L2; - case unum::usearch::metric_kind_t::cos_k: - return TiDB::DistanceMetric::COSINE; + case tipb::VectorDistanceMetric::INNER_PRODUCT: + return unum::usearch::metric_kind_t::ip_k; + case tipb::VectorDistanceMetric::COSINE: + return unum::usearch::metric_kind_t::cos_k; + case tipb::VectorDistanceMetric::L2: + return unum::usearch::metric_kind_t::l2sq_k; default: - return TiDB::DistanceMetric::INVALID; + // Specifically, L1 is currently unsupported by usearch. 
+ + RUNTIME_CHECK_MSG( // + false, + "Unsupported vector distance {}", + tipb::VectorDistanceMetric_Name(d)); } } -constexpr tipb::ANNQueryDistanceMetric toTiDBQueryDistanceMetric(unum::usearch::metric_kind_t metric) +VectorIndexHNSWBuilder::VectorIndexHNSWBuilder(const TiDB::VectorIndexDefinitionPtr & definition_) + : VectorIndexBuilder(definition_) + , index(USearchImplType::make(unum::usearch::metric_punned_t( // + definition_->dimension, + getUSearchMetricKind(definition->distance_metric)))) { - switch (metric) - { - case unum::usearch::metric_kind_t::l2sq_k: - return tipb::ANNQueryDistanceMetric::L2; - case unum::usearch::metric_kind_t::cos_k: - return tipb::ANNQueryDistanceMetric::Cosine; - default: - return tipb::ANNQueryDistanceMetric::InvalidMetric; - } + RUNTIME_CHECK(definition_->kind == tipb::VectorIndexKind::HNSW); } -template -VectorIndexHNSW::VectorIndexHNSW(UInt32 dimensions_) - : VectorIndex(TiDB::VectorIndexKind::HNSW, toTiDBDistanceMetric(Metric)) - , dimensions(dimensions_) - , index(std::make_shared>(static_cast(dimensions_))) -{} - -template -void VectorIndexHNSW::addBlock(const IColumn & column, const ColumnVector * del_mark) +void VectorIndexHNSWBuilder::addBlock(const IColumn & column, const ColumnVector * del_mark) { // Note: column may be nullable. const ColumnArray * col_array; @@ -106,7 +78,7 @@ void VectorIndexHNSW::addBlock(const IColumn & column, const ColumnVecto const auto * del_mark_data = (!del_mark) ? nullptr : &(del_mark->getData()); - if (!index->reserve(unum::usearch::ceil2(index->size() + column.size()))) + if (!index.reserve(unum::usearch::ceil2(index.size() + column.size()))) { throw Exception(ErrorCodes::CANNOT_ALLOCATE_MEMORY, "Could not reserve memory for HNSW index"); } @@ -125,12 +97,12 @@ void VectorIndexHNSW::addBlock(const IColumn & column, const ColumnVecto continue; // Expect all data to have matching dimensions. 
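        // Illustration (not part of this patch): the vector column is Array(Float32) whose element
        // data is stored contiguously, so getDataAt(i) yields exactly dimension * sizeof(Float32)
        // bytes that can be handed to usearch's add() without copying. The checks below guard that
        // assumption before the raw pointer is reinterpreted.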
- RUNTIME_CHECK(col_array->sizeAt(i) == dimensions); + RUNTIME_CHECK(col_array->sizeAt(i) == definition->dimension); auto data = col_array->getDataAt(i); - RUNTIME_CHECK(data.size == dimensions * sizeof(Float32)); + RUNTIME_CHECK(data.size == definition->dimension * sizeof(Float32)); - if (auto rc = index->add(row_offset, reinterpret_cast(data.data)); !rc) + if (auto rc = index.add(row_offset, reinterpret_cast(data.data)); !rc) throw Exception( ErrorCodes::INCORRECT_DATA, "Failed to add vector to HNSW index, i={} row_offset={} error={}", @@ -140,77 +112,90 @@ void VectorIndexHNSW::addBlock(const IColumn & column, const ColumnVecto } } -template -void VectorIndexHNSW::serializeBinary(WriteBuffer & ostr) const +void VectorIndexHNSWBuilder::save(std::string_view path) const { - writeStringBinary(magic_enum::enum_name(kind), ostr); - writeStringBinary(magic_enum::enum_name(distance_metric), ostr); - writeIntBinary(dimensions, ostr); - index->serialize(ostr); + auto result = index.save(unum::usearch::output_file_t(path.data())); + RUNTIME_CHECK_MSG(result, "Failed to save vector index: {}", result.error.what()); } -template -VectorIndexPtr VectorIndexHNSW::deserializeBinary(ReadBuffer & istr) +VectorIndexViewerPtr VectorIndexHNSWViewer::view(const dtpb::VectorIndexFileProps & file_props, std::string_view path) { - String kind; - readStringBinary(kind, istr); - RUNTIME_CHECK(magic_enum::enum_cast(kind) == TiDB::VectorIndexKind::HNSW); + RUNTIME_CHECK(file_props.index_kind() == tipb::VectorIndexKind_Name(tipb::VectorIndexKind::HNSW)); - String distance_metric; - readStringBinary(distance_metric, istr); - RUNTIME_CHECK(magic_enum::enum_cast(distance_metric) == toTiDBDistanceMetric(Metric)); + tipb::VectorDistanceMetric metric; + RUNTIME_CHECK(tipb::VectorDistanceMetric_Parse(file_props.distance_metric(), &metric)); + RUNTIME_CHECK(metric != tipb::VectorDistanceMetric::INVALID_DISTANCE_METRIC); - UInt32 dimensions; - readIntBinary(dimensions, istr); + auto vi = std::make_shared(file_props); + vi->index = USearchImplType::make(unum::usearch::metric_punned_t( // + file_props.dimensions(), + getUSearchMetricKind(metric))); + auto result = vi->index.view(unum::usearch::memory_mapped_file_t(path.data())); + RUNTIME_CHECK_MSG(result, "Failed to load vector index: {}", result.error.what()); - auto vi = std::make_shared>(dimensions); - vi->index->deserialize(istr); return vi; } -template -std::vector VectorIndexHNSW::search( +std::vector VectorIndexHNSWViewer::search( const ANNQueryInfoPtr & query_info, - const RowFilter & valid_rows, - SearchStatistics & statistics) const + const RowFilter & valid_rows) const { RUNTIME_CHECK(query_info->ref_vec_f32().size() >= sizeof(UInt32)); auto query_vec_size = readLittleEndian(query_info->ref_vec_f32().data()); - if (query_vec_size != dimensions) + if (query_vec_size != file_props.dimensions()) throw Exception( ErrorCodes::INCORRECT_QUERY, "Query vector size {} does not match index dimensions {}", query_vec_size, - dimensions); + file_props.dimensions()); RUNTIME_CHECK(query_info->ref_vec_f32().size() >= sizeof(UInt32) + query_vec_size * sizeof(Float32)); - if (query_info->distance_metric() != toTiDBQueryDistanceMetric(Metric)) + if (tipb::VectorDistanceMetric_Name(query_info->distance_metric()) != file_props.distance_metric()) throw Exception( ErrorCodes::INCORRECT_QUERY, "Query distance metric {} does not match index distance metric {}", - tipb::ANNQueryDistanceMetric_Name(query_info->distance_metric()), - 
tipb::ANNQueryDistanceMetric_Name(toTiDBQueryDistanceMetric(Metric))); + tipb::VectorDistanceMetric_Name(query_info->distance_metric()), + file_props.distance_metric()); - RUNTIME_CHECK(index != nullptr); + std::atomic visited_nodes = 0; + std::atomic discarded_nodes = 0; + std::atomic has_exception_in_search = false; // The non-valid rows should be discarded by this lambda - auto predicate - = [&valid_rows, &statistics](typename USearchIndexWithSerialization::member_cref_t const & member) { - statistics.visited_nodes++; - if (!valid_rows[member.key]) - statistics.discarded_nodes++; - return valid_rows[member.key]; - }; + auto predicate = [&](typename USearchImplType::member_cref_t const & member) { + // Must catch exceptions in the predicate, because search runs on other threads. + try + { + // Note: We don't increase the thread_local perf, because search runs on other threads. + visited_nodes++; + if (!valid_rows[member.key]) + discarded_nodes++; + return valid_rows[member.key]; + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + has_exception_in_search = true; + return false; + } + }; // TODO(vector-index): Support efSearch. - auto result = index->search( // + auto result = index.search( // reinterpret_cast(query_info->ref_vec_f32().data() + sizeof(UInt32)), query_info->top_k(), predicate); + + if (has_exception_in_search) + throw Exception(ErrorCodes::INCORRECT_QUERY, "Exception occurred during search"); + std::vector keys(result.size()); result.dump_to(keys.data()); + PerfContext::vector_search.visited_nodes += visited_nodes; + PerfContext::vector_search.discarded_nodes += discarded_nodes; + // For some reason usearch does not always do the predicate for all search results. // So we need to filter again. keys.erase( @@ -220,14 +205,10 @@ std::vector VectorIndexHNSW::search( return keys; } -template -void VectorIndexHNSW::get(Key key, std::vector & out) const +void VectorIndexHNSWViewer::get(Key key, std::vector & out) const { - out.resize(dimensions); - index->get(key, out.data()); + out.resize(file_props.dimensions()); + index.get(key, out.data()); } -template class VectorIndexHNSW; -template class VectorIndexHNSW; - } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h index dd1ab4581d3..1086fdce0c0 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h @@ -14,54 +14,45 @@ #pragma once +#include #include #include namespace DB::DM { -using USearchImplType - = unum::usearch::index_dense_gt; +using USearchImplType = unum::usearch:: + index_dense_gt; -template -class USearchIndexWithSerialization : public USearchImplType +class VectorIndexHNSWBuilder : public VectorIndexBuilder { - using Base = USearchImplType; - public: - explicit USearchIndexWithSerialization(size_t dimensions); - void serialize(WriteBuffer & ostr) const; - void deserialize(ReadBuffer & istr); -}; + explicit VectorIndexHNSWBuilder(const TiDB::VectorIndexDefinitionPtr & definition_); -template -using USearchIndexWithSerializationPtr = std::shared_ptr>; + void addBlock(const IColumn & column, const ColumnVector * del_mark) override; -template -class VectorIndexHNSW : public VectorIndex -{ -public: - explicit VectorIndexHNSW(UInt32 dimensions_); + void save(std::string_view path) const override; - void addBlock(const IColumn & column, const ColumnVector * del_mark) override; +private: + USearchImplType index; + 
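    // Illustration (not part of this patch): the builder keeps the whole dense usearch index in
    // memory; save() writes it out via unum::usearch::output_file_t, and the read path no longer
    // deserializes it -- VectorIndexHNSWViewer::view() maps the same file back in through
    // unum::usearch::memory_mapped_file_t.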
UInt64 added_rows = 0; // Includes nulls and deletes. Used as the index key. +}; - void serializeBinary(WriteBuffer & ostr) const override; - static VectorIndexPtr deserializeBinary(ReadBuffer & istr); +class VectorIndexHNSWViewer : public VectorIndexViewer +{ +public: + static VectorIndexViewerPtr view(const dtpb::VectorIndexFileProps & props, std::string_view path); - size_t memoryUsage() const override { return index->memory_usage(); } + explicit VectorIndexHNSWViewer(const dtpb::VectorIndexFileProps & props) + : VectorIndexViewer(props) + {} - std::vector search( // - const ANNQueryInfoPtr & query_info, - const RowFilter & valid_rows, - SearchStatistics & statistics) const override; + std::vector search(const ANNQueryInfoPtr & query_info, const RowFilter & valid_rows) const override; void get(Key key, std::vector & out) const override; private: - const UInt32 dimensions; - const USearchIndexWithSerializationPtr index; - - UInt64 added_rows = 0; // Includes nulls and deletes. Used as the index key. + USearchImplType index; }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex_fwd.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndex_fwd.h index abf481411f7..131715302e5 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex_fwd.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex_fwd.h @@ -21,8 +21,11 @@ namespace DB::DM using ANNQueryInfoPtr = std::shared_ptr; -class VectorIndex; -using VectorIndexPtr = std::shared_ptr; +class VectorIndexBuilder; +using VectorIndexBuilderPtr = std::shared_ptr; + +class VectorIndexViewer; +using VectorIndexViewerPtr = std::shared_ptr; class VectorIndexCache; using VectorIndexCachePtr = std::shared_ptr; diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorSearchPerf.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorSearchPerf.cpp new file mode 100644 index 00000000000..a7cca6be6a6 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/VectorSearchPerf.cpp @@ -0,0 +1,22 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB::PerfContext +{ + +thread_local VectorSearchPerfContext vector_search = {}; + +} diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorSearchPerf.h b/dbms/src/Storages/DeltaMerge/Index/VectorSearchPerf.h new file mode 100644 index 00000000000..6fb3f1a7405 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/VectorSearchPerf.h @@ -0,0 +1,37 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
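// Illustration (not part of this patch; the reporting call sites are assumed): the counters below
// are thread_local and only accumulated after each search, so a caller that wants per-query
// numbers is expected to snapshot and reset them around the read, roughly:
//
//     PerfContext::vector_search.reset();
//     /* ... run the ANN query on this thread ... */
//     auto visited = PerfContext::vector_search.visited_nodes;
//     auto discarded = PerfContext::vector_search.discarded_nodes;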
+ +#pragma once + +#include + +/// Remove the population of thread_local from Poco +#ifdef thread_local +#undef thread_local +#endif + +namespace DB::PerfContext +{ + +struct VectorSearchPerfContext +{ + size_t visited_nodes = 0; + size_t discarded_nodes = 0; // Rows filtered out by MVCC + + void reset() { *this = {}; } +}; + +extern thread_local VectorSearchPerfContext vector_search; + +} // namespace DB::PerfContext diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.h b/dbms/src/Storages/DeltaMerge/Remote/Serializer.h index 9bffb6b56cd..fbcc6f5f56c 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.h +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.h @@ -84,7 +84,6 @@ struct Serializer const IColumnFileDataProviderPtr & data_provider, bool need_mem_data); -private: static RemotePb::RemoteSegment serializeSegment( const SegmentSnapshotPtr & snap, PageIdU64 segment_id, @@ -94,6 +93,7 @@ struct Serializer MemTrackerWrapper & mem_tracker_wrapper, bool need_mem_data); +private: static google::protobuf::RepeatedPtrField serializeColumnFileSet( const ColumnFileSetSnapshotPtr & snap, MemTrackerWrapper & mem_tracker_wrapper, diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h index fa66a40d259..785e3135f62 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.h +++ b/dbms/src/Storages/DeltaMerge/ScanContext.h @@ -87,6 +87,7 @@ class ScanContext // Building bitmap std::atomic build_bitmap_time_ns{0}; + std::atomic total_vector_idx_load_from_s3{0}; std::atomic total_vector_idx_load_from_disk{0}; std::atomic total_vector_idx_load_from_cache{0}; std::atomic total_vector_idx_load_time_ms{0}; @@ -143,6 +144,7 @@ class ScanContext deserializeRegionNumberOfInstance(tiflash_scan_context_pb); + total_vector_idx_load_from_s3 = tiflash_scan_context_pb.total_vector_idx_load_from_s3(); total_vector_idx_load_from_disk = tiflash_scan_context_pb.total_vector_idx_load_from_disk(); total_vector_idx_load_from_cache = tiflash_scan_context_pb.total_vector_idx_load_from_cache(); total_vector_idx_load_time_ms = tiflash_scan_context_pb.total_vector_idx_load_time_ms(); @@ -194,6 +196,7 @@ class ScanContext serializeRegionNumOfInstance(tiflash_scan_context_pb); + tiflash_scan_context_pb.set_total_vector_idx_load_from_s3(total_vector_idx_load_from_s3); tiflash_scan_context_pb.set_total_vector_idx_load_from_disk(total_vector_idx_load_from_disk); tiflash_scan_context_pb.set_total_vector_idx_load_from_cache(total_vector_idx_load_from_cache); tiflash_scan_context_pb.set_total_vector_idx_load_time_ms(total_vector_idx_load_time_ms); @@ -254,6 +257,7 @@ class ScanContext mergeRegionNumberOfInstance(other); + total_vector_idx_load_from_s3 += other.total_vector_idx_load_from_s3; total_vector_idx_load_from_disk += other.total_vector_idx_load_from_disk; total_vector_idx_load_from_cache += other.total_vector_idx_load_from_cache; total_vector_idx_load_time_ms += other.total_vector_idx_load_time_ms; @@ -305,6 +309,7 @@ class ScanContext mergeRegionNumberOfInstance(other); + total_vector_idx_load_from_s3 += other.total_vector_idx_load_from_s3(); total_vector_idx_load_from_disk += other.total_vector_idx_load_from_disk(); total_vector_idx_load_from_cache += other.total_vector_idx_load_from_cache(); total_vector_idx_load_time_ms += other.total_vector_idx_load_time_ms(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index 5e369b0c8f8..8c0b68014d2 100644 --- 
a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -12,21 +12,44 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include #include #include #include +#include +#include +#include #include +#include #include #include #include +#include #include +#include +#include #include #include #include #include #include +#include +#include + +namespace CurrentMetrics +{ +extern const Metric DT_SnapshotOfRead; +} // namespace CurrentMetrics + +namespace DB::FailPoints +{ +extern const char force_use_dmfile_format_v3[]; +extern const char file_cache_fg_download_fail[]; +} // namespace DB::FailPoints + namespace DB::DM::tests { @@ -66,6 +89,17 @@ class VectorIndexTestUtils EncodeVectorFloat32(arr, wb); return wb.str(); } + + ColumnDefine cdVec() + { + // When used in read, no need to assign vector_index. + return ColumnDefine(vec_column_id, vec_column_name, tests::typeFromString("Array(Float32)")); + } + + static size_t cleanVectorCacheEntries(const std::shared_ptr & cache) + { + return cache->cleanOutdatedCacheEntries(); + } }; class VectorIndexDMFileTest @@ -173,10 +207,10 @@ try { auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); auto vec_cd = ColumnDefine(vec_column_id, vec_column_name, tests::typeFromString("Array(Float32)")); - vec_cd.vector_index = std::make_shared(TiDB::VectorIndexInfo{ - .kind = TiDB::VectorIndexKind::HNSW, + vec_cd.vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, .dimension = 3, - .distance_metric = TiDB::DistanceMetric::L2, + .distance_metric = tipb::VectorDistanceMetric::L2, }); cols->emplace_back(vec_cd); @@ -201,7 +235,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.5})); @@ -226,7 +260,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); @@ -251,7 +285,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(2); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); @@ -276,7 +310,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); @@ -304,7 +338,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(0); 
ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); @@ -329,7 +363,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(10); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); @@ -354,7 +388,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(10); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0})); @@ -388,7 +422,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(5); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); @@ -413,7 +447,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::Cosine); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::COSINE); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); @@ -434,7 +468,7 @@ try } catch (const DB::Exception & ex) { - ASSERT_STREQ("Query distance metric Cosine does not match index distance metric L2", ex.message().c_str()); + ASSERT_STREQ("Query distance metric COSINE does not match index distance metric L2", ex.message().c_str()); } catch (...) { @@ -448,7 +482,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(EXTRA_HANDLE_COLUMN_ID); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.8})); @@ -471,15 +505,82 @@ try } CATCH +TEST_P(VectorIndexDMFileTest, OnePackWithDuplicateVectors) +try +{ + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto vec_cd = ColumnDefine(vec_column_id, vec_column_name, tests::typeFromString("Array(Float32)")); + vec_cd.vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + cols->emplace_back(vec_cd); + + ColumnDefines read_cols = *cols; + if (test_only_vec_column) + read_cols = {vec_cd}; + + // Prepare DMFile + { + Block block = DMTestEnv::prepareSimpleWriteBlockWithNullable(0, 5); + block.insert(createVecFloat32Column( + {// + {1.0, 2.0, 3.0}, + {1.0, 2.0, 3.0}, + {0.0, 0.0, 0.0}, + {1.0, 2.0, 3.0}, + {1.0, 2.0, 3.5}}, + vec_cd.name, + vec_cd.id)); + auto stream = std::make_shared(dbContext(), dm_file, *cols); + stream->writePrefix(); + stream->write(block, DMFileBlockOutputStream::BlockProperty{0, 0, 0, 0}); + stream->writeSuffix(); + } + + dm_file = restoreDMFile(); + + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_cd.id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + ann_query_info->set_top_k(4); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.5})); + + 
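        // Illustration (not part of this patch): VectorIndexHNSWViewer::search() decodes
        // ref_vec_f32 as a little-endian UInt32 element count followed by the raw Float32 values,
        // so encodeVectorFloat32() is assumed to produce that layout. A minimal standalone encoder
        // for it would be (requires <cstring>; the memcpy of the count matches readLittleEndian()
        // on little-endian hosts):
        //
        //     auto encode_ref_vec = [](const std::vector<Float32> & v) {
        //         String buf(sizeof(UInt32) + v.size() * sizeof(Float32), '\0');
        //         UInt32 n = v.size();
        //         std::memcpy(buf.data(), &n, sizeof(n));
        //         std::memcpy(buf.data() + sizeof(n), v.data(), v.size() * sizeof(Float32));
        //         return buf;
        //     };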
DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder.setRSOperator(wrapWithANNQueryInfo(nullptr, ann_query_info)) + .setBitmapFilter(BitmapFilterView(std::make_shared(5, true), 0, 5)) + .tryBuildWithVectorIndex( + dm_file, + read_cols, + RowKeyRanges{RowKeyRange::newAll(false, 1)}, + std::make_shared()); + + ASSERT_INPUTSTREAM_COLS_UR( + stream, + createColumnNames(), + createColumnData({ + createColumn({0, 1, 3, 4}), + createVecFloat32Column({// + {1.0, 2.0, 3.0}, + {1.0, 2.0, 3.0}, + {1.0, 2.0, 3.0}, + {1.0, 2.0, 3.5}}), + })); + } +} +CATCH + TEST_P(VectorIndexDMFileTest, MultiPacks) try { auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); auto vec_cd = ColumnDefine(vec_column_id, vec_column_name, tests::typeFromString("Array(Float32)")); - vec_cd.vector_index = std::make_shared(TiDB::VectorIndexInfo{ - .kind = TiDB::VectorIndexKind::HNSW, + vec_cd.vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, .dimension = 3, - .distance_metric = TiDB::DistanceMetric::L2, + .distance_metric = tipb::VectorDistanceMetric::L2, }); cols->emplace_back(vec_cd); @@ -510,7 +611,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({5.0, 5.0, 5.5})); @@ -535,7 +636,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0, 2.0, 3.0})); @@ -560,7 +661,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(2); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({0.0, 0.0, 0.0})); @@ -585,7 +686,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(2); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({0.0, 0.0, 0.0})); @@ -616,10 +717,10 @@ try { auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); auto vec_cd = ColumnDefine(vec_column_id, vec_column_name, tests::typeFromString("Array(Float32)")); - vec_cd.vector_index = std::make_shared(TiDB::VectorIndexInfo{ - .kind = TiDB::VectorIndexKind::HNSW, + vec_cd.vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, .dimension = 1, - .distance_metric = TiDB::DistanceMetric::L2, + .distance_metric = tipb::VectorDistanceMetric::L2, }); cols->emplace_back(vec_cd); @@ -652,7 +753,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({8.0})); @@ -693,7 
+794,7 @@ try { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_cd.id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(3); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({8.0})); @@ -735,7 +836,7 @@ class VectorIndexSegmentTestBase { auto ann_query_info = std::make_shared(); ann_query_info->set_column_id(vec_column_id); - ann_query_info->set_distance_metric(tipb::ANNQueryDistanceMetric::L2); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); ann_query_info->set_top_k(top_k); ann_query_info->set_ref_vec_f32(encodeVectorFloat32(ref_vec)); return read(segment_id, begin, end, columns_to_read, ann_query_info); @@ -774,12 +875,6 @@ class VectorIndexSegmentTestBase ColumnDefine cdPK() { return getExtraHandleColumnDefine(options.is_common_handle); } - ColumnDefine cdVec() - { - // When used in read, no need to assign vector_index. - return ColumnDefine(vec_column_id, vec_column_name, tests::typeFromString("Array(Float32)")); - } - protected: Block prepareWriteBlockImpl(Int64 start_key, Int64 end_key, bool is_deleted) override { @@ -791,10 +886,10 @@ class VectorIndexSegmentTestBase void prepareColumns(const ColumnDefinesPtr & columns) override { auto vec_cd = ColumnDefine(vec_column_id, vec_column_name, tests::typeFromString("Array(Float32)")); - vec_cd.vector_index = std::make_shared(TiDB::VectorIndexInfo{ - .kind = TiDB::VectorIndexKind::HNSW, + vec_cd.vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, .dimension = 1, - .distance_metric = TiDB::DistanceMetric::L2, + .distance_metric = tipb::VectorDistanceMetric::L2, }); columns->emplace_back(vec_cd); } @@ -1109,4 +1204,714 @@ try } CATCH +class VectorIndexSegmentOnS3Test + : public VectorIndexTestUtils + , public DB::base::TiFlashStorageTestBasic +{ +public: + void SetUp() override + { + FailPointHelper::enableFailPoint(FailPoints::force_use_dmfile_format_v3); + + DB::tests::TiFlashTestEnv::enableS3Config(); + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client)); + TiFlashStorageTestBasic::SetUp(); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + + global_context.getSharedContextDisagg()->initRemoteDataStore( + global_context.getFileProvider(), + /*s3_enabled*/ true); + ASSERT_TRUE(global_context.getSharedContextDisagg()->remote_data_store != nullptr); + + orig_mode = global_context.getPageStorageRunMode(); + global_context.setPageStorageRunMode(PageStorageRunMode::UNI_PS); + global_context.tryReleaseWriteNodePageStorageForTest(); + global_context.initializeWriteNodePageStorageIfNeed(global_context.getPathPool()); + + global_context.setVectorIndexCache(1000); + + auto kvstore = db_context->getTMTContext().getKVStore(); + { + auto meta_store = metapb::Store{}; + meta_store.set_id(100); + kvstore->setStore(meta_store); + } + + TiFlashStorageTestBasic::reload(DB::Settings()); + storage_path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false)); + page_id_allocator = std::make_shared(); + storage_pool = std::make_shared( + *db_context, + NullspaceID, + ns_id, + *storage_path_pool, + page_id_allocator, + "test.t1"); + storage_pool->restore(); + + StorageRemoteCacheConfig file_cache_config{ + .dir = fmt::format("{}/fs_cache", getTemporaryPath()), + .capacity = 1 * 1000 * 1000 * 1000, + }; + 
FileCache::initialize(global_context.getPathCapacity(), file_cache_config); + + auto cols = DMTestEnv::getDefaultColumns(); + auto vec_cd = cdVec(); + vec_cd.vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 1, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + cols->emplace_back(vec_cd); + setColumns(cols); + + auto dm_context = dmContext(); + wn_segment = Segment::newSegment( + Logger::get(), + *dm_context, + table_columns, + RowKeyRange::newAll(false, 1), + DELTA_MERGE_FIRST_SEGMENT_ID, + 0); + ASSERT_EQ(wn_segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); + } + + void TearDown() override + { + FailPointHelper::disableFailPoint(FailPoints::force_use_dmfile_format_v3); + + FileCache::shutdown(); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + global_context.dropVectorIndexCache(); + global_context.getSharedContextDisagg()->remote_data_store = nullptr; + global_context.setPageStorageRunMode(orig_mode); + + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + ::DB::tests::TiFlashTestEnv::deleteBucket(*s3_client); + DB::tests::TiFlashTestEnv::disableS3Config(); + } + + static ColumnDefine cdPK() { return getExtraHandleColumnDefine(false); } + + BlockInputStreamPtr createComputeNodeStream( + const SegmentPtr & write_node_segment, + const ColumnDefines & columns_to_read, + const PushDownFilterPtr & filter, + const ScanContextPtr & read_scan_context = nullptr) + { + auto write_dm_context = dmContext(); + auto snap = write_node_segment->createSnapshot(*write_dm_context, false, CurrentMetrics::DT_SnapshotOfRead); + auto snap_proto = Remote::Serializer::serializeSegment( + snap, + write_node_segment->segmentId(), + 0, + write_node_segment->rowkey_range, + {write_node_segment->rowkey_range}, + dummy_mem_tracker, + /*need_mem_data*/ true); + + auto cn_segment = std::make_shared( + Logger::get(), + /*epoch*/ 0, + write_node_segment->getRowKeyRange(), + write_node_segment->segmentId(), + /*next_segment_id*/ 0, + nullptr, + nullptr); + + auto read_dm_context = dmContext(read_scan_context); + auto cn_segment_snap = Remote::Serializer::deserializeSegment( + *read_dm_context, + /* store_id */ 100, + /* keyspace_id */ 0, + /* table_id */ 100, + snap_proto); + + auto stream = cn_segment->getInputStream( + ReadMode::Bitmap, + *read_dm_context, + columns_to_read, + cn_segment_snap, + {write_node_segment->getRowKeyRange()}, + filter, + std::numeric_limits::max(), + DEFAULT_BLOCK_SIZE); + + return stream; + } + + static void removeAllFileCache() + { + auto * file_cache = FileCache::instance(); + auto file_segments = file_cache->getAll(); + for (const auto & file_seg : file_cache->getAll()) + file_cache->remove(file_cache->toS3Key(file_seg->getLocalFileName()), true); + + RUNTIME_CHECK(file_cache->getAll().empty()); + } + + void prepareWriteNodeStable() + { + auto dm_context = dmContext(); + Block block = DMTestEnv::prepareSimpleWriteBlockWithNullable(0, 100); + block.insert(colVecFloat32("[0, 100)", vec_column_name, vec_column_id)); + wn_segment->write(*dm_context, std::move(block), true); + wn_segment = wn_segment->mergeDelta(*dm_context, tableColumns()); + + // Let's just make sure we are later indeed reading from S3 + RUNTIME_CHECK(wn_segment->stable->getDMFiles()[0]->path().rfind("s3://") == 0); + } + + BlockInputStreamPtr computeNodeTableScan() + { + return createComputeNodeStream(wn_segment, {cdPK(), cdVec()}, nullptr); + } + + BlockInputStreamPtr computeNodeANNQuery( + const std::vector 
ref_vec, + UInt32 top_k = 1, + const ScanContextPtr & read_scan_context = nullptr) + { + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + ann_query_info->set_top_k(top_k); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32(ref_vec)); + + auto stream = createComputeNodeStream( + wn_segment, + {cdPK(), cdVec()}, + std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)), + read_scan_context); + return stream; + } + +protected: + // setColumns should update dm_context at the same time + void setColumns(const ColumnDefinesPtr & columns) { table_columns = columns; } + + const ColumnDefinesPtr & tableColumns() const { return table_columns; } + + DMContextPtr dmContext(const ScanContextPtr & scan_context = nullptr) + { + return DMContext::createUnique( + *db_context, + storage_path_pool, + storage_pool, + /*min_version_*/ 0, + NullspaceID, + /*physical_table_id*/ 100, + false, + 1, + db_context->getSettingsRef(), + scan_context); + } + +protected: + /// all these var lives as ref in dm_context + GlobalPageIdAllocatorPtr page_id_allocator; + std::shared_ptr storage_path_pool; + std::shared_ptr storage_pool; + ColumnDefinesPtr table_columns; + DM::DeltaMergeStore::Settings settings; + + NamespaceID ns_id = 100; + + // the segment we are going to test + SegmentPtr wn_segment; + + DB::PageStorageRunMode orig_mode = PageStorageRunMode::ONLY_V3; + + // MemoryTrackerPtr memory_tracker; + MemTrackerWrapper dummy_mem_tracker = MemTrackerWrapper(0, root_of_query_mem_trackers.get()); +}; + +TEST_F(VectorIndexSegmentOnS3Test, FileCacheNotEnabled) +try +{ + prepareWriteNodeStable(); + + FileCache::shutdown(); + auto stream = computeNodeANNQuery({5.0}); + + try + { + stream->readPrefix(); + stream->read(); + FAIL(); + } + catch (const DB::Exception & ex) + { + ASSERT_STREQ("Check file_cache failed: Must enable S3 file cache to use vector index", ex.message().c_str()); + } + catch (...) + { + FAIL(); + } +} +CATCH + +TEST_F(VectorIndexSegmentOnS3Test, ReadWithoutIndex) +try +{ + prepareWriteNodeStable(); + { + auto * file_cache = FileCache::instance(); + ASSERT_EQ(0, file_cache->getAll().size()); + } + { + auto stream = computeNodeTableScan(); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[0, 100)"), + colVecFloat32("[0, 100)"), + })); + } + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + ASSERT_FALSE(std::filesystem::is_empty(file_cache->cache_dir)); + } +} +CATCH + +TEST_F(VectorIndexSegmentOnS3Test, ReadFromIndex) +try +{ + prepareWriteNodeStable(); + { + auto * file_cache = FileCache::instance(); + ASSERT_EQ(0, file_cache->getAll().size()); + } + { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + ASSERT_FALSE(std::filesystem::is_empty(file_cache->cache_dir)); + } + { + // Read again, we should be reading from memory cache. 
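        // (The in-memory VectorIndexCache is keyed by the local index file path, so this second
        // query should hit it directly: none of the disk / S3 load counters below should move.)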
+ + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 1); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 0); + } +} +CATCH + +TEST_F(VectorIndexSegmentOnS3Test, FileCacheEvict) +try +{ + prepareWriteNodeStable(); + { + auto * file_cache = FileCache::instance(); + ASSERT_EQ(0, file_cache->getAll().size()); + } + { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + ASSERT_FALSE(std::filesystem::is_empty(file_cache->cache_dir)); + } + { + // Simulate cache evict. + removeAllFileCache(); + } + { + // Check whether on-disk file is successfully unlinked when there is a memory + // cache. + auto * file_cache = FileCache::instance(); + ASSERT_TRUE(std::filesystem::is_empty(file_cache->cache_dir)); + } + { + // When cache is evicted (but memory cache exists), the query should be fine. + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + // Read again, we should be reading from memory cache. + + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 1); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 0); + } +} +CATCH + +TEST_F(VectorIndexSegmentOnS3Test, FileCacheEvictAndVectorCacheDrop) +try +{ + prepareWriteNodeStable(); + { + auto * file_cache = FileCache::instance(); + ASSERT_EQ(0, file_cache->getAll().size()); + } + { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + ASSERT_FALSE(std::filesystem::is_empty(file_cache->cache_dir)); + } + { + // Simulate cache evict. 
+ removeAllFileCache(); + } + { + // Check whether on-disk file is successfully unlinked when there is a memory + // cache. + auto * file_cache = FileCache::instance(); + ASSERT_TRUE(std::filesystem::is_empty(file_cache->cache_dir)); + } + { + // We should be able to clear something from the vector index cache. + auto vec_cache = TiFlashTestEnv::getGlobalContext().getVectorIndexCache(); + ASSERT_NE(vec_cache, nullptr); + ASSERT_EQ(1, cleanVectorCacheEntries(vec_cache)); + } + { + // When cache is evicted (and memory cache is dropped), the query should be fine. + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + // Read again, we should be reading from memory cache. + + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 1); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 0); + } +} +CATCH + +TEST_F(VectorIndexSegmentOnS3Test, FileCacheDeleted) +try +{ + prepareWriteNodeStable(); + { + auto * file_cache = FileCache::instance(); + ASSERT_EQ(0, file_cache->getAll().size()); + } + { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + ASSERT_FALSE(std::filesystem::is_empty(file_cache->cache_dir)); + + // Simulate cache file is deleted by user. + std::filesystem::remove_all(file_cache->cache_dir); + } + { + // Query should be fine. + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + // Read again, we should be reading from memory cache. 
+ + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 1); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 0); + } +} +CATCH + +TEST_F(VectorIndexSegmentOnS3Test, FileCacheDeletedAndVectorCacheDrop) +try +{ + prepareWriteNodeStable(); + { + auto * file_cache = FileCache::instance(); + ASSERT_EQ(0, file_cache->getAll().size()); + } + { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + ASSERT_FALSE(std::filesystem::is_empty(file_cache->cache_dir)); + + // Simulate cache file is deleted by user. + std::filesystem::remove_all(file_cache->cache_dir); + } + { + // We should be able to clear something from the vector index cache. + auto vec_cache = TiFlashTestEnv::getGlobalContext().getVectorIndexCache(); + ASSERT_NE(vec_cache, nullptr); + ASSERT_EQ(1, cleanVectorCacheEntries(vec_cache)); + } + { + // Query should be fine. + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + } + { + // Read again, we should be reading from memory cache. 
+ + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 1); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 0); + } +} +CATCH + +TEST_F(VectorIndexSegmentOnS3Test, ConcurrentDownloadFromS3) +try +{ + prepareWriteNodeStable(); + { + auto * file_cache = FileCache::instance(); + ASSERT_EQ(0, file_cache->getAll().size()); + } + + auto sp_s3_fg_download = SyncPointCtl::enableInScope("FileCache::fgDownload"); + auto sp_wait_other_s3 = SyncPointCtl::enableInScope("before_FileSegment::waitForNotEmpty_wait"); + + auto th_1 = std::async([&]() { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[5, 6)"), + colVecFloat32("[5, 6)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + + ASSERT_EQ(PerfContext::file_cache.fg_download_from_s3, 1); + ASSERT_EQ(PerfContext::file_cache.fg_wait_download_from_s3, 0); + }); + + // th_1 should be blocked when downloading from s3. + sp_s3_fg_download.waitAndPause(); + + auto th_2 = std::async([&]() { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({7.0}, 1, scan_context); + ASSERT_INPUTSTREAM_COLS_UR( + stream, + Strings({DMTestEnv::pk_name, vec_column_name}), + createColumns({ + colInt64("[7, 8)"), + colVecFloat32("[7, 8)"), + })); + + ASSERT_EQ(scan_context->total_vector_idx_load_from_cache, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_disk, 0); + ASSERT_EQ(scan_context->total_vector_idx_load_from_s3, 1); + + ASSERT_EQ(PerfContext::file_cache.fg_download_from_s3, 0); + ASSERT_EQ(PerfContext::file_cache.fg_wait_download_from_s3, 1); + }); + + // th_2 should be blocked by waiting th_1 to finish downloading from s3. + sp_wait_other_s3.waitAndNext(); + + // Let th_1 finish downloading from s3. + sp_s3_fg_download.next(); + + // Both th_1 and th_2 should be able to finish without hitting sync points again. + // e.g. th_2 should not ever try to fgDownload. 
+ th_1.get(); + th_2.get(); +} +CATCH + +TEST_F(VectorIndexSegmentOnS3Test, S3Failure) +try +{ + prepareWriteNodeStable(); + DB::FailPointHelper::enableFailPoint(DB::FailPoints::file_cache_fg_download_fail); + SCOPE_EXIT({ DB::FailPointHelper::disableFailPoint(DB::FailPoints::file_cache_fg_download_fail); }); + + { + auto * file_cache = FileCache::instance(); + ASSERT_EQ(0, file_cache->getAll().size()); + } + { + auto scan_context = std::make_shared(); + auto stream = computeNodeANNQuery({5.0}, 1, scan_context); + + ASSERT_THROW( + { + stream->readPrefix(); + stream->read(); + }, + DB::Exception); + } +} +CATCH + + } // namespace DB::DM::tests diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index f8b0055a451..88e1dca52f3 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -14,8 +14,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -25,6 +27,7 @@ #include #include #include +#include #include #include @@ -51,12 +54,57 @@ extern const int S3_ERROR; extern const int FILE_DOESNT_EXIST; } // namespace DB::ErrorCodes +namespace DB::FailPoints +{ +extern const char file_cache_fg_download_fail[]; +} // namespace DB::FailPoints + namespace DB { using FileType = FileSegment::FileType; std::unique_ptr FileCache::global_file_cache_instance; +FileSegment::Status FileSegment::waitForNotEmpty() +{ + std::unique_lock lock(mtx); + + if (status != Status::Empty) + return status; + + PerfContext::file_cache.fg_wait_download_from_s3++; + + Stopwatch watch; + + while (true) + { + SYNC_FOR("before_FileSegment::waitForNotEmpty_wait"); // just before actual waiting... + + auto is_done = cv_ready.wait_for(lock, std::chrono::seconds(30), [&] { return status != Status::Empty; }); + if (is_done) + break; + + double elapsed_secs = watch.elapsedSeconds(); + LOG_WARNING( + Logger::get(), + "FileCache is still waiting FileSegment ready, file={} elapsed={}s", + local_fname, + elapsed_secs); + + // Snapshot time is 300s + if (elapsed_secs > 300) + { + throw Exception( + ErrorCodes::S3_ERROR, + "Failed to wait until S3 file {} is ready after {}s", + local_fname, + elapsed_secs); + } + } + + return status; +} + FileCache::FileCache(PathCapacityMetricsPtr capacity_metrics_, const StorageRemoteCacheConfig & config_) : capacity_metrics(capacity_metrics_) , cache_dir(config_.getDTFileCacheDir()) @@ -108,6 +156,24 @@ RandomAccessFilePtr FileCache::getRandomAccessFile( } } +FileSegmentPtr FileCache::downloadFileForLocalRead( + const S3::S3FilenameView & s3_fname, + const std::optional & filesize) +{ + auto file_seg = getOrWait(s3_fname, filesize); + if (!file_seg) + return nullptr; + + auto path = file_seg->getLocalFileName(); + if likely (Poco::File(path).exists()) + return file_seg; + + // Normally, this would not happen. But if someone removes cache files manually, the status of memory and filesystem are inconsistent. + // We can handle this situation by remove it from FileCache. 
+ remove(s3_fname.toFullKey(), /*force*/ true); + return nullptr; +} + FileSegmentPtr FileCache::get(const S3::S3FilenameView & s3_fname, const std::optional & filesize) { auto s3_key = s3_fname.toFullKey(); @@ -166,6 +232,61 @@ FileSegmentPtr FileCache::get(const S3::S3FilenameView & s3_fname, const std::op return nullptr; } +FileSegmentPtr FileCache::getOrWait(const S3::S3FilenameView & s3_fname, const std::optional & filesize) +{ + auto s3_key = s3_fname.toFullKey(); + auto file_type = getFileType(s3_key); + auto & table = tables[static_cast(file_type)]; + + std::unique_lock lock(mtx); + + auto f = table.get(s3_key); + if (f != nullptr) + { + lock.unlock(); + f->setLastAccessTime(std::chrono::system_clock::now()); + auto status = f->waitForNotEmpty(); + if (status == FileSegment::Status::Complete) + { + GET_METRIC(tiflash_storage_remote_cache, type_dtfile_hit).Increment(); + return f; + } + // On-going download failed, let the caller retry. + return nullptr; + } + + GET_METRIC(tiflash_storage_remote_cache, type_dtfile_miss).Increment(); + + auto estimated_size = filesize ? *filesize : getEstimatedSizeOfFileType(file_type); + if (!reserveSpaceImpl(file_type, estimated_size, /*try_evict*/ true)) + { + // Space not enough. + GET_METRIC(tiflash_storage_remote_cache, type_dtfile_full).Increment(); + LOG_INFO( + log, + "s3_key={} space not enough(capacity={} used={} estimzted_size={}), skip cache", + s3_key, + cache_capacity, + cache_used, + estimated_size); + + // Just throw, no need to let the caller retry. + throw Exception(ErrorCodes::S3_ERROR, "Cannot reserve {} space for object {}", estimated_size, s3_key); + } + + auto file_seg + = std::make_shared(toLocalFilename(s3_key), FileSegment::Status::Empty, estimated_size, file_type); + table.set(s3_key, file_seg); + lock.unlock(); + + ++PerfContext::file_cache.fg_download_from_s3; + fgDownload(s3_key, file_seg); + if (!file_seg || !file_seg->isReadyToRead()) + throw Exception(ErrorCodes::S3_ERROR, "Download object {} failed", s3_key); + + return file_seg; +} + // Remove `local_fname` from disk and remove parent directory if parent directory is empty. void FileCache::removeDiskFile(const String & local_fname, bool update_fsize_metrics) const { @@ -208,12 +329,10 @@ void FileCache::remove(const String & s3_key, bool force) auto file_type = getFileType(s3_key); auto & table = tables[static_cast(file_type)]; - std::lock_guard lock(mtx); + std::unique_lock lock(mtx); auto f = table.get(s3_key, /*update_lru*/ false); if (f == nullptr) - { return; - } std::ignore = removeImpl(table, s3_key, f, force); } @@ -515,6 +634,7 @@ void FileCache::download(const String & s3_key, FileSegmentPtr & file_seg) if (!file_seg->isReadyToRead()) { + file_seg->setStatus(FileSegment::Status::Failed); GET_METRIC(tiflash_storage_remote_cache, type_dtfile_download_failed).Increment(); bg_download_fail_count.fetch_add(1, std::memory_order_relaxed); file_seg.reset(); @@ -544,6 +664,32 @@ void FileCache::bgDownload(const String & s3_key, FileSegmentPtr & file_seg) [this, s3_key = s3_key, file_seg = file_seg]() mutable { download(s3_key, file_seg); }); } +void FileCache::fgDownload(const String & s3_key, FileSegmentPtr & file_seg) +{ + SYNC_FOR("FileCache::fgDownload"); // simulate long s3 download + + try + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::file_cache_fg_download_fail); + GET_METRIC(tiflash_storage_remote_cache, type_dtfile_download).Increment(); + downloadImpl(s3_key, file_seg); + } + catch (...) 
+ { + tryLogCurrentException(log, fmt::format("Download s3_key={} failed", s3_key)); + } + + if (!file_seg->isReadyToRead()) + { + file_seg->setStatus(FileSegment::Status::Failed); + GET_METRIC(tiflash_storage_remote_cache, type_dtfile_download_failed).Increment(); + file_seg.reset(); + remove(s3_key); + } + + LOG_DEBUG(log, "foreground downloading => s3_key {} finished", s3_key); +} + bool FileCache::isS3Filename(const String & fname) { return S3::S3FilenameView::fromKey(fname).isValid(); diff --git a/dbms/src/Storages/S3/FileCache.h b/dbms/src/Storages/S3/FileCache.h index 665050a40a9..dba2eacd66f 100644 --- a/dbms/src/Storages/S3/FileCache.h +++ b/dbms/src/Storages/S3/FileCache.h @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -71,6 +72,8 @@ class FileSegment return status == Status::Complete; } + Status waitForNotEmpty(); + void setSize(UInt64 size_) { std::lock_guard lock(mtx); @@ -81,6 +84,8 @@ class FileSegment { std::lock_guard lock(mtx); status = s; + if (status != Status::Empty) + cv_ready.notify_all(); } UInt64 getSize() const @@ -126,6 +131,7 @@ class FileSegment UInt64 size; const FileType file_type; std::chrono::time_point last_access_time; + std::condition_variable cv_ready; }; using FileSegmentPtr = std::shared_ptr; @@ -219,6 +225,13 @@ class FileCache const S3::S3FilenameView & s3_fname, const std::optional & filesize); + /// Download the file if it is not in the local cache and returns the + /// file guard of the local cache file. When file guard is alive, + /// local file will not be evicted. + FileSegmentPtr downloadFileForLocalRead( + const S3::S3FilenameView & s3_fname, + const std::optional & filesize); + void updateConfig(const Settings & settings); #ifndef DBMS_PUBLIC_GTEST @@ -233,8 +246,14 @@ class FileCache DISALLOW_COPY_AND_MOVE(FileCache); FileSegmentPtr get(const S3::S3FilenameView & s3_fname, const std::optional & filesize = std::nullopt); + /// Try best to wait until the file is available in cache. If the file is not in cache, it will download the file in foreground. + /// It may return nullptr after wait. In this case the caller could retry. + FileSegmentPtr getOrWait( + const S3::S3FilenameView & s3_fname, + const std::optional & filesize = std::nullopt); void bgDownload(const String & s3_key, FileSegmentPtr & file_seg); + void fgDownload(const String & s3_key, FileSegmentPtr & file_seg); void download(const String & s3_key, FileSegmentPtr & file_seg); void downloadImpl(const String & s3_key, FileSegmentPtr & file_seg); diff --git a/dbms/src/Storages/S3/FileCachePerf.cpp b/dbms/src/Storages/S3/FileCachePerf.cpp new file mode 100644 index 00000000000..937dd3ff2ea --- /dev/null +++ b/dbms/src/Storages/S3/FileCachePerf.cpp @@ -0,0 +1,22 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
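+
+// This translation unit only provides storage for the thread-local perf counters declared in
+// FileCachePerf.h. A usage sketch (see FileCache::getOrWait and FileSegment::waitForNotEmpty):
+// the foreground read path bumps the counters directly, e.g.
+//     ++PerfContext::file_cache.fg_download_from_s3;
+// and a caller that wants per-query numbers can clear them with
+//     PerfContext::file_cache.reset();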
+ +#include + +namespace DB::PerfContext +{ + +thread_local FileCachePerfContext file_cache = {}; + +} diff --git a/dbms/src/Storages/S3/FileCachePerf.h b/dbms/src/Storages/S3/FileCachePerf.h new file mode 100644 index 00000000000..e206de87f68 --- /dev/null +++ b/dbms/src/Storages/S3/FileCachePerf.h @@ -0,0 +1,37 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +/// Remove the population of thread_local from Poco +#ifdef thread_local +#undef thread_local +#endif + +namespace DB::PerfContext +{ + +struct FileCachePerfContext +{ + size_t fg_download_from_s3 = 0; + size_t fg_wait_download_from_s3 = 0; + + void reset() { *this = {}; } +}; + +extern thread_local FileCachePerfContext file_cache; + +} // namespace DB::PerfContext diff --git a/dbms/src/TiDB/Schema/TiDB.cpp b/dbms/src/TiDB/Schema/TiDB.cpp index 8a1c060f19d..c9531e2ccf7 100644 --- a/dbms/src/TiDB/Schema/TiDB.cpp +++ b/dbms/src/TiDB/Schema/TiDB.cpp @@ -28,7 +28,9 @@ #include #include #include +#include #include +#include #include #include @@ -406,13 +408,13 @@ try if (vector_index) { - RUNTIME_CHECK(vector_index->kind != VectorIndexKind::INVALID); - RUNTIME_CHECK(vector_index->distance_metric != DistanceMetric::INVALID); + RUNTIME_CHECK(vector_index->kind != tipb::VectorIndexKind::INVALID_INDEX_KIND); + RUNTIME_CHECK(vector_index->distance_metric != tipb::VectorDistanceMetric::INVALID_DISTANCE_METRIC); Poco::JSON::Object::Ptr vector_index_json = new Poco::JSON::Object(); - vector_index_json->set("kind", String(magic_enum::enum_name(vector_index->kind))); + vector_index_json->set("kind", tipb::VectorIndexKind_Name(vector_index->kind)); vector_index_json->set("dimension", vector_index->dimension); - vector_index_json->set("distance_metric", String(magic_enum::enum_name(vector_index->distance_metric))); + vector_index_json->set("distance_metric", tipb::VectorDistanceMetric_Name(vector_index->distance_metric)); json->set("vector_index", vector_index_json); } @@ -470,22 +472,31 @@ try auto vector_index_json = json->getObject("vector_index"); if (vector_index_json) { - vector_index = std::make_shared(); - - auto vector_kind = magic_enum::enum_cast(vector_index_json->getValue("kind")); - RUNTIME_CHECK(vector_kind.has_value()); - RUNTIME_CHECK(vector_kind.value() != VectorIndexKind::INVALID); - vector_index->kind = vector_kind.value(); - - vector_index->dimension = vector_index_json->getValue("dimension"); - RUNTIME_CHECK(vector_index->dimension > 0); - RUNTIME_CHECK(vector_index->dimension <= 16383); // Just a protection - - auto distance_metric - = magic_enum::enum_cast(vector_index_json->getValue("distance_metric")); - RUNTIME_CHECK(distance_metric.has_value()); - RUNTIME_CHECK(distance_metric.value() != DistanceMetric::INVALID); - vector_index->distance_metric = distance_metric.value(); + tipb::VectorIndexKind kind = tipb::VectorIndexKind::INVALID_INDEX_KIND; + auto kind_field = vector_index_json->getValue("kind"); + auto ok = tipb::VectorIndexKind_Parse( // + 
kind_field, + &kind); + RUNTIME_CHECK_MSG(ok, "invalid kind of vector index, {}", kind_field); + RUNTIME_CHECK(kind != tipb::VectorIndexKind::INVALID_INDEX_KIND); + + auto dimension = vector_index_json->getValue("dimension"); + RUNTIME_CHECK(dimension > 0); + RUNTIME_CHECK(dimension <= TiDB::MAX_VECTOR_DIMENSION); // Just a protection + + tipb::VectorDistanceMetric distance_metric = tipb::VectorDistanceMetric::INVALID_DISTANCE_METRIC; + auto distance_metric_field = vector_index_json->getValue("distance_metric"); + ok = tipb::VectorDistanceMetric_Parse( // + distance_metric_field, + &distance_metric); + RUNTIME_CHECK_MSG(ok, "invalid distance_metric of vector index, {}", distance_metric_field); + RUNTIME_CHECK(distance_metric != tipb::VectorDistanceMetric::INVALID_DISTANCE_METRIC); + + vector_index = std::make_shared(VectorIndexDefinition{ + .kind = kind, + .dimension = dimension, + .distance_metric = distance_metric, + }); } } catch (const Poco::Exception & e) diff --git a/dbms/src/TiDB/Schema/TiDB.h b/dbms/src/TiDB/Schema/TiDB.h index d68e676d707..ee33b8c316a 100644 --- a/dbms/src/TiDB/Schema/TiDB.h +++ b/dbms/src/TiDB/Schema/TiDB.h @@ -129,7 +129,7 @@ struct ColumnInfo String comment; // TODO(vector-index): This index will be moved to the table level later - VectorIndexInfoPtr vector_index = nullptr; + VectorIndexDefinitionPtr vector_index = nullptr; #ifdef M #error "Please undefine macro M first." diff --git a/dbms/src/TiDB/Schema/VectorIndex.h b/dbms/src/TiDB/Schema/VectorIndex.h index 71437ca44cd..2af77b2e3e6 100644 --- a/dbms/src/TiDB/Schema/VectorIndex.h +++ b/dbms/src/TiDB/Schema/VectorIndex.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -23,80 +24,49 @@ namespace TiDB { -enum class VectorIndexKind +// Constructed from table definition. +struct VectorIndexDefinition { - INVALID = 0, - - // Note: Field names must match TiDB's enum definition. - HNSW, -}; - -enum class DistanceMetric -{ - INVALID = 0, + tipb::VectorIndexKind kind = tipb::VectorIndexKind::INVALID_INDEX_KIND; + UInt64 dimension = 0; + tipb::VectorDistanceMetric distance_metric = tipb::VectorDistanceMetric::INVALID_DISTANCE_METRIC; - // Note: Field names must match TiDB's enum definition. - L1, - L2, - COSINE, - INNER_PRODUCT, + // TODO(vector-index): There are possibly more fields, like efConstruct. + // Will be added later. }; +// As this is constructed from TiDB's table definition, we should not +// ever try to modify it anyway. 
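+// The same definition object is shared by pointer between the schema information parsed from
+// TiDB (ColumnInfo::vector_index) and the storage layer, so keeping it immutable means every
+// consumer observes a consistent definition.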
+using VectorIndexDefinitionPtr = std::shared_ptr; -struct VectorIndexInfo -{ - VectorIndexKind kind = VectorIndexKind::INVALID; - UInt64 dimension = 0; - DistanceMetric distance_metric = DistanceMetric::INVALID; -}; - -using VectorIndexInfoPtr = std::shared_ptr; +// Defined in TiDB pkg/types/vector.go +static constexpr Int64 MAX_VECTOR_DIMENSION = 16383; } // namespace TiDB template <> -struct fmt::formatter -{ - static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } - - template - auto format(const TiDB::VectorIndexKind & v, FormatContext & ctx) const -> decltype(ctx.out()) - { - return fmt::format_to(ctx.out(), "{}", magic_enum::enum_name(v)); - } -}; - -template <> -struct fmt::formatter -{ - static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } - - template - auto format(const TiDB::DistanceMetric & d, FormatContext & ctx) const -> decltype(ctx.out()) - { - return fmt::format_to(ctx.out(), "{}", magic_enum::enum_name(d)); - } -}; - -template <> -struct fmt::formatter +struct fmt::formatter { static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } template - auto format(const TiDB::VectorIndexInfo & vi, FormatContext & ctx) const -> decltype(ctx.out()) + auto format(const TiDB::VectorIndexDefinition & vi, FormatContext & ctx) const -> decltype(ctx.out()) { - return fmt::format_to(ctx.out(), "{}:{}", vi.kind, vi.distance_metric); + return fmt::format_to( + ctx.out(), // + "{}:{}", + tipb::VectorIndexKind_Name(vi.kind), + tipb::VectorDistanceMetric_Name(vi.distance_metric)); } }; template <> -struct fmt::formatter +struct fmt::formatter { static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } template - auto format(const TiDB::VectorIndexInfoPtr & vi, FormatContext & ctx) const -> decltype(ctx.out()) + auto format(const TiDB::VectorIndexDefinitionPtr & vi, FormatContext & ctx) const -> decltype(ctx.out()) { if (!vi) return fmt::format_to(ctx.out(), "");