diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h index 2c720aaf626..25ca1746605 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h @@ -37,6 +37,7 @@ class ColumnFileTiny : public ColumnFilePersisted { public: friend class ColumnFileTinyReader; + friend class ColumnFileTinyVectorIndexWriter; friend struct Remote::Serializer; struct IndexInfo diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexWriter.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexWriter.cpp new file mode 100644 index 00000000000..c7c61e86e5c --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexWriter.cpp @@ -0,0 +1,235 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + + +namespace DB::ErrorCodes +{ +extern const int ABORTED; +} // namespace DB::ErrorCodes + +namespace DB::DM +{ + +ColumnFileTinyVectorIndexWriter::LocalIndexBuildInfo ColumnFileTinyVectorIndexWriter::getLocalIndexBuildInfo( + const LocalIndexInfosSnapshot & index_infos, + const ColumnFilePersistedSetPtr & file_set) +{ + assert(index_infos != nullptr); + + LocalIndexBuildInfo build; + build.indexes_to_build = std::make_shared(); + build.file_ids.reserve(file_set->getColumnFileCount()); + for (const auto & file : file_set->getFiles()) + { + auto * tiny_file = file->tryToTinyFile(); + if (!tiny_file) + continue; + + bool any_new_index_build = false; + for (const auto & index : *index_infos) + { + auto schema = tiny_file->getSchema(); + assert(schema != nullptr); + // The ColumnFileTiny may be built before col_id is added. Skip build indexes for it + if (!schema->getColIdToOffset().contains(index.column_id)) + continue; + + // Index already built, skip + if (tiny_file->hasIndex(index.index_id)) + continue; + + any_new_index_build = true; + // FIXME: the memory usage is not accurate, but it's fine for now. + build.estimated_memory_bytes += tiny_file->getBytes(); + + // Avoid duplicate index build + if (std::find(build.index_ids.begin(), build.index_ids.end(), index.index_id) == build.index_ids.end()) + { + build.indexes_to_build->emplace_back(index); + build.index_ids.emplace_back(index.index_id); + } + } + + if (any_new_index_build) + { + build.file_ids.emplace_back(LocalIndexerScheduler::ColumnFileTinyID(tiny_file->getDataPageId())); + } + } + + build.file_ids.shrink_to_fit(); + return build; +} + +ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile( + const ColumnDefines & column_defines, + const ColumnDefine & del_cd, + const ColumnFileTiny * file, + ProceedCheckFn should_proceed) const +{ + // read_columns are: DEL_MARK, COL_A, COL_B, ... + // index_builders are: COL_A, COL_B, ... + + ColumnDefinesPtr read_columns = std::make_shared(); + read_columns->reserve(options.index_infos->size() + 1); + read_columns->push_back(del_cd); + + std::unordered_map> index_builders; + + std::unordered_map> col_indexes; + for (const auto & index_info : *options.index_infos) + { + if (index_info.type != IndexType::Vector) + continue; + col_indexes[index_info.column_id].emplace_back(index_info); + } + + for (const auto & [col_id, index_infos] : col_indexes) + { + // Make sure the column_id is in the schema. + const auto cd_iter = std::find_if(column_defines.cbegin(), column_defines.cend(), [&](const auto & cd) { + return cd.id == col_id; + }); + RUNTIME_CHECK_MSG( + cd_iter != column_defines.cend(), + "Cannot find column_id={} in file_id={}", + col_id, + file->getDataPageId()); + + for (const auto & idx_info : index_infos) + { + // Just skip if the index is already built + if (file->hasIndex(idx_info.index_id)) + continue; + + index_builders[col_id].emplace_back( + VectorIndexBuilder::create(idx_info.index_id, idx_info.index_definition)); + } + read_columns->push_back(*cd_iter); + } + + // If no index to build, return nullptr + if (read_columns->size() == 1 || index_builders.empty()) + return nullptr; + + // Read all blocks and build index + // TODO: read one column at a time to reduce peak memory usage. + const size_t num_cols = read_columns->size(); + ColumnFileTinyReader reader(*file, options.data_provider, read_columns); + while (true) + { + if (!should_proceed()) + throw Exception(ErrorCodes::ABORTED, "Index build is interrupted"); + + auto block = reader.readNextBlock(); + if (!block) + break; + + RUNTIME_CHECK(block.columns() == read_columns->size()); + RUNTIME_CHECK(block.getByPosition(0).column_id == TAG_COLUMN_ID); + + auto del_mark_col = block.safeGetByPosition(0).column; + RUNTIME_CHECK(del_mark_col != nullptr); + const auto * del_mark = static_cast *>(del_mark_col.get()); + RUNTIME_CHECK(del_mark != nullptr); + + for (size_t col_idx = 1; col_idx < num_cols; ++col_idx) + { + const auto & col_with_type_and_name = block.safeGetByPosition(col_idx); + RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns->at(col_idx).id); + const auto & col = col_with_type_and_name.column; + for (const auto & index_builder : index_builders[read_columns->at(col_idx).id]) + { + index_builder->addBlock(*col, del_mark, should_proceed); + } + } + } + + // Save index to PageStorage + auto index_infos = std::make_shared(); + for (size_t col_idx = 1; col_idx < num_cols; ++col_idx) + { + const auto & cd = read_columns->at(col_idx); + for (const auto & index_builder : index_builders[cd.id]) + { + auto index_page_id = options.storage_pool->newLogPageId(); + MemoryWriteBuffer write_buf; + CompressedWriteBuffer compressed(write_buf); + index_builder->saveToBuffer(compressed); + compressed.next(); + auto data_size = write_buf.count(); + auto buf = write_buf.tryGetReadBuffer(); + // ColumnFileDataProviderRNLocalPageCache currently does not support read data with fields + options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size}); + + dtpb::VectorIndexFileProps vector_index; + vector_index.set_index_id(index_builder->index_id); + vector_index.set_index_bytes(data_size); + vector_index.set_index_kind(tipb::VectorIndexKind_Name(index_builder->definition->kind)); + vector_index.set_distance_metric( + tipb::VectorDistanceMetric_Name(index_builder->definition->distance_metric)); + vector_index.set_dimensions(index_builder->definition->dimension); + index_infos->emplace_back(index_page_id, vector_index); + } + } + + if (file->index_infos) + file->index_infos->insert(file->index_infos->end(), index_infos->begin(), index_infos->end()); + + options.wbs.writeLogAndData(); + // Note: The id of the file cannot be changed, otherwise minor compaction will fail. + // So we just clone the file with new index info. + return file->cloneWith(file->getDataPageId(), index_infos); +} + +ColumnFileTinys ColumnFileTinyVectorIndexWriter::build(ProceedCheckFn should_proceed) const +{ + ColumnFileTinys new_files; + new_files.reserve(options.files.size()); + ColumnDefines column_defines; + ColumnDefine del_cd; + for (const auto & file : options.files) + { + // Only build index for ColumnFileTiny + const auto * tiny_file = file->tryToTinyFile(); + if (!tiny_file) + continue; + if (column_defines.empty()) + { + auto schema = tiny_file->getSchema(); + column_defines = getColumnDefinesFromBlock(schema->getSchema()); + const auto del_cd_iter + = std::find_if(column_defines.cbegin(), column_defines.cend(), [](const ColumnDefine & cd) { + return cd.id == TAG_COLUMN_ID; + }); + RUNTIME_CHECK_MSG( + del_cd_iter != column_defines.cend(), + "Cannot find del_mark column, file_id={}", + tiny_file->getDataPageId()); + del_cd = *del_cd_iter; + } + if (auto new_file = buildIndexForFile(column_defines, del_cd, tiny_file, should_proceed); new_file) + new_files.push_back(new_file); + } + new_files.shrink_to_fit(); + return new_files; +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexWriter.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexWriter.h new file mode 100644 index 00000000000..70e92e1e22e --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexWriter.h @@ -0,0 +1,78 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + + +namespace DB::DM +{ + +using ColumnFileTinys = std::vector; + +// ColumnFileTinyVectorIndexWriter write vector index store in PageStorage for ColumnFileTiny. +class ColumnFileTinyVectorIndexWriter +{ +public: + struct LocalIndexBuildInfo + { + std::vector file_ids; + std::vector index_ids; + size_t estimated_memory_bytes = 0; + LocalIndexInfosPtr indexes_to_build; + }; + + static LocalIndexBuildInfo getLocalIndexBuildInfo( + const LocalIndexInfosSnapshot & index_infos, + const ColumnFilePersistedSetPtr & file_set); + + struct Options + { + const StoragePoolPtr storage_pool; + const WriteLimiterPtr write_limiter; + const ColumnFiles & files; + const IColumnFileDataProviderPtr data_provider; + const LocalIndexInfosPtr index_infos; + WriteBatches & wbs; // Write index and modify meta in the same batch. + }; + + explicit ColumnFileTinyVectorIndexWriter(const Options & options) + : logger(Logger::get()) + , options(options) + {} + + // Build vector index for all files in `options.files`. + // Only return the files that have been built new indexes. + using ProceedCheckFn = std::function; // Return false to stop building index. + ColumnFileTinys build(ProceedCheckFn should_proceed) const; + +private: + ColumnFileTinyPtr buildIndexForFile( + const ColumnDefines & column_defines, + const ColumnDefine & del_cd, + const ColumnFileTiny * file, + ProceedCheckFn should_proceed) const; + + const LoggerPtr logger; + const Options options; +}; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index 25badc04e2b..339200af11c 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -290,6 +290,22 @@ bool ColumnFilePersistedSet::appendPersistedColumnFiles(const ColumnFilePersiste return true; } +bool ColumnFilePersistedSet::updatePersistedColumnFiles( + const ColumnFilePersisteds & new_persisted_files, + WriteBatches & wbs) +{ + /// Save the new metadata of column files to disk. + serializeColumnFilePersisteds(wbs, metadata_id, new_persisted_files); + wbs.writeMeta(); + + /// Commit updates in memory. + persisted_files = std::move(new_persisted_files); + updateColumnFileStats(); + LOG_DEBUG(log, "{}, after update column files, persisted column files: {}", info(), detailInfo()); + + return true; +} + MinorCompactionPtr ColumnFilePersistedSet::pickUpMinorCompaction(DMContext & context) { // Every time we try to compact all column files. diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index 6fb6b7ffd7a..bac6d10e2a1 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -156,6 +156,8 @@ class ColumnFilePersistedSet bool appendPersistedColumnFiles(const ColumnFilePersisteds & column_files, WriteBatches & wbs); + bool updatePersistedColumnFiles(const ColumnFilePersisteds & new_persisted_files, WriteBatches & wbs); + /// Choose all small column files that can be compacted to larger column files MinorCompactionPtr pickUpMinorCompaction(DMContext & context); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 8cf629da88e..0916b4d8f15 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -855,6 +855,8 @@ bool DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa if (segment->flushCache(*dm_context)) { + // After flush, try to add delta local index. + segmentEnsureDeltaLocalIndexAsync(segment); break; } else if (!try_until_succeed) @@ -987,6 +989,8 @@ void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & ra // compact could fail. if (segment->compactDelta(*dm_context)) { + // After compact delta, try to create delta local index. + segmentEnsureDeltaLocalIndexAsync(segment); break; } } @@ -1716,6 +1720,8 @@ bool DeltaMergeStore::checkSegmentUpdate( segment->info(), magic_enum::enum_name(input_type)); segment->flushCache(*dm_context); + // After flush, try to add delta local index. + segmentEnsureDeltaLocalIndexAsync(segment); if (input_type == InputType::RaftLog) { // Only the segment update is from a raft log write, will we notify KVStore to trigger a foreground flush. diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 868178fef6d..da74822051e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -739,9 +739,17 @@ class DeltaMergeStore MergeDeltaReason reason, SegmentSnapshotPtr segment_snap = nullptr); - void segmentEnsureStableIndex(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info); + void segmentEnsureStableLocalIndex(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info); - void segmentEnsureStableIndexWithErrorReport(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info); + void segmentEnsureStableLocalIndexWithErrorReport( + DMContext & dm_context, + const LocalIndexBuildInfo & index_build_info); + + void segmentEnsureDeltaLocalIndex( + DMContext & dm_context, + const LocalIndexInfosPtr & index_info, + const DeltaValueSpacePtr & delta, + const String & source_segment_info); /** * Ingest a DMFile into the segment, optionally causing a new segment being created. @@ -895,7 +903,16 @@ class DeltaMergeStore * * @returns true if index is missing and a build task is added in background. */ - bool segmentEnsureStableIndexAsync(const SegmentPtr & segment); + bool segmentEnsureStableLocalIndexAsync(const SegmentPtr & segment); + + /** + * Ensure the segment has delta index. + * If the segment has no delta index, it will be built in background. + * Note: This function can not be called in constructor, since shared_from_this() is not available. + * + * @returns true if index is missing and a build task is added in background. + */ + bool segmentEnsureDeltaLocalIndexAsync(const SegmentPtr & segment); #ifndef DBMS_PUBLIC_GTEST private: @@ -905,6 +922,15 @@ class DeltaMergeStore void applyLocalIndexChange(const TiDB::TableInfo & new_table_info); + /** + * Wait until the segment has delta index. + * If the index is ready or no need to build, it will return immediately. + * Only used for testing. + * + * @returns false if index is still missing after wait timed out. + */ + bool segmentWaitDeltaIndexReady(const SegmentPtr & segment) const; + /** * Wait until the segment has stable index. * If the index is ready or no need to build, it will return immediately. @@ -912,7 +938,7 @@ class DeltaMergeStore * * @returns false if index is still missing after wait timed out. */ - bool segmentWaitStableIndexReady(const SegmentPtr & segment) const; + bool segmentWaitStableLocalIndexReady(const SegmentPtr & segment) const; void dropAllSegments(bool keep_first_segment); String getLogTracingId(const DMContext & dm_ctx); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index f8c3d1e54e0..7e3d62e2aa3 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -416,11 +416,15 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) } case TaskType::Compact: task.segment->compactDelta(*task.dm_context); + // After compact delta, try to create delta local index. + segmentEnsureDeltaLocalIndexAsync(task.segment); left = task.segment; type = ThreadType::BG_Compact; break; case TaskType::Flush: task.segment->flushCache(*task.dm_context); + // After flush cache, try to create delta local index. + segmentEnsureDeltaLocalIndexAsync(task.segment); // After flush cache, better place delta index. task.segment->placeDeltaIndex(*task.dm_context); left = task.segment; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 28800033657..f96a193a90a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -17,6 +17,8 @@ #include #include #include +#include +#include #include #include #include @@ -307,8 +309,8 @@ SegmentPair DeltaMergeStore::segmentSplit( // So there is no need to check the local index update for logical split. if (!split_info.is_logical) { - segmentEnsureStableIndexAsync(new_left); - segmentEnsureStableIndexAsync(new_right); + segmentEnsureStableLocalIndexAsync(new_left); + segmentEnsureStableLocalIndexAsync(new_right); } return {new_left, new_right}; @@ -449,7 +451,7 @@ SegmentPtr DeltaMergeStore::segmentMerge( if constexpr (DM_RUN_CHECK) check(dm_context.global_context); - segmentEnsureStableIndexAsync(merged); + segmentEnsureStableLocalIndexAsync(merged); return merged; } @@ -504,12 +506,12 @@ void DeltaMergeStore::checkAllSegmentsLocalIndex(std::vector && dropped size_t segments_missing_indexes = 0; - // 2. Trigger ensureStableIndex for all segments. + // 2. Trigger EnsureStableLocalIndex for all segments. // There could be new segments between 1 and 2, which is fine. New segments - // will invoke ensureStableIndex at creation time. + // will invoke EnsureStableLocalIndex at creation time. { // There must be a lock, because segments[] may be mutated. - // And one lock for all is fine, because segmentEnsureStableIndexAsync is non-blocking, it + // And one lock for all is fine, because segmentEnsureStableLocalIndexAsync is non-blocking, it // simply put tasks in the background. std::shared_lock lock(read_write_mutex); for (const auto & [end, segment] : segments) @@ -518,7 +520,7 @@ void DeltaMergeStore::checkAllSegmentsLocalIndex(std::vector && dropped // cleanup the index error messaage for dropped indexes segment->clearIndexBuildError(dropped_indexes); - if (segmentEnsureStableIndexAsync(segment)) + if (segmentEnsureStableLocalIndexAsync(segment)) ++segments_missing_indexes; } } @@ -530,7 +532,7 @@ void DeltaMergeStore::checkAllSegmentsLocalIndex(std::vector && dropped segments_missing_indexes); } -bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment) +bool DeltaMergeStore::segmentEnsureStableLocalIndexAsync(const SegmentPtr & segment) { RUNTIME_CHECK(segment != nullptr); @@ -554,7 +556,7 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment) auto store_weak_ptr = weak_from_this(); auto tracing_id - = fmt::format("segmentEnsureStableIndex<{}> source_segment={}", log->identifier(), segment->simpleInfo()); + = fmt::format("segmentEnsureStableLocalIndex<{}> source_segment={}", log->identifier(), segment->simpleInfo()); auto workload = [store_weak_ptr, build_info, tracing_id]() -> void { auto store = store_weak_ptr.lock(); if (store == nullptr) // Store is destroyed before the task is executed. @@ -563,7 +565,7 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment) store->global_context, store->global_context.getSettingsRef(), tracing_id); - store->segmentEnsureStableIndexWithErrorReport(*dm_context, build_info); + store->segmentEnsureStableLocalIndexWithErrorReport(*dm_context, build_info); }; auto indexer_scheduler = global_context.getGlobalLocalIndexerScheduler(); @@ -604,7 +606,7 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment) } } -bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) const +bool DeltaMergeStore::segmentWaitStableLocalIndexReady(const SegmentPtr & segment) const { RUNTIME_CHECK(segment != nullptr); @@ -690,7 +692,9 @@ SegmentPtr DeltaMergeStore::segmentUpdateMeta( return new_segment; } -void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info) +void DeltaMergeStore::segmentEnsureStableLocalIndex( + DMContext & dm_context, + const LocalIndexBuildInfo & index_build_info) { // 1. Acquire a snapshot for PageStorage, and keep the snapshot until index is built. // This helps keep DMFile valid during the index build process. @@ -700,9 +704,9 @@ void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const Loc // Note that we cannot simply skip the index building when seg is not valid any more, // because segL and segR is still referencing them, consider this case: // 1. seg=PhysicalSplit - // 2. Add CreateStableIndex(seg) to ThreadPool + // 2. Add CreateStableLocalIndex(seg) to ThreadPool // 3. segL, segR=LogicalSplit(seg) - // 4. CreateStableIndex(seg) + // 4. CreateStableLocalIndex(seg) auto storage_snapshot = std::make_shared( *dm_context.storage_pool, @@ -724,13 +728,13 @@ void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const Loc // 2. Check whether the DMFile has been referenced by any valid segment. if (!is_file_valid()) { - LOG_DEBUG(tracing_logger, "EnsureStableIndex - Give up because no segment to update"); + LOG_DEBUG(tracing_logger, "EnsureStableLocalIndex - Give up because no segment to update"); return; } LOG_INFO( tracing_logger, - "EnsureStableIndex - Begin building index, dm_files={}", + "EnsureStableLocalIndex - Begin building index, dm_files={}", DMFile::info(index_build_info.dm_files)); // 2. Build the index. @@ -754,7 +758,7 @@ void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const Loc { LOG_INFO( tracing_logger, - "EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={}", + "EnsureStableLocalIndex - Build index aborted because DMFile is no longer valid, dm_files={}", DMFile::info(index_build_info.dm_files)); return; } @@ -765,7 +769,7 @@ void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const Loc LOG_INFO( tracing_logger, - "EnsureStableIndex - Finish building index, dm_files={}", + "EnsureStableLocalIndex - Finish building index, dm_files={}", DMFile::info(index_build_info.dm_files)); // 3. Update the meta version of the segments to the latest one. @@ -791,10 +795,10 @@ void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const Loc } } -// A wrapper of `segmentEnsureStableIndex` +// A wrapper of `segmentEnsureStableLocalIndex` // If any exception thrown, the error message will be recorded to // the related segment(s) -void DeltaMergeStore::segmentEnsureStableIndexWithErrorReport( +void DeltaMergeStore::segmentEnsureStableLocalIndexWithErrorReport( DMContext & dm_context, const LocalIndexBuildInfo & index_build_info) { @@ -824,7 +828,7 @@ void DeltaMergeStore::segmentEnsureStableIndexWithErrorReport( try { - segmentEnsureStableIndex(dm_context, index_build_info); + segmentEnsureStableLocalIndex(dm_context, index_build_info); } catch (DB::Exception & e) { @@ -845,6 +849,289 @@ void DeltaMergeStore::segmentEnsureStableIndexWithErrorReport( } } +bool DeltaMergeStore::segmentEnsureDeltaLocalIndexAsync(const SegmentPtr & segment) +{ + RUNTIME_CHECK(segment != nullptr); + + auto local_index_infos_snap = getLocalIndexInfosSnapshot(); + if (!local_index_infos_snap) + return false; + + // Acquire a lock to make sure delta is not changed during the process. + auto lock = segment->getUpdateLock(); + if (!lock) + return false; + auto column_file_persisted_set = segment->getDelta()->getPersistedFileSet(); + if (!column_file_persisted_set) + return false; + auto build_info + = ColumnFileTinyVectorIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, column_file_persisted_set); + if (!build_info.indexes_to_build || build_info.indexes_to_build->empty()) + return false; + // Use weak_ptr to avoid blocking gc. + auto delta_weak_ptr = std::weak_ptr(segment->getDelta()); + lock->unlock(); + + auto store_weak_ptr = weak_from_this(); + auto tracing_id = fmt::format("segmentEnsureDeltaLocalIndexAsync<{}>", log->identifier()); + auto workload = [store_weak_ptr, build_info, delta_weak_ptr, segment, tracing_id]() -> void { + auto store = store_weak_ptr.lock(); + if (!store) // Store is destroyed before the task is executed. + return; + auto delta = delta_weak_ptr.lock(); + if (!delta) // Delta is destroyed before the task is executed. + return; + auto dm_context = store->newDMContext( // + store->global_context, + store->global_context.getSettingsRef(), + tracing_id); + const auto source_segment_info = segment->simpleInfo(); + store->segmentEnsureDeltaLocalIndex(*dm_context, build_info.indexes_to_build, delta, source_segment_info); + }; + + auto indexer_scheduler = global_context.getGlobalLocalIndexerScheduler(); + RUNTIME_CHECK(indexer_scheduler != nullptr); + try + { + // new task of these index are generated, clear existing error_message in segment + segment->clearIndexBuildError(build_info.index_ids); + + auto [ok, reason] = indexer_scheduler->pushTask(LocalIndexerScheduler::Task{ + .keyspace_id = keyspace_id, + .table_id = physical_table_id, + .file_ids = build_info.file_ids, + .request_memory = build_info.estimated_memory_bytes, + .workload = workload, + }); + if (ok) + return true; + + segment->setIndexBuildError(build_info.index_ids, reason); + LOG_ERROR( + log->getChild(tracing_id), + "Failed to generate async segment stable index task, index_ids={} reason={}", + build_info.index_ids, + reason); + return false; + } + catch (...) + { + const auto message = getCurrentExceptionMessage(false, false); + segment->setIndexBuildError(build_info.index_ids, message); + + tryLogCurrentException(log); + + // catch and ignore the exception + // not able to push task to index scheduler + return false; + } +} + +bool DeltaMergeStore::segmentWaitDeltaIndexReady(const SegmentPtr & segment) const +{ + RUNTIME_CHECK(segment != nullptr); + + auto local_index_infos_snap = getLocalIndexInfosSnapshot(); + if (!local_index_infos_snap) + return true; + + // Acquire a lock to make sure delta is not changed during the process. + auto lock = segment->mustGetUpdateLock(); + auto column_file_persisted_set = segment->getDelta()->getPersistedFileSet(); + if (!column_file_persisted_set) + return false; + auto build_info + = ColumnFileTinyVectorIndexWriter::getLocalIndexBuildInfo(local_index_infos, column_file_persisted_set); + // Use weak_ptr to avoid blocking gc. + auto delta_weak_ptr = std::weak_ptr(segment->getDelta()); + lock.unlock(); + if (!build_info.indexes_to_build || build_info.indexes_to_build->empty()) + return false; + + auto segment_id = segment->segmentId(); + static constexpr size_t MAX_CHECK_TIME_SECONDS = 60; // 60s + Stopwatch watch; + while (watch.elapsedSeconds() < MAX_CHECK_TIME_SECONDS) + { + ColumnFilePersistedSetPtr column_file_persisted_set; + { + std::shared_lock lock(read_write_mutex); + auto seg = id_to_segment.at(segment_id); + column_file_persisted_set = seg->getDelta()->getPersistedFileSet(); + } + if (!column_file_persisted_set) + return false; // ColumnFilePersistedSet is not exist, return false + bool all_indexes_built = true; + for (const auto & index : *build_info.indexes_to_build) + { + for (const auto & column_file : column_file_persisted_set->getFiles()) + { + auto * tiny_file = column_file->tryToTinyFile(); + if (!tiny_file) + continue; + all_indexes_built = all_indexes_built && (tiny_file->hasIndex(index.index_id)); + } + } + if (all_indexes_built) + return true; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 0.1s + } + + return false; +} + +void DeltaMergeStore::segmentEnsureDeltaLocalIndex( + DMContext & dm_context, + const LocalIndexInfosPtr & index_info, + const DeltaValueSpacePtr & delta, + const String & source_segment_info) +{ + Stopwatch watch; + + // 1. Acquire a read-only snapshot for persisted files, and keep the snapshot until index is built. + ColumnFileSetSnapshotPtr persisted_files_snap; + if (auto lock = delta->getLock(); lock) + { + auto storage_snap = std::make_shared( + *storage_pool, + dm_context.getReadLimiter(), + dm_context.tracing_id, + /*snapshot_read*/ true); + auto data_from_storage_snap = ColumnFileDataProviderLocalStoragePool::create(storage_snap); + persisted_files_snap = delta->getPersistedFileSet()->createSnapshot(data_from_storage_snap); + } + if (!persisted_files_snap) + { + LOG_DEBUG( + log, + "EnsureDeltaLocalIndex - Give up because create PersistedColumnFileSet snapshot failed, delta={}, " + "source_segment={}", + delta->simpleInfo(), + source_segment_info); + return; + } + + auto persisted_files = persisted_files_snap->getColumnFiles(); + if (persisted_files.empty()) + { + LOG_DEBUG( + log, + "EnsureDeltaLocalIndex - Give up because no column files to update, delta={}, source_segment={}", + delta->simpleInfo(), + source_segment_info); + return; + } + + LOG_INFO( + log, + "EnsureDeltaLocalIndex - Begin building index, delta={} source_segment={}", + delta->info(), + source_segment_info); + + // 2. Build the index. + WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter()); + ColumnFileTinyVectorIndexWriter iw(ColumnFileTinyVectorIndexWriter::Options{ + .storage_pool = storage_pool, + .write_limiter = dm_context.getWriteLimiter(), + .files = persisted_files, + .data_provider = persisted_files_snap->getDataProvider(), + .index_infos = index_info, + .wbs = wbs, + }); + + ColumnFileTinys new_tiny_files; + try + { + auto is_delta_valid = [delta] { + return !delta->hasAbandoned(); + }; + // When delta hs been abandoned we need to abort the index build. + new_tiny_files = iw.build(is_delta_valid); + } + catch (const Exception & e) + { + wbs.setRollback(); + if (e.code() == ErrorCodes::ABORTED) + { + LOG_INFO( + log, + "EnsureDeltaLocalIndex - Build index aborted because delta has been abandoned, delta={} " + "source_segment={}", + delta->simpleInfo(), + source_segment_info); + return; + } + throw; + } + + const size_t rows + = std::accumulate(new_tiny_files.begin(), new_tiny_files.end(), 0, [](size_t sum, const auto & tiny_file) { + return sum + tiny_file->getRows(); + }); + const size_t bytes + = std::accumulate(new_tiny_files.begin(), new_tiny_files.end(), 0, [](size_t sum, const auto & tiny_file) { + return sum + tiny_file->getBytes(); + }); + const size_t file_num = new_tiny_files.size(); + LOG_INFO( + log, + "EnsureDeltaLocalIndex - Indexes have been build, {} files, {} rows, {} bytes, cost {:.3f}s, delta={} " + "source_segment={}", + file_num, + rows, + bytes, + watch.elapsedSeconds(), + delta->simpleInfo(), + source_segment_info); + + SYNC_FOR("DeltaMergeStore::segmentEnsureDeltaLocalIndex_after_build"); + + // 3. Update persisted column files. + auto lock = delta->getLock(); + if (!lock) + { + // Since we only create a read-only snapshot for delta, it is possible that the delta has been abandoned by other threads. + LOG_INFO( + log, + "EnsureDeltaLocalIndex - Give up because delta has been abandoned after building index, delta={}, " + "source_segment={}", + delta->simpleInfo(), + source_segment_info); + wbs.setRollback(); + return; + } + + // Between acquiring the snapshot and locking the delta, the delta may have been changed (like FlushCache and Compact). + // So there are may be some new column files in the delta but are not in the snapshot and some column files in the snapshot that are abandoned by the delta. + // We only need to update the column files that are both in the snapshot and the delta. + // It is safe to update the column files in the delta directly, because the delta is locked and the column data in ColumnFile is immutable + auto delta_persisted_file_set = delta->getPersistedFileSet(); + auto delta_persisted_column_files = delta_persisted_file_set->getFiles(); + + std::unordered_map new_column_files_map; + for (auto & new_tiny_file : new_tiny_files) + { + new_column_files_map[new_tiny_file->getDataPageId()] = std::move(new_tiny_file); + } + // Update the column files in the delta with the new column files. + for (auto & column_file : delta_persisted_column_files) + { + const auto * tiny_file = column_file->tryToTinyFile(); + if (!tiny_file) + continue; + if (auto iter = new_column_files_map.find(tiny_file->getDataPageId()); iter != new_column_files_map.end()) + column_file = iter->second; + } + + delta_persisted_file_set->updatePersistedColumnFiles(delta_persisted_column_files, wbs); + LOG_INFO( + log, + "EnsureDeltaLocalIndex - Finish building index, cost {:.3f}s, delta={} source_segment={}", + watch.elapsedSeconds(), + delta->info(), + source_segment_info); +} + SegmentPtr DeltaMergeStore::segmentMergeDelta( DMContext & dm_context, const SegmentPtr & segment, @@ -992,7 +1279,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( if constexpr (DM_RUN_CHECK) check(dm_context.global_context); - segmentEnsureStableIndexAsync(new_segment); + segmentEnsureStableLocalIndexAsync(new_segment); return new_segment; } @@ -1102,7 +1389,7 @@ SegmentPtr DeltaMergeStore::segmentIngestData( if constexpr (DM_RUN_CHECK) check(dm_context.global_context); - segmentEnsureStableIndexAsync(new_segment); + segmentEnsureStableLocalIndexAsync(new_segment); return new_segment; } @@ -1162,7 +1449,7 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceDataFromCheckpoint( if constexpr (DM_RUN_CHECK) check(dm_context.global_context); - segmentEnsureStableIndexAsync(new_segment); + segmentEnsureStableLocalIndexAsync(new_segment); return new_segment; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp index 6fef0a692c4..dd7ee697719 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp @@ -214,7 +214,7 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, P ? dm_file_mutable->vectorIndexFileName(index_id) : colIndexFileName(DMFile::getFileNameBase(cd.id, substream_path)); const auto index_path = iw->localPath() + "/" + index_file_name; - index_builder->save(index_path); + index_builder->saveToFile(index_path); // Memorize what kind of vector index it is, so that we can correctly restore it when reading. dtpb::VectorIndexFileProps pb_idx; diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h index b86d9ea6a3b..f95bac95d2e 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h @@ -57,7 +57,8 @@ class VectorIndexBuilder ProceedCheckFn should_proceed) = 0; - virtual void save(std::string_view path) const = 0; + virtual void saveToFile(std::string_view path) const = 0; + virtual void saveToBuffer(WriteBuffer & write_buf) const = 0; public: const IndexID index_id; diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp index 79841675e01..a17111724b2 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp @@ -133,7 +133,7 @@ void VectorIndexHNSWBuilder::addBlock( last_reported_memory_usage = current_memory_usage; } -void VectorIndexHNSWBuilder::save(std::string_view path) const +void VectorIndexHNSWBuilder::saveToFile(std::string_view path) const { Stopwatch w; SCOPE_EXIT({ total_duration += w.elapsedSeconds(); }); @@ -142,6 +142,18 @@ void VectorIndexHNSWBuilder::save(std::string_view path) const RUNTIME_CHECK_MSG(result, "Failed to save vector index: {} path={}", result.error.what(), path); } +void VectorIndexHNSWBuilder::saveToBuffer(WriteBuffer & write_buf) const +{ + Stopwatch w; + SCOPE_EXIT({ total_duration += w.elapsedSeconds(); }); + + auto result = index.save_to_stream([&](void const * buffer, std::size_t length) { + write_buf.write(reinterpret_cast(buffer), length); + return true; + }); + RUNTIME_CHECK_MSG(result, "Failed to save vector index: {}", result.error.what()); +} + VectorIndexHNSWBuilder::~VectorIndexHNSWBuilder() { GET_METRIC(tiflash_vector_index_duration, type_build).Observe(total_duration); diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h index 59161db7cc3..380d5bdf9f8 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.h @@ -35,7 +35,8 @@ class VectorIndexHNSWBuilder : public VectorIndexBuilder void addBlock(const IColumn & column, const ColumnVector * del_mark, ProceedCheckFn should_proceed) override; - void save(std::string_view path) const override; + void saveToFile(std::string_view path) const override; + void saveToBuffer(WriteBuffer & write_buf) const override; private: USearchImplType index; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp index fe9d6899ad0..b5ba4ea38f6 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp @@ -119,7 +119,7 @@ class DeltaMergeStoreVectorTest store->segmentMergeDelta(*dm_context, segment, DeltaMergeStore::MergeDeltaReason::Manual) != nullptr); } - void waitStableIndexReady() + void waitStableLocalIndexReady() { std::vector all_segments; { @@ -128,7 +128,7 @@ class DeltaMergeStoreVectorTest all_segments.push_back(segment); } for (const auto & segment : all_segments) - ASSERT_TRUE(store->segmentWaitStableIndexReady(segment)); + ASSERT_TRUE(store->segmentWaitStableLocalIndexReady(segment)); } void triggerMergeAllSegments() @@ -170,7 +170,7 @@ try triggerMergeDelta(); // check stable index has built for all segments - waitStableIndexReady(); + waitStableLocalIndexReady(); const auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); @@ -244,7 +244,7 @@ try } // check stable index has built for all segments - waitStableIndexReady(); + waitStableLocalIndexReady(); // read from store { @@ -282,7 +282,7 @@ try triggerMergeAllSegments(); // check stable index has built for all segments - waitStableIndexReady(); + waitStableLocalIndexReady(); auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); @@ -350,7 +350,7 @@ try if (left == nullptr && right == nullptr) { // check stable index has built for all segments first - waitStableIndexReady(); + waitStableLocalIndexReady(); // trigger physical split again std::tie(left, right) = physical_split(); } @@ -364,7 +364,7 @@ try store->rowkey_column_size); // check stable index has built for all segments - waitStableIndexReady(); + waitStableLocalIndexReady(); // read from store { @@ -402,7 +402,7 @@ try triggerMergeAllSegments(); // check stable index has built for all segments - waitStableIndexReady(); + waitStableLocalIndexReady(); auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); @@ -488,7 +488,7 @@ try } // check stable index has built for all segments - waitStableIndexReady(); + waitStableLocalIndexReady(); auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); @@ -556,7 +556,7 @@ try store = reload(); // check stable index has built for all segments - waitStableIndexReady(); + waitStableLocalIndexReady(); { auto local_index_snap = store->getLocalIndexInfosSnapshot(); ASSERT_NE(local_index_snap, nullptr); @@ -646,7 +646,7 @@ try } // check stable index has built for all segments - waitStableIndexReady(); + waitStableLocalIndexReady(); const auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index 6a9e9a4fb68..867251091f7 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -1380,7 +1380,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 5, /* at */ 0, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + ensureSegmentStableLocalIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); auto stream = annQuery(DELTA_MERGE_FIRST_SEGMENT_ID, createQueryColumns(), 1, {100.0}); assertStreamOut(stream, "[4, 5)"); @@ -1405,7 +1405,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 5, /* at */ 0, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + ensureSegmentStableLocalIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 10, /* at */ 20); @@ -1436,7 +1436,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 10, /* at */ 20, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + ensureSegmentStableLocalIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); // Delta: [12, 18), [50, 60) writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 6, /* at */ 12); @@ -1486,7 +1486,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 100, /* at */ 0, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + ensureSegmentStableLocalIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); size_t cache_hit = 0; size_t cache_miss = 0; @@ -1529,7 +1529,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 100, /* at */ 0, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + ensureSegmentStableLocalIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); size_t cache_hit = 0; size_t cache_miss = 0; @@ -1574,7 +1574,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 100, /* at */ 0, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + ensureSegmentStableLocalIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); size_t cache_hit = 0; size_t cache_miss = 0; @@ -1677,7 +1677,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 5, /* at */ 0, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + ensureSegmentStableLocalIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 10, /* at */ 20); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h index 37c93696c21..b3516e1de44 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h @@ -185,7 +185,7 @@ class DeltaMergeStoreVectorBase : public VectorIndexTestUtils store->segmentMergeDelta(*dm_context, segment, DeltaMergeStore::MergeDeltaReason::Manual) != nullptr); } - void waitStableIndexReady() const + void waitStableLocalIndexReady() const { std::vector all_segments; { @@ -194,7 +194,7 @@ class DeltaMergeStoreVectorBase : public VectorIndexTestUtils all_segments.push_back(segment); } for (const auto & segment : all_segments) - ASSERT_TRUE(store->segmentWaitStableIndexReady(segment)); + ASSERT_TRUE(store->segmentWaitStableLocalIndexReady(segment)); } ContextPtr db_context; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 7370f53e68e..2656f97220e 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -773,9 +773,9 @@ bool SegmentTestBasic::replaceSegmentStableData(PageIdU64 segment_id, const DMFi return success; } -bool SegmentTestBasic::ensureSegmentStableIndex(PageIdU64 segment_id, const LocalIndexInfosPtr & local_index_infos) +bool SegmentTestBasic::ensureSegmentStableLocalIndex(PageIdU64 segment_id, const LocalIndexInfosPtr & local_index_infos) { - LOG_INFO(logger_op, "EnsureSegmentStableIndex, segment_id={}", segment_id); + LOG_INFO(logger_op, "EnsureSegmentStableLocalIndex, segment_id={}", segment_id); RUNTIME_CHECK(segments.find(segment_id) != segments.end()); @@ -794,12 +794,12 @@ bool SegmentTestBasic::ensureSegmentStableIndex(PageIdU64 segment_id, const Loca auto new_dmfiles = iw.build(); RUNTIME_CHECK(new_dmfiles.size() == 1); - LOG_INFO(logger_op, "EnsureSegmentStableIndex, build index done, segment_id={}", segment_id); + LOG_INFO(logger_op, "EnsureSegmentStableLocalIndex, build index done, segment_id={}", segment_id); // Replace stable data success = replaceSegmentStableData(segment_id, new_dmfiles[0]); - operation_statistics["ensureStableIndex"]++; + operation_statistics["ensureStableLocalIndex"]++; return success; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 059c8eb739e..7e6e18244db 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -106,7 +106,7 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic /** * Returns whether segment stable index is created. */ - bool ensureSegmentStableIndex(PageIdU64 segment_id, const LocalIndexInfosPtr & local_index_infos); + bool ensureSegmentStableLocalIndex(PageIdU64 segment_id, const LocalIndexInfosPtr & local_index_infos); Block prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted = false); Block prepareWriteBlockInSegmentRange( diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index 7f6d33487d7..5ff90cbf468 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -836,8 +836,8 @@ try // test read with ANN query after add a vector index { // check stable index has built for all segments - dmsv.waitStableIndexReady(); - LOG_INFO(Logger::get(), "waitStableIndexReady done"); + dmsv.waitStableLocalIndexReady(); + LOG_INFO(Logger::get(), "waitStableLocalIndexReady done"); const auto range = DM::RowKeyRange::newAll(dmsv.store->is_common_handle, dmsv.store->rowkey_column_size); // read from store