diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h index 3874eca65c3..91122e39ab8 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h @@ -46,6 +46,12 @@ class BitmapFilterView return BitmapFilterView(std::make_shared(size, default_value), 0, size); } + BitmapFilterView createSubView(UInt32 offset, UInt32 size) const + { + RUNTIME_CHECK(offset + size <= filter_size, offset, size, filter_size); + return BitmapFilterView(filter, filter_offset + offset, size); + } + // Caller should ensure n in [0, size). inline bool get(UInt32 n) const { return filter->get(filter_offset + n); } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.cpp index b343d70abe1..78b3fd10b29 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.cpp @@ -39,8 +39,9 @@ size_t ColumnFileSetInputStream::skipNextBlock() return 0; } -Block ColumnFileSetInputStream::read() +Block ColumnFileSetInputStream::read(FilterPtr & res_filter, bool) { + res_filter = nullptr; while (cur_column_file_reader != reader.column_file_readers.end()) { if (*cur_column_file_reader == nullptr) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.h index 55fa74db678..8ef6cfd095b 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.h @@ -47,7 +47,13 @@ class ColumnFileSetInputStream : public SkippableBlockInputStream size_t skipNextBlock() override; - Block read() override; + Block read() override + { + FilterPtr filter = nullptr; + return read(filter, false); + } + + Block read(FilterPtr & res_filter, bool return_filter) override; Block readWithFilter(const IColumn::Filter & filter) override; }; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h index 031dc66662b..2ef374d47b8 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h @@ -25,6 +25,7 @@ namespace DB::DM class ColumnFileSetReader { friend class ColumnFileSetInputStream; + friend class ColumnFileSetWithVectorIndexInputStream; private: const DMContext & context; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp new file mode 100644 index 00000000000..957942471e7 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp @@ -0,0 +1,262 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + + +namespace DB::DM +{ + +ColumnFileSetInputStreamPtr ColumnFileSetWithVectorIndexInputStream::tryBuild( + const DMContext & context, + const ColumnFileSetSnapshotPtr & delta_snap, + const ColumnDefinesPtr & col_defs, + const RowKeyRange & segment_range_, + const IColumnFileDataProviderPtr & data_provider, + const RSOperatorPtr & rs_operator, + const BitmapFilterPtr & bitmap_filter, + size_t offset, + ReadTag read_tag_) +{ + auto fallback = [&]() { + return std::make_shared(context, delta_snap, col_defs, segment_range_, read_tag_); + }; + + if (rs_operator == nullptr || bitmap_filter == nullptr) + return fallback(); + + auto filter_with_ann = std::dynamic_pointer_cast(rs_operator); + if (filter_with_ann == nullptr) + return fallback(); + + auto ann_query_info = filter_with_ann->ann_query_info; + if (!ann_query_info) + return fallback(); + + // Fast check: ANNQueryInfo is available in the whole read path. However we may not reading vector column now. + bool is_matching_ann_query = false; + for (const auto & cd : *col_defs) + { + if (cd.id == ann_query_info->column_id()) + { + is_matching_ann_query = true; + break; + } + } + if (!is_matching_ann_query) + return fallback(); + + std::optional vec_cd; + auto rest_columns = std::make_shared(); + rest_columns->reserve(col_defs->size() - 1); + for (const auto & cd : *col_defs) + { + if (cd.id == ann_query_info->column_id()) + vec_cd.emplace(cd); + else + rest_columns->emplace_back(cd); + } + + // No vector index column is specified, just use the normal logic. + if (!vec_cd.has_value()) + return fallback(); + + // All check passed. Let's read via vector index. + return std::make_shared( + context, + delta_snap, + col_defs, + segment_range_, + data_provider, + ann_query_info, + BitmapFilterView(bitmap_filter, offset, delta_snap->getRows()), + std::move(*vec_cd), + rest_columns, + read_tag_); +} + +Block ColumnFileSetWithVectorIndexInputStream::read(FilterPtr & res_filter, bool return_filter) +{ + if (return_filter) + return readImpl(res_filter); + + // If return_filter == false, we must filter by ourselves. + + FilterPtr filter = nullptr; + auto res = readImpl(filter); + if (filter != nullptr) + { + auto passed_count = countBytesInFilter(*filter); + for (auto & col : res) + col.column = col.column->filter(*filter, passed_count); + } + // filter == nullptr means all rows are valid and no need to filter. + return res; +} + +Block ColumnFileSetWithVectorIndexInputStream::readOtherColumns() +{ + auto reset_column_file_reader = (*cur_column_file_reader)->createNewReader(rest_col_defs, ReadTag::Query); + Block block = reset_column_file_reader->readNextBlock(); + return block; +} + +void ColumnFileSetWithVectorIndexInputStream::toNextFile(size_t current_file_index, size_t current_file_rows) +{ + (*cur_column_file_reader).reset(); + ++cur_column_file_reader; + read_rows += current_file_rows; + tiny_readers[current_file_index].reset(); +} + +Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter) +{ + load(); + + while (cur_column_file_reader != reader.column_file_readers.end()) + { + // Skip ColumnFileDeleteRange + if (*cur_column_file_reader == nullptr) + { + ++cur_column_file_reader; + continue; + } + auto current_file_index = std::distance(reader.column_file_readers.begin(), cur_column_file_reader); + // If has index, we can read the column by vector index. + if (tiny_readers[current_file_index] != nullptr) + { + const auto file_rows = column_files[current_file_index]->getRows(); + auto selected_row_begin = std::lower_bound( + selected_rows.cbegin(), + selected_rows.cend(), + read_rows, + [](const auto & row, UInt32 offset) { return row.key < offset; }); + auto selected_row_end = std::lower_bound( + selected_row_begin, + selected_rows.cend(), + read_rows + file_rows, + [](const auto & row, UInt32 offset) { return row.key < offset; }); + size_t selected_rows = std::distance(selected_row_begin, selected_row_end); + // If all rows are filtered out, skip this file. + if (selected_rows == 0) + { + toNextFile(current_file_index, file_rows); + continue; + } + + // read vector type column by vector index + auto tiny_reader = tiny_readers[current_file_index]; + auto vec_column = vec_cd.type->createColumn(); + const std::span file_selected_rows{selected_row_begin, selected_row_end}; + tiny_reader->read(vec_column, file_selected_rows, /* rowid_start_offset= */ read_rows, file_rows); + assert(vec_column->size() == file_rows); + + Block block; + if (!rest_col_defs->empty()) + { + block = readOtherColumns(); + assert(block.rows() == vec_column->size()); + } + + auto index = header.getPositionByName(vec_cd.name); + block.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name)); + + // Fill res_filter + if (selected_rows == file_rows) + { + res_filter = nullptr; + } + else + { + filter.clear(); + filter.resize_fill(file_rows, 0); + for (const auto & [rowid, _] : file_selected_rows) + filter[rowid - read_rows] = 1; + res_filter = &filter; + } + + // All rows in this ColumnFileTiny have been read. + block.setStartOffset(read_rows); + toNextFile(current_file_index, file_rows); + return block; + } + auto block = (*cur_column_file_reader)->readNextBlock(); + if (block) + { + block.setStartOffset(read_rows); + read_rows += block.rows(); + res_filter = nullptr; + return block; + } + else + { + (*cur_column_file_reader).reset(); + ++cur_column_file_reader; + } + } + return {}; +} + +void ColumnFileSetWithVectorIndexInputStream::load() +{ + if (loaded) + return; + + tiny_readers.reserve(column_files.size()); + UInt32 precedes_rows = 0; + for (const auto & column_file : column_files) + { + if (auto * tiny_file = column_file->tryToTinyFile(); + tiny_file && tiny_file->hasIndex(ann_query_info->index_id())) + { + auto tiny_reader = std::make_shared( + *tiny_file, + data_provider, + ann_query_info, + valid_rows.createSubView(precedes_rows, tiny_file->getRows()), + vec_cd, + vec_index_cache); + auto sr = tiny_reader->load(); + for (auto & row : sr) + row.key += precedes_rows; + selected_rows.insert(selected_rows.end(), sr.begin(), sr.end()); + tiny_readers.push_back(tiny_reader); + // avoid virutal function call + precedes_rows += tiny_file->getRows(); + } + else + { + tiny_readers.push_back(nullptr); + precedes_rows += column_file->getRows(); + } + } + // Keep the top k minimum distances rows. + auto select_size = selected_rows.size() > ann_query_info->top_k() ? ann_query_info->top_k() : selected_rows.size(); + auto top_k_end = selected_rows.begin() + select_size; + std::nth_element(selected_rows.begin(), top_k_end, selected_rows.end(), [](const auto & lhs, const auto & rhs) { + return lhs.distance < rhs.distance; + }); + selected_rows.resize(select_size); + // Sort by key again. + std::sort(selected_rows.begin(), selected_rows.end(), [](const auto & lhs, const auto & rhs) { + return lhs.key < rhs.key; + }); + + loaded = true; +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h new file mode 100644 index 00000000000..54a34cb4a97 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h @@ -0,0 +1,101 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + + +namespace DB::DM +{ + +class ColumnFileSetWithVectorIndexInputStream : public ColumnFileSetInputStream +{ +private: + const IColumnFileDataProviderPtr data_provider; + const ANNQueryInfoPtr ann_query_info; + const BitmapFilterView valid_rows; + // Global vector index cache + const VectorIndexCachePtr vec_index_cache; + const ColumnDefine vec_cd; + const ColumnDefinesPtr rest_col_defs; + + // Set after load(). Top K search results in files with vector index. + std::vector selected_rows; + std::vector tiny_readers; + + ColumnFiles & column_files; + + const Block header; + IColumn::Filter filter; + + bool loaded = false; + +public: + ColumnFileSetWithVectorIndexInputStream( + const DMContext & context_, + const ColumnFileSetSnapshotPtr & delta_snap_, + const ColumnDefinesPtr & col_defs_, + const RowKeyRange & segment_range_, + const IColumnFileDataProviderPtr & data_provider_, + const ANNQueryInfoPtr & ann_query_info_, + const BitmapFilterView && valid_rows_, + ColumnDefine && vec_cd_, + const ColumnDefinesPtr & rest_col_defs_, + ReadTag read_tag_) + : ColumnFileSetInputStream(context_, delta_snap_, col_defs_, segment_range_, read_tag_) + , data_provider(data_provider_) + , ann_query_info(ann_query_info_) + , valid_rows(std::move(valid_rows_)) + , vec_index_cache(context_.global_context.getVectorIndexCache()) + , vec_cd(std::move(vec_cd_)) + , rest_col_defs(rest_col_defs_) + , column_files(reader.snapshot->getColumnFiles()) + , header(getHeader()) + {} + + static ColumnFileSetInputStreamPtr tryBuild( + const DMContext & context, + const ColumnFileSetSnapshotPtr & delta_snap, + const ColumnDefinesPtr & col_defs, + const RowKeyRange & segment_range_, + const IColumnFileDataProviderPtr & data_provider, + const RSOperatorPtr & rs_operator, + const BitmapFilterPtr & bitmap_filter, + size_t offset, + ReadTag read_tag_); + + String getName() const override { return "ColumnFileSetWithVectorIndex"; } + + // When all rows in block are not filtered out, + // `res_filter` will be set to null. + // The caller needs to do handle this situation. + Block read(FilterPtr & res_filter, bool return_filter) override; + +private: + Block readImpl(FilterPtr & res_filter); + + Block readOtherColumns(); + + void toNextFile(size_t current_file_index, size_t current_file_rows); + + void load(); +}; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h index 25ca1746605..b1fc853946e 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h @@ -38,6 +38,7 @@ class ColumnFileTiny : public ColumnFilePersisted public: friend class ColumnFileTinyReader; friend class ColumnFileTinyVectorIndexWriter; + friend class ColumnFileTinyVectorIndexReader; friend struct Remote::Serializer; struct IndexInfo diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp new file mode 100644 index 00000000000..789d3c5a530 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.cpp @@ -0,0 +1,156 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include + + +namespace DB::DM +{ + +void ColumnFileTinyVectorIndexReader::read( + MutableColumnPtr & vec_column, + const std::span & read_rowids, + size_t rowid_start_offset, + size_t read_rows) +{ + RUNTIME_CHECK(loaded); + + Stopwatch watch; + vec_column->reserve(read_rows); + std::vector value; + size_t current_rowid = rowid_start_offset; + for (const auto & [rowid, _] : read_rowids) + { + vec_index->get(rowid, value); + if (rowid > current_rowid) + { + UInt32 nulls = rowid - current_rowid; + // Insert [] if column is Not Null, or NULL if column is Nullable + vec_column->insertManyDefaults(nulls); + } + vec_column->insertData(reinterpret_cast(value.data()), value.size() * sizeof(Float32)); + current_rowid = rowid + 1; + } + if (current_rowid < rowid_start_offset + read_rows) + { + UInt32 nulls = rowid_start_offset + read_rows - current_rowid; + vec_column->insertManyDefaults(nulls); + } + + perf_stat.returned_rows = read_rowids.size(); + perf_stat.read_vec_column_seconds = watch.elapsedSeconds(); +} + +std::vector ColumnFileTinyVectorIndexReader::load() +{ + if (loaded) + return {}; + + Stopwatch watch; + + loadVectorIndex(); + auto search_results = loadVectorSearchResult(); + + perf_stat.load_vec_index_and_results_seconds = watch.elapsedSeconds(); + + loaded = true; + return search_results; +} + +void ColumnFileTinyVectorIndexReader::loadVectorIndex() +{ + const auto & index_infos = tiny_file.index_infos; + if (!index_infos || index_infos->empty()) + return; + auto index_id = ann_query_info->index_id(); + const auto index_info_iter + = std::find_if(index_infos->cbegin(), index_infos->cend(), [index_id](const auto & info) { + if (!info.vector_index) + return false; + return info.vector_index->index_id() == index_id; + }); + if (index_info_iter == index_infos->cend()) + return; + auto vector_index = index_info_iter->vector_index; + if (!vector_index) + return; + auto index_page_id = index_info_iter->index_page_id; + auto load_from_page_storage = [&]() { + perf_stat.load_from_cache = false; + std::vector index_fields = {0}; + auto index_page = data_provider->readTinyData(index_page_id, index_fields); + ReadBufferFromOwnString read_buf(index_page.data); + CompressedReadBuffer compressed(read_buf); + return VectorIndexViewer::load(*vector_index, compressed); + }; + if (vec_index_cache) + { + const auto key = fmt::format("{}{}", VectorIndexCache::COLUMNFILETINY_INDEX_NAME_PREFIX, index_page_id); + vec_index = vec_index_cache->getOrSet(key, load_from_page_storage); + } + else + vec_index = load_from_page_storage(); +} + +ColumnFileTinyVectorIndexReader::~ColumnFileTinyVectorIndexReader() +{ + LOG_DEBUG( + log, + "Finish vector search over column tiny_{}/{}(cid={}, rows={}){} cached, cost_[search/read]={:.3f}s/{:.3f}s " + "top_k_[query/visited/discarded/result]={}/{}/{}/{} ", + tiny_file.getDataPageId(), + vec_cd.name, + vec_cd.id, + tiny_file.getRows(), + perf_stat.load_from_cache ? "" : " not", + + perf_stat.load_vec_index_and_results_seconds, + perf_stat.read_vec_column_seconds, + + ann_query_info->top_k(), + perf_stat.visited_nodes, // Visited nodes will be larger than query_top_k when there are MVCC rows + perf_stat.discarded_nodes, // How many nodes are skipped by MVCC + perf_stat.returned_rows); +} + +std::vector ColumnFileTinyVectorIndexReader::loadVectorSearchResult() +{ + auto perf_begin = PerfContext::vector_search; + RUNTIME_CHECK(valid_rows.size() == tiny_file.getRows(), valid_rows.size(), tiny_file.getRows()); + + auto search_results = vec_index->searchWithDistance(ann_query_info, valid_rows); + // Sort by key + std::sort(search_results.begin(), search_results.end(), [](const auto & lhs, const auto & rhs) { + return lhs.key < rhs.key; + }); + // results must not contain duplicates. Usually there should be no duplicates. + search_results.erase( + std::unique( + search_results.begin(), + search_results.end(), + [](const auto & lhs, const auto & rhs) { return lhs.key == rhs.key; }), + search_results.end()); + + perf_stat.discarded_nodes = PerfContext::vector_search.discarded_nodes - perf_begin.discarded_nodes; + perf_stat.visited_nodes = PerfContext::vector_search.visited_nodes - perf_begin.visited_nodes; + return search_results; +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h new file mode 100644 index 00000000000..c1a9ab501f6 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h @@ -0,0 +1,98 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + + +namespace DB::DM +{ + +class ColumnFileTinyVectorIndexReader +{ +private: + const ColumnFileTiny & tiny_file; + const IColumnFileDataProviderPtr data_provider; + + const ANNQueryInfoPtr ann_query_info; + // Set after load(). + VectorIndexViewerPtr vec_index; + const BitmapFilterView valid_rows; + // Note: ColumnDefine comes from read path does not have vector_index fields. + const ColumnDefine vec_cd; + // Global vector index cache + const VectorIndexCachePtr vec_index_cache; + LoggerPtr log; + + // Performance statistics + struct PerfStat + { + double load_vec_index_and_results_seconds = 0; + double read_vec_column_seconds = 0; + size_t discarded_nodes = 0; + size_t visited_nodes = 0; + size_t returned_rows = 0; + // Whether the vector index is loaded from cache. + bool load_from_cache = true; + }; + PerfStat perf_stat; + + // Whether the vector index and search results are loaded. + bool loaded = false; + +public: + ColumnFileTinyVectorIndexReader( + const ColumnFileTiny & tiny_file_, + const IColumnFileDataProviderPtr & data_provider_, + const ANNQueryInfoPtr & ann_query_info_, + const BitmapFilterView && valid_rows_, + const ColumnDefine & vec_cd_, + const VectorIndexCachePtr & vec_index_cache_) + : tiny_file(tiny_file_) + , data_provider(data_provider_) + , ann_query_info(ann_query_info_) + , valid_rows(std::move(valid_rows_)) + , vec_cd(vec_cd_) + , vec_index_cache(vec_index_cache_) + , log(Logger::get()) + {} + + ~ColumnFileTinyVectorIndexReader(); + + // Read vector column data and set filter. + // The column will be as same as as the rows of the tiny file, + // but only the rows in selected_rows will be filled, + // others will be filled with default values. + void read( + MutableColumnPtr & vec_column, + const std::span & read_rowids, + size_t rowid_start_offset, + size_t read_rows); + + // Load vector index and search results. + // Return the rowids of the selected rows. + std::vector load(); + +private: + void loadVectorIndex(); + std::vector loadVectorSearchResult(); +}; + +using ColumnFileTinyVectorIndexReaderPtr = std::shared_ptr; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index 339200af11c..c615dbfed66 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -264,20 +264,13 @@ bool ColumnFilePersistedSet::checkAndIncreaseFlushVersion(size_t task_flush_vers bool ColumnFilePersistedSet::appendPersistedColumnFiles(const ColumnFilePersisteds & column_files, WriteBatches & wbs) { - ColumnFilePersisteds new_persisted_files; - for (const auto & file : persisted_files) - { - new_persisted_files.push_back(file); - } - for (const auto & file : column_files) - { - new_persisted_files.push_back(file); - } - /// Save the new metadata of column files to disk. + ColumnFilePersisteds new_persisted_files{persisted_files}; + new_persisted_files.insert(new_persisted_files.end(), column_files.begin(), column_files.end()); + // Save the new metadata of column files to disk. serializeColumnFilePersisteds(wbs, metadata_id, new_persisted_files); wbs.writeMeta(); - /// Commit updates in memory. + // Commit updates in memory. persisted_files.swap(new_persisted_files); updateColumnFileStats(); LOG_DEBUG( @@ -290,19 +283,17 @@ bool ColumnFilePersistedSet::appendPersistedColumnFiles(const ColumnFilePersiste return true; } -bool ColumnFilePersistedSet::updatePersistedColumnFiles( +bool ColumnFilePersistedSet::updatePersistedColumnFilesAfterAddingIndex( const ColumnFilePersisteds & new_persisted_files, WriteBatches & wbs) { - /// Save the new metadata of column files to disk. + // Save the new metadata of column files to disk. serializeColumnFilePersisteds(wbs, metadata_id, new_persisted_files); wbs.writeMeta(); - /// Commit updates in memory. + // Commit updates in memory. persisted_files = std::move(new_persisted_files); - updateColumnFileStats(); - LOG_DEBUG(log, "{}, after update column files, persisted column files: {}", info(), detailInfo()); - + // After adding index, the stats of column files will not change. return true; } diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index bac6d10e2a1..614980787d5 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -156,7 +156,9 @@ class ColumnFilePersistedSet bool appendPersistedColumnFiles(const ColumnFilePersisteds & column_files, WriteBatches & wbs); - bool updatePersistedColumnFiles(const ColumnFilePersisteds & new_persisted_files, WriteBatches & wbs); + bool updatePersistedColumnFilesAfterAddingIndex( + const ColumnFilePersisteds & new_persisted_files, + WriteBatches & wbs); /// Choose all small column files that can be compacted to larger column files MinorCompactionPtr pickUpMinorCompaction(DMContext & context); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index da74822051e..0aa82ab355c 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -905,6 +905,12 @@ class DeltaMergeStore */ bool segmentEnsureStableLocalIndexAsync(const SegmentPtr & segment); +#ifndef DBMS_PUBLIC_GTEST +private: +#else +public: +#endif + /** * Ensure the segment has delta index. * If the segment has no delta index, it will be built in background. @@ -914,12 +920,6 @@ class DeltaMergeStore */ bool segmentEnsureDeltaLocalIndexAsync(const SegmentPtr & segment); -#ifndef DBMS_PUBLIC_GTEST -private: -#else -public: -#endif - void applyLocalIndexChange(const TiDB::TableInfo & new_table_info); /** @@ -929,7 +929,7 @@ class DeltaMergeStore * * @returns false if index is still missing after wait timed out. */ - bool segmentWaitDeltaIndexReady(const SegmentPtr & segment) const; + bool segmentWaitDeltaLocalIndexReady(const SegmentPtr & segment) const; /** * Wait until the segment has stable index. diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index c547325675a..0a74564f728 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -927,7 +927,7 @@ bool DeltaMergeStore::segmentEnsureDeltaLocalIndexAsync(const SegmentPtr & segme } } -bool DeltaMergeStore::segmentWaitDeltaIndexReady(const SegmentPtr & segment) const +bool DeltaMergeStore::segmentWaitDeltaLocalIndexReady(const SegmentPtr & segment) const { RUNTIME_CHECK(segment != nullptr); @@ -1123,7 +1123,7 @@ void DeltaMergeStore::segmentEnsureDeltaLocalIndex( column_file = iter->second; } - delta_persisted_file_set->updatePersistedColumnFiles(delta_persisted_column_files, wbs); + delta_persisted_file_set->updatePersistedColumnFilesAfterAddingIndex(delta_persisted_column_files, wbs); LOG_INFO( log, "EnsureDeltaLocalIndex - Finish building index, cost {:.3f}s, delta={} source_segment={}", diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp index 432bab7c9d6..175869383c4 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -232,19 +232,15 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats() UNUSED(handle); // Delta + const auto & delta = segment->getDelta(); + if (const auto lock = delta->getLock(); lock) { - const auto & delta = segment->getDelta(); - auto lock = delta->getLock(); - if (!lock) - continue; const auto & mem_table = delta->getMemTableSet(); index_stats.rows_delta_not_indexed += mem_table->getRows(); const auto & persisted = delta->getPersistedFileSet(); for (const auto & file : persisted->getFiles()) { - // TODO: this is not efficient, we can maintain the indexed_rows in ColumnFilePersisted - const auto * tiny_file = file->tryToTinyFile(); - if (tiny_file) + if (const auto * tiny_file = file->tryToTinyFile(); tiny_file) { if (tiny_file->hasIndex(index_stats.index_id)) index_stats.rows_delta_indexed += tiny_file->getRows(); diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp index 652d595281e..cfdce81a80e 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.cpp @@ -74,4 +74,25 @@ VectorIndexViewerPtr VectorIndexViewer::view(const dtpb::VectorIndexFileProps & } } + +VectorIndexViewerPtr VectorIndexViewer::load(const dtpb::VectorIndexFileProps & file_props, ReadBuffer & buf) +{ + RUNTIME_CHECK(file_props.dimensions() > 0); + RUNTIME_CHECK(file_props.dimensions() <= TiDB::MAX_VECTOR_DIMENSION); + + tipb::VectorIndexKind kind; + RUNTIME_CHECK(tipb::VectorIndexKind_Parse(file_props.index_kind(), &kind)); + + switch (kind) + { + case tipb::VectorIndexKind::HNSW: + return VectorIndexHNSWViewer::load(file_props, buf); + default: + throw Exception( // + ErrorCodes::INCORRECT_QUERY, + "Unsupported vector index {}", + file_props.index_kind()); + } +} + } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h index f95bac95d2e..6cc7cbf1bc6 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h @@ -72,6 +72,13 @@ class VectorIndexViewer public: /// The key is the row's offset in the DMFile. using Key = VectorIndexBuilder::Key; + using Distance = Float32; + + struct SearchResult + { + Key key; + Distance distance; + }; /// True bit means the row is valid and should be kept in the search result. /// False bit lets the row filtered out and will search for more results. @@ -79,6 +86,7 @@ class VectorIndexViewer public: static VectorIndexViewerPtr view(const dtpb::VectorIndexFileProps & file_props, std::string_view path); + static VectorIndexViewerPtr load(const dtpb::VectorIndexFileProps & file_props, ReadBuffer & buf); public: explicit VectorIndexViewer(const dtpb::VectorIndexFileProps & file_props_) @@ -89,6 +97,10 @@ class VectorIndexViewer // Invalid rows in `valid_rows` will be discared when applying the search virtual std::vector search(const ANNQueryInfoPtr & queryInfo, const RowFilter & valid_rows) const = 0; + virtual std::vector searchWithDistance( + const ANNQueryInfoPtr & query_info, + const RowFilter & valid_rows) const + = 0; virtual size_t size() const = 0; diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h index 1a82496b8bc..c76ab4f6ed4 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h @@ -62,6 +62,7 @@ class VectorIndexCache std::thread cleaner_thread; public: + static constexpr const char * COLUMNFILETINY_INDEX_NAME_PREFIX = "vec_index_page_"; explicit VectorIndexCache(size_t max_entities); ~VectorIndexCache(); @@ -69,6 +70,7 @@ class VectorIndexCache template Cache::MappedPtr getOrSet(const Cache::Key & file_path, LoadFunc && load) { + if (!file_path.starts_with(COLUMNFILETINY_INDEX_NAME_PREFIX)) { std::scoped_lock lock(mu); files_to_check.insert(file_path); diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp index a17111724b2..063df3feacb 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp @@ -23,14 +23,12 @@ #include #include -#include #include namespace DB::ErrorCodes { extern const int INCORRECT_DATA; extern const int INCORRECT_QUERY; -extern const int CANNOT_ALLOCATE_MEMORY; extern const int ABORTED; } // namespace DB::ErrorCodes @@ -208,9 +206,45 @@ VectorIndexViewerPtr VectorIndexHNSWViewer::view(const dtpb::VectorIndexFileProp return vi; } -std::vector VectorIndexHNSWViewer::search( - const ANNQueryInfoPtr & query_info, - const RowFilter & valid_rows) const +VectorIndexViewerPtr VectorIndexHNSWViewer::load(const dtpb::VectorIndexFileProps & file_props, ReadBuffer & buf) +{ + RUNTIME_CHECK(file_props.index_kind() == tipb::VectorIndexKind_Name(kind())); + + tipb::VectorDistanceMetric metric; + RUNTIME_CHECK(tipb::VectorDistanceMetric_Parse(file_props.distance_metric(), &metric)); + RUNTIME_CHECK(metric != tipb::VectorDistanceMetric::INVALID_DISTANCE_METRIC); + + Stopwatch w; + SCOPE_EXIT({ GET_METRIC(tiflash_vector_index_duration, type_view).Observe(w.elapsedSeconds()); }); + + auto vi = std::make_shared(file_props); + + vi->index = USearchImplType::make( + unum::usearch::metric_punned_t( // + file_props.dimensions(), + getUSearchMetricKind(metric)), + unum::usearch::index_dense_config_t( + unum::usearch::default_connectivity(), + unum::usearch::default_expansion_add(), + 16 /* default is 64 */)); + + // Currently may have a lot of threads querying concurrently + auto limit = unum::usearch::index_limits_t(0, /* threads */ std::thread::hardware_concurrency() * 10); + vi->index.reserve(limit); + + auto result = vi->index.load_from_stream([&](void * buffer, std::size_t length) { + return buf.read(reinterpret_cast(buffer), length) == length; + }); + RUNTIME_CHECK_MSG(result, "Failed to load vector index: {}", result.error.what()); + + auto current_memory_usage = vi->index.memory_usage(); + GET_METRIC(tiflash_vector_index_memory_usage, type_view).Increment(static_cast(current_memory_usage)); + vi->last_reported_memory_usage = current_memory_usage; + + return vi; +} + +auto VectorIndexHNSWViewer::searchImpl(const ANNQueryInfoPtr & query_info, const RowFilter & valid_rows) const { RUNTIME_CHECK(query_info->ref_vec_f32().size() >= sizeof(UInt32)); auto query_vec_size = readLittleEndian(query_info->ref_vec_f32().data()); @@ -223,7 +257,7 @@ std::vector VectorIndexHNSWViewer::search( query_info->index_id(), query_info->column_id()); - RUNTIME_CHECK(query_info->ref_vec_f32().size() >= sizeof(UInt32) + query_vec_size * sizeof(Float32)); + RUNTIME_CHECK(query_info->ref_vec_f32().size() == sizeof(UInt32) + query_vec_size * sizeof(Float32)); if (tipb::VectorDistanceMetric_Name(query_info->distance_metric()) != file_props.distance_metric()) throw Exception( @@ -269,21 +303,51 @@ std::vector VectorIndexHNSWViewer::search( if (has_exception_in_search) throw Exception(ErrorCodes::INCORRECT_QUERY, "Exception happened occurred during search"); - std::vector keys(result.size()); - result.dump_to(keys.data()); - PerfContext::vector_search.visited_nodes += visited_nodes; PerfContext::vector_search.discarded_nodes += discarded_nodes; + return result; +} + +std::vector VectorIndexHNSWViewer::search( + const ANNQueryInfoPtr & query_info, + const RowFilter & valid_rows) const +{ + auto result = searchImpl(query_info, valid_rows); // For some reason usearch does not always do the predicate for all search results. // So we need to filter again. - keys.erase( - std::remove_if(keys.begin(), keys.end(), [&valid_rows](Key key) { return !valid_rows[key]; }), - keys.end()); - + const size_t result_size = result.size(); + std::vector keys; + keys.reserve(result_size); + for (size_t i = 0; i < result_size; ++i) + { + const auto rowid = result[i].member.key; + if (valid_rows[rowid]) + keys.push_back(rowid); + } return keys; } +std::vector VectorIndexHNSWViewer::searchWithDistance( + const ANNQueryInfoPtr & query_info, + const RowFilter & valid_rows) const +{ + auto result = searchImpl(query_info, valid_rows); + + // For some reason usearch does not always do the predicate for all search results. + // So we need to filter again. + const size_t result_size = result.size(); + std::vector search_results; + search_results.reserve(result_size); + for (size_t i = 0; i < result_size; ++i) + { + const auto rowid = result[i].member.key; + if (valid_rows[rowid]) + search_results.emplace_back(rowid, result[i].distance); + } + return search_results; +} + size_t VectorIndexHNSWViewer::size() const { return index.size(); diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h index 380d5bdf9f8..2ad35e4ce19 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h @@ -50,6 +50,7 @@ class VectorIndexHNSWViewer : public VectorIndexViewer { public: static VectorIndexViewerPtr view(const dtpb::VectorIndexFileProps & props, std::string_view path); + static VectorIndexViewerPtr load(const dtpb::VectorIndexFileProps & file_props, ReadBuffer & buf); static tipb::VectorIndexKind kind(); @@ -59,11 +60,16 @@ class VectorIndexHNSWViewer : public VectorIndexViewer std::vector search(const ANNQueryInfoPtr & query_info, const RowFilter & valid_rows) const override; + std::vector searchWithDistance(const ANNQueryInfoPtr & query_info, const RowFilter & valid_rows) + const override; + size_t size() const override; void get(Key key, std::vector & out) const override; private: + auto searchImpl(const ANNQueryInfoPtr & query_info, const RowFilter & valid_rows) const; + USearchImplType index; size_t last_reported_memory_usage = 0; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index efb3730348b..d0b01b06c43 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -1607,6 +1608,8 @@ SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( auto prepared = remote_data_store->prepareDMFile(file_oid, new_data_page_id); auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), b->getFile()->metaVersion()); auto new_column_file = b->cloneWith(dm_context, dmfile, rowkey_range); + // TODO: Do we need to acquire new page id for ColumnFileTiny index page? + // Maybe we even do not need to clone data page: https://github.com/pingcap/tiflash/pull/9436 new_column_file_persisteds.push_back(new_column_file); } else @@ -3327,11 +3330,15 @@ SkippableBlockInputStreamPtr Segment::getConcatSkippableBlockInputStream( columns_to_read_ptr, this->rowkey_range, read_tag); - SkippableBlockInputStreamPtr persisted_files_stream = std::make_shared( + SkippableBlockInputStreamPtr persisted_files_stream = ColumnFileSetWithVectorIndexInputStream::tryBuild( dm_context, persisted_files, columns_to_read_ptr, this->rowkey_range, + persisted_files->getDataProvider(), + filter, + bitmap_filter, + segment_snap->stable->getDMFilesRows(), read_tag); auto stream = std::dynamic_pointer_cast>(stable_stream); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp index b5ba4ea38f6..e899d0ff3f5 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -71,12 +72,23 @@ class DeltaMergeStoreVectorTest return s; } - void write(size_t num_rows_write) + void write(size_t begin, size_t end, bool is_delete = false) { - String sequence = fmt::format("[0, {})", num_rows_write); + String sequence = fmt::format("[{}, {})", begin, end); Block block; { - block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + block = DMTestEnv::prepareSimpleWriteBlock( + begin, + end, + false, + /*tso= */ 3, + /*pk_name_=*/EXTRA_HANDLE_COLUMN_NAME, + /*pk_col_id=*/EXTRA_HANDLE_COLUMN_ID, + /*pk_type=*/EXTRA_HANDLE_COLUMN_INT_TYPE, + /*is_common_handle=*/false, + /*rowkey_column_size=*/1, + /*with_internal_columns=*/true, + is_delete); // Add a column of vector for test block.insert(colVecFloat32(sequence, vec_column_name, vec_column_id)); } @@ -119,6 +131,35 @@ class DeltaMergeStoreVectorTest store->segmentMergeDelta(*dm_context, segment, DeltaMergeStore::MergeDeltaReason::Manual) != nullptr); } + void triggerFlushCacheAndEnsureDeltaLocalIndex() const + { + std::vector all_segments; + { + std::shared_lock lock(store->read_write_mutex); + for (const auto & [_, segment] : store->id_to_segment) + all_segments.push_back(segment); + } + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + for (const auto & segment : all_segments) + { + ASSERT_TRUE(segment->flushCache(*dm_context)); + store->segmentEnsureDeltaLocalIndexAsync(segment); + } + } + + void triggerCompactDelta() const + { + std::vector all_segments; + { + std::shared_lock lock(store->read_write_mutex); + for (const auto & [_, segment] : store->id_to_segment) + all_segments.push_back(segment); + } + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + for (const auto & segment : all_segments) + ASSERT_TRUE(segment->compactDelta(*dm_context)); + } + void waitStableLocalIndexReady() { std::vector all_segments; @@ -131,6 +172,18 @@ class DeltaMergeStoreVectorTest ASSERT_TRUE(store->segmentWaitStableLocalIndexReady(segment)); } + void waitDeltaIndexReady() + { + std::vector all_segments; + { + std::shared_lock lock(store->read_write_mutex); + for (const auto & [_, segment] : store->id_to_segment) + all_segments.push_back(segment); + } + for (const auto & segment : all_segments) + ASSERT_TRUE(store->segmentWaitDeltaLocalIndexReady(segment)); + } + void triggerMergeAllSegments() { auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); @@ -163,12 +216,21 @@ try const size_t num_rows_write = 128; - // write to store - write(num_rows_write); - + // write [0, 128) to store + write(0, num_rows_write); // trigger mergeDelta for all segments triggerMergeDelta(); + // write [128, 256) to store + write(num_rows_write, num_rows_write * 2); + // write delete [0, 64) to store + write(0, num_rows_write / 2, true); + + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + + // check delta index has built for all segments + waitDeltaIndexReady(); // check stable index has built for all segments waitStableLocalIndexReady(); @@ -176,7 +238,7 @@ try // read from store { - read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + read(range, EMPTY_FILTER, colVecFloat32("[64, 256)", vec_column_name, vec_column_id)); } auto ann_query_info = std::make_shared(); @@ -186,21 +248,189 @@ try // read with ANN query { ann_query_info->set_top_k(1); - ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({72.0})); auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + // stable 72.0, delta 128.0 + read(range, filter, createVecFloat32Column({{72.0}, {128.0}})); + } - read(range, filter, createVecFloat32Column({{2.0}})); + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({72.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + // stable 72.0, delta 128.0 + read(range, filter, createVecFloat32Column({{72.0}, {128.0}})); + } +} +CATCH + +TEST_F(DeltaMergeStoreVectorTest, TestMultipleColumnFileTiny) +try +{ + store = reload(); + + const size_t num_rows_write = 128; + + // write [0, 128) to store + write(0, num_rows_write); + + // write [128, 256) to store + write(num_rows_write, num_rows_write * 2); + + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + + // check delta index has built for all segments + waitDeltaIndexReady(); + + const auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); + + // read from store + { + read(range, EMPTY_FILTER, colVecFloat32("[0, 256)", vec_column_name, vec_column_id)); + } + + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + + // read with ANN query + { + ann_query_info->set_top_k(2); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({72.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + read(range, filter, createVecFloat32Column({{72.0}, {73.0}})); } + // read with ANN query + { + ann_query_info->set_top_k(2); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({172.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + read(range, filter, createVecFloat32Column({{172.0}, {173.0}})); + } +} +CATCH + +TEST_F(DeltaMergeStoreVectorTest, TestFlushCache) +try +{ + store = reload(); + + const size_t num_rows_write = 128; + + auto sp_delta_index_built + = SyncPointCtl::enableInScope("DeltaMergeStore::segmentEnsureDeltaLocalIndex_after_build"); + // write [0, 128) to store + write(0, num_rows_write); + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + + // Pause after delta vector index built but not set. + sp_delta_index_built.waitAndPause(); + + // write [128, 130) to store + write(num_rows_write, num_rows_write + 2); + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + + // Now persisted file set has changed. + // Resume + sp_delta_index_built.next(); + + const auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); + + // read from store + { + read(range, EMPTY_FILTER, colVecFloat32("[0, 130)", vec_column_name, vec_column_id)); + } + + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + // read with ANN query { ann_query_info->set_top_k(1); - ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.1})); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({72.0})); auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + // [0, 128) with vector index return 72.0, [128, 130) without vector index return all. + read(range, filter, createVecFloat32Column({{72.0}, {128.0}, {129.0}})); + } - read(range, filter, createVecFloat32Column({{2.0}})); + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({72.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + // [0, 128) with vector index return 72.0, [128, 130) without vector index return all. + read(range, filter, createVecFloat32Column({{72.0}, {128.0}, {129.0}})); + } +} +CATCH + +TEST_F(DeltaMergeStoreVectorTest, TestCompactDelta) +try +{ + store = reload(); + + auto sp_delta_index_built + = SyncPointCtl::enableInScope("DeltaMergeStore::segmentEnsureDeltaLocalIndex_after_build"); + // write [0, 2) to store + write(0, 2); + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + // write [2, 4) to store + write(2, 4); + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + + // Pause after delta vector index built but not set. + sp_delta_index_built.waitAndPause(); + + // compact delta [0, 2) + [2, 4) -> [0, 4) + triggerCompactDelta(); + + // Now persisted file set has changed. + // Resume + sp_delta_index_built.next(); + + const auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); + + // read from store + { + read(range, EMPTY_FILTER, colVecFloat32("[0, 4)", vec_column_name, vec_column_id)); + } + + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + // [0, 4) without vector index return all. + read(range, filter, createVecFloat32Column({{0.0}, {1.0}, {2.0}, {3.0}})); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + // [0, 4) without vector index return all. + read(range, filter, createVecFloat32Column({{0.0}, {1.0}, {2.0}, {3.0}})); } } CATCH @@ -212,12 +442,20 @@ try const size_t num_rows_write = 128; - // write to store - write(num_rows_write); - + // write [0, 128) to store + write(0, num_rows_write); // trigger mergeDelta for all segments triggerMergeDelta(); + // write [128, 256) to store + write(num_rows_write, num_rows_write * 2); + + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + + // check delta index has built for all segments + waitDeltaIndexReady(); + // logical split RowKeyRange left_segment_range; { @@ -227,7 +465,7 @@ try segment = store->segments.begin()->second; } auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); - auto breakpoint = RowKeyValue::fromHandle(num_rows_write / 2); + auto breakpoint = RowKeyValue::fromHandle(num_rows_write); const auto [left, right] = store->segmentSplit( *dm_context, segment, @@ -251,7 +489,7 @@ try read( left_segment_range, EMPTY_FILTER, - colVecFloat32(fmt::format("[0, {})", num_rows_write / 2), vec_column_name, vec_column_id)); + colVecFloat32(fmt::format("[0, {})", num_rows_write), vec_column_name, vec_column_id)); } auto ann_query_info = std::make_shared(); @@ -271,11 +509,11 @@ try // read with ANN query { ann_query_info->set_top_k(1); - ann_query_info->set_ref_vec_f32(encodeVectorFloat32({122.1})); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({222.1})); auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); - read(left_segment_range, filter, createVecFloat32Column({{63.0}})); + read(left_segment_range, filter, createVecFloat32Column({{127.0}})); } // merge segment @@ -288,7 +526,7 @@ try // read from store { - read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + read(range, EMPTY_FILTER, colVecFloat32("[0, 256)", vec_column_name, vec_column_id)); } // read with ANN query @@ -323,12 +561,20 @@ try const size_t num_rows_write = 128; - // write to store - write(num_rows_write); - + // write [0, 128) to store + write(0, num_rows_write); // trigger mergeDelta for all segments triggerMergeDelta(); + // write [128, 256) to store + write(num_rows_write, num_rows_write * 2); + + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + + // check delta index has built for all segments + waitDeltaIndexReady(); + // physical split auto physical_split = [&] { SegmentPtr segment; @@ -337,7 +583,7 @@ try segment = store->segments.begin()->second; } auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); - auto breakpoint = RowKeyValue::fromHandle(num_rows_write / 2); + auto breakpoint = RowKeyValue::fromHandle(num_rows_write); return store->segmentSplit( *dm_context, segment, @@ -355,8 +601,8 @@ try std::tie(left, right) = physical_split(); } - ASSERT_TRUE(left->rowkey_range.end == RowKeyValue::fromHandle(num_rows_write / 2)); - ASSERT_TRUE(right->rowkey_range.start == RowKeyValue::fromHandle(num_rows_write / 2)); + ASSERT_TRUE(left->rowkey_range.end == RowKeyValue::fromHandle(num_rows_write)); + ASSERT_TRUE(right->rowkey_range.start == RowKeyValue::fromHandle(num_rows_write)); RowKeyRange left_segment_range = RowKeyRange( left->rowkey_range.start, left->rowkey_range.end, @@ -371,7 +617,7 @@ try read( left_segment_range, EMPTY_FILTER, - colVecFloat32(fmt::format("[0, {})", num_rows_write / 2), vec_column_name, vec_column_id)); + colVecFloat32(fmt::format("[0, {})", num_rows_write), vec_column_name, vec_column_id)); } auto ann_query_info = std::make_shared(); @@ -391,11 +637,11 @@ try // read with ANN query { ann_query_info->set_top_k(1); - ann_query_info->set_ref_vec_f32(encodeVectorFloat32({122.1})); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({222.1})); auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); - read(left_segment_range, filter, createVecFloat32Column({{63.0}})); + read(left_segment_range, filter, createVecFloat32Column({{127.0}})); } // merge segment @@ -408,7 +654,7 @@ try // read from store { - read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + read(range, EMPTY_FILTER, colVecFloat32("[0, 256)", vec_column_name, vec_column_id)); } // read with ANN query @@ -441,7 +687,7 @@ try const size_t num_rows_write = 128; // write to store - write(num_rows_write); + write(0, num_rows_write); // Prepare DMFile auto [dmfile_parent_path, file_id] = store->preAllocateIngestFile(); @@ -543,12 +789,20 @@ try const size_t num_rows_write = 128; - // write to store - write(num_rows_write); - + // write [0, 128) to store + write(0, num_rows_write); // trigger mergeDelta for all segments triggerMergeDelta(); + // write [128, 256) to store + write(num_rows_write, num_rows_write * 2); + + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + + // check delta index has built for all segments + waitDeltaIndexReady(); + // shutdown store store->shutdown(); @@ -574,7 +828,7 @@ try // read from store { - read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + read(range, EMPTY_FILTER, colVecFloat32("[0, 256)", vec_column_name, vec_column_id)); } auto ann_query_info = std::make_shared(); @@ -588,17 +842,17 @@ try auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); - read(range, filter, createVecFloat32Column({{2.0}})); + read(range, filter, createVecFloat32Column({{2.0}, {128.0}})); } // read with ANN query { ann_query_info->set_top_k(1); - ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.1})); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({222.1})); auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); - read(range, filter, createVecFloat32Column({{2.0}})); + read(range, filter, createVecFloat32Column({{127.0}, {222.0}})); } } CATCH @@ -614,11 +868,14 @@ try const size_t num_rows_write = 128; - // write to store before index built - write(num_rows_write); + // write [0, 128) to store + write(0, num_rows_write); // trigger mergeDelta for all segments triggerMergeDelta(); + // write [128, 256) to store + write(num_rows_write, num_rows_write * 2); + { // Add vecotr index TiDB::TableInfo new_table_info_with_vector_index; @@ -645,6 +902,11 @@ try ASSERT_EQ(store->local_index_infos->size(), 1); } + // trigger FlushCache for all segments + triggerFlushCacheAndEnsureDeltaLocalIndex(); + + // check delta index has built for all segments + waitDeltaIndexReady(); // check stable index has built for all segments waitStableLocalIndexReady(); @@ -652,7 +914,7 @@ try // read from store { - read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + read(range, EMPTY_FILTER, colVecFloat32("[0, 256)", vec_column_name, vec_column_id)); } auto ann_query_info = std::make_shared(); @@ -667,17 +929,17 @@ try auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); - read(range, filter, createVecFloat32Column({{2.0}})); + read(range, filter, createVecFloat32Column({{2.0}, {128.0}})); } // read with ANN query { ann_query_info->set_top_k(1); - ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.1})); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({222.1})); auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); - read(range, filter, createVecFloat32Column({{2.0}})); + read(range, filter, createVecFloat32Column({{127.0}, {222.0}})); } { @@ -706,7 +968,7 @@ try const size_t num_rows_write = 128; // write to store before index built - write(num_rows_write); + write(0, num_rows_write); // trigger mergeDelta for all segments triggerMergeDelta(); @@ -759,7 +1021,7 @@ try const size_t num_rows_write = 128; // write to store before index built - write(num_rows_write); + write(0, num_rows_write); // trigger mergeDelta for all segments triggerMergeDelta(); diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index bc1c9138f3e..475074d6978 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -515,13 +515,6 @@ std::variant StorageDisagg const auto & executor_id = table_scan.getTableScanExecutorID(); auto rs_operator = buildRSOperator(db_context, column_defines); - { - DM::ANNQueryInfoPtr ann_query_info = nullptr; - if (table_scan.getANNQueryInfo().query_type() != tipb::ANNQueryType::InvalidQueryType) - ann_query_info = std::make_shared(table_scan.getANNQueryInfo()); - if (ann_query_info != nullptr) - rs_operator = wrapWithANNQueryInfo(rs_operator, ann_query_info); - } auto push_down_filter = DM::PushDownFilter::build( rs_operator, table_scan.getColumns(),