From cb4a6b28879b11365eca4027fae519e45d5ceba3 Mon Sep 17 00:00:00 2001 From: Myscale Date: Wed, 12 Jun 2024 14:35:13 +0800 Subject: [PATCH 1/2] Project import generated by Copybara. GitOrigin-RevId: 4c7ab18dc782f03e88a12b9eba658848e53b6f75 --- cmake/autogenerated_myscale_versions.txt | 6 +- programs/keeper/Keeper.cpp | 2 +- programs/server/Server.cpp | 4 +- programs/server/Server.h | 2 - src/Access/UsersConfigAccessStorage.cpp | 2 +- src/Common/AsynchronousMetrics.cpp | 2 +- src/Common/Config/ConfigReloader.cpp | 2 +- src/Common/Config/ConfigReloader.h | 2 +- src/Common/ErrorCodes.cpp | 3 +- src/Core/Settings.h | 1 + src/DataTypes/DataTypeObjectToFetch.cpp | 6 + src/DataTypes/DataTypeTuple.cpp | 6 + .../SerializationObjectToFetch.cpp | 8 + .../Serializations/SerializationTuple.cpp | 2 + src/Disks/ObjectStorages/S3/S3ObjectStorage.h | 5 +- .../ObjectStorages/S3/registerDiskS3.cpp | 2 +- src/Interpreters/Context.cpp | 2 +- src/Interpreters/Context.h | 1 + src/Interpreters/ExpressionAnalyzer.cpp | 13 +- src/Interpreters/InterpreterInsertQuery.cpp | 1 - src/Parsers/ParserCreateQuery.cpp | 7 - .../QueryPlan/ReadWithVectorScan.cpp.rej | 41 -- src/Storages/AlterCommands.cpp | 34 +- src/Storages/MergeTree/DataPartsExchange.cpp | 2 - src/Storages/MergeTree/IMergeTreeDataPart.cpp | 189 +------- src/Storages/MergeTree/KeyCondition.cpp | 19 +- src/Storages/MergeTree/KeyCondition.h | 13 +- src/Storages/MergeTree/MergeTask.cpp | 27 +- src/Storages/MergeTree/MergeTask.h | 1 + src/Storages/MergeTree/MergeTreeData.cpp | 11 +- src/Storages/MergeTree/MergeTreeData.h | 3 +- .../MergeTree/MergeTreeDataPartChecksum.cpp | 8 +- .../MergeTree/StorageFromMergeTreeDataPart.h | 15 - src/Storages/MergeTree/TantivyIndexStore.cpp | 11 +- src/Storages/MergeTree/checkDataPart.cpp | 4 + src/Storages/SelectQueryInfo.h | 2 - src/Storages/StorageInMemoryMetadata.cpp | 1 + src/Storages/StorageMergeTree.cpp | 2 - src/Storages/StorageReplicatedMergeTree.cpp | 26 +- 
src/Storages/StorageReplicatedMergeTree.h | 1 + src/VectorIndex/CMakeLists.txt | 1 + src/VectorIndex/Cache/VICacheManager.cpp | 3 +- src/VectorIndex/Cache/VICacheManager.h | 1 + src/VectorIndex/Common/SegmentId.cpp | 9 +- src/VectorIndex/Common/VIMetadata.h | 2 + src/VectorIndex/Common/VIPartReader.h | 3 +- src/VectorIndex/Common/VIWithDataPart.cpp | 88 +++- src/VectorIndex/Common/VIWithDataPart.h | 15 +- src/VectorIndex/Interpreters/VIEventLog.h | 2 +- .../Processors/ReadWithHybridSearch.cpp | 53 ++- src/VectorIndex/Storages/HybridSearchResult.h | 1 - ...rgeTreeSelectWithHybridSearchProcessor.cpp | 402 +++++++++++++----- ...MergeTreeSelectWithHybridSearchProcessor.h | 26 +- .../Storages/MergeTreeVSManager.cpp | 85 +++- src/VectorIndex/Storages/StorageSystemVIs.cpp | 13 +- .../Storages/StorageSystemVIsWithPart.h | 1 - src/VectorIndex/Storages/VIBuilderUpdater.cpp | 14 +- src/VectorIndex/Storages/VIDescriptions.cpp | 5 +- src/VectorIndex/Storages/VIInfo.h | 1 + src/VectorIndex/Storages/VITaskBase.cpp | 9 +- src/VectorIndex/Storages/VSDescription.h | 1 + src/VectorIndex/Utils/VIUtils.cpp | 14 +- src/VectorIndex/Utils/VIUtils.h | 7 +- .../test.py | 34 +- .../test_mqvs_control_decouple_flag/test.py | 8 +- .../test_mqvs_vector_index_upgrade/test.py | 6 +- 66 files changed, 735 insertions(+), 558 deletions(-) delete mode 100644 src/Processors/QueryPlan/ReadWithVectorScan.cpp.rej diff --git a/cmake/autogenerated_myscale_versions.txt b/cmake/autogenerated_myscale_versions.txt index e0680f07c47..4e03669bc72 100644 --- a/cmake/autogenerated_myscale_versions.txt +++ b/cmake/autogenerated_myscale_versions.txt @@ -3,9 +3,9 @@ # NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION, # only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes. 
SET(MYSCALE_VERSION_MAJOR 1) -SET(MYSCALE_VERSION_MINOR 5) +SET(MYSCALE_VERSION_MINOR 6) SET(MYSCALE_VERSION_PATCH 0) -SET(MYSCALE_VERSION_DESCRIBE myscale-v1.5.0) -SET(MYSCALE_VERSION_STRING 1.5.0) +SET(MYSCALE_VERSION_DESCRIBE myscale-v1.6.0) +SET(MYSCALE_VERSION_STRING 1.6.0) # end of autochange diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index 266b363eb47..e20da9fdfc9 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -531,7 +531,7 @@ try config().getString("path", ""), std::move(unused_cache), unused_event, - [&](ConfigurationPtr config, bool /* initial_loading */) + [&](ConfigurationPtr config, XMLDocumentPtr /* preprocessed_xml */, bool /* initial_loading */) { if (config->has("keeper_server")) tiny_context->updateKeeperConfiguration(*config); diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 471e6c4e93c..5ebc1664c82 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1264,7 +1264,7 @@ try config().getString("path", ""), std::move(main_config_zk_node_cache), main_config_zk_changed_event, - [&](ConfigurationPtr config, bool initial_loading) + [&](ConfigurationPtr config, [[maybe_unused]] XMLDocumentPtr preprocessed_xml, bool initial_loading) { Settings::checkNoSettingNamesAtTopLevel(*config, config_path); @@ -1641,6 +1641,8 @@ try CompressionCodecEncrypted::Configuration::instance().load(config(), "encryption_codecs"); SCOPE_EXIT({ + + async_metrics.stop(); /** Ask to cancel background jobs all table engines, diff --git a/programs/server/Server.h b/programs/server/Server.h index 50973ef77e7..e9ae6d8d937 100644 --- a/programs/server/Server.h +++ b/programs/server/Server.h @@ -68,7 +68,6 @@ class Server : public BaseDaemon, public IServer std::string getDefaultCorePath() const override; - private: ContextMutablePtr global_context; /// Updated/recent config, to compare http_handlers @@ -90,7 +89,6 @@ class Server : public BaseDaemon, public IServer 
AsynchronousMetrics & async_metrics, bool & is_secure); - using CreateServerFunc = std::function; void createServer( Poco::Util::AbstractConfiguration & config, diff --git a/src/Access/UsersConfigAccessStorage.cpp b/src/Access/UsersConfigAccessStorage.cpp index 562df61e8aa..03bddff8325 100644 --- a/src/Access/UsersConfigAccessStorage.cpp +++ b/src/Access/UsersConfigAccessStorage.cpp @@ -660,7 +660,7 @@ void UsersConfigAccessStorage::load( preprocessed_dir, zkutil::ZooKeeperNodeCache(get_zookeeper_function), std::make_shared(), - [&](Poco::AutoPtr new_config, bool /*initial_loading*/) + [&](Poco::AutoPtr new_config, XMLDocumentPtr /*preprocessed_xml*/, bool /*initial_loading*/) { Settings::checkNoSettingNamesAtTopLevel(*new_config, users_config_path); parseFromConfig(*new_config); diff --git a/src/Common/AsynchronousMetrics.cpp b/src/Common/AsynchronousMetrics.cpp index 677b104b30d..2f759e8b457 100644 --- a/src/Common/AsynchronousMetrics.cpp +++ b/src/Common/AsynchronousMetrics.cpp @@ -610,7 +610,7 @@ void AsynchronousMetrics::update(TimePoint update_time) Int64 rss = data.resident; #if defined(OS_LINUX) // To obtain a more precise memory usage, we deduct the shared - // memory utilized by MSTG mmap files. + // memory utilized by MyScale vector index mmap files. if (first_run) base_shared = data.shared; Int64 shared = data.shared - base_shared; diff --git a/src/Common/Config/ConfigReloader.cpp b/src/Common/Config/ConfigReloader.cpp index de7011b67bf..e671adfe6da 100644 --- a/src/Common/Config/ConfigReloader.cpp +++ b/src/Common/Config/ConfigReloader.cpp @@ -146,7 +146,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac try { - updater(loaded_config.configuration, initial_loading); + updater(loaded_config.configuration, loaded_config.preprocessed_xml, initial_loading); } catch (...) 
{ diff --git a/src/Common/Config/ConfigReloader.h b/src/Common/Config/ConfigReloader.h index 982e21c91e2..971ebc23d39 100644 --- a/src/Common/Config/ConfigReloader.h +++ b/src/Common/Config/ConfigReloader.h @@ -27,7 +27,7 @@ class Context; class ConfigReloader { public: - using Updater = std::function; + using Updater = std::function; /** include_from_path is usually /etc/metrika.xml (i.e. value of tag) */ diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 95014e34771..4d7183bedfa 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -668,7 +668,8 @@ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ M(1001, STD_EXCEPTION) \ - M(1002, UNKNOWN_EXCEPTION) \ + M(1003, UNKNOWN_EXCEPTION) + /* See END */ namespace DB diff --git a/src/Core/Settings.h b/src/Core/Settings.h index c12eb4d91b4..87053851007 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -917,6 +917,7 @@ class IColumn; M(Bool, enable_brute_force_vector_search, false, "Enable brute-force search for data parts without vector indexes.", 0) \ M(Float, hybrid_search_fusion_weight, 0.5f, "Default fusion_weight for hybrid search Relative Score Fusion (RSF) function. Valid value is in interval [0.0f, 1.0f]", 0) \ M(UInt64, hybrid_search_fusion_k, 60, "Default fusion_k for hybrid search Reciprocal Rank Fusion (RRF) function", 0) \ + M(Bool, optimize_prefilter_in_search, true, "Enable prewhere optimization for vector or text search if some partition columns in prewhere condition.", 0) \ // End of COMMON_SETTINGS // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS. 
diff --git a/src/DataTypes/DataTypeObjectToFetch.cpp b/src/DataTypes/DataTypeObjectToFetch.cpp index 43b13f41f84..3c73696c607 100644 --- a/src/DataTypes/DataTypeObjectToFetch.cpp +++ b/src/DataTypes/DataTypeObjectToFetch.cpp @@ -105,12 +105,14 @@ static inline IColumn & extractElementColumn(IColumn & column, size_t idx) template static void addElementSafe(const DataTypes & elems, IColumn & column, F && impl) { + /// We use the assumption that tuples of zero size do not exist. size_t old_size = column.size(); try { impl(); + // Check that all columns now have the same size. size_t new_size = column.size(); for (auto i : collections::range(0, elems.size())) @@ -150,6 +152,10 @@ MutableColumnPtr DataTypeObjectToFetch::createColumn() const MutableColumnPtr DataTypeObjectToFetch::createColumn(const ISerialization & serialization) const { + /// If we read subcolumn of nested Tuple, it may be wrapped to SerializationNamed + /// several times to allow to reconstruct the substream path name. + /// Here we don't need substream path name, so we drop first several wrapper serializations. + const auto * current_serialization = &serialization; while (const auto * serialization_named = typeid_cast(current_serialization)) current_serialization = serialization_named->getNested().get(); diff --git a/src/DataTypes/DataTypeTuple.cpp b/src/DataTypes/DataTypeTuple.cpp index 662bcf5bc2f..768f87fe3d4 100644 --- a/src/DataTypes/DataTypeTuple.cpp +++ b/src/DataTypes/DataTypeTuple.cpp @@ -103,12 +103,14 @@ static inline IColumn & extractElementColumn(IColumn & column, size_t idx) template static void addElementSafe(const DataTypes & elems, IColumn & column, F && impl) { + /// We use the assumption that tuples of zero size do not exist. size_t old_size = column.size(); try { impl(); + // Check that all columns now have the same size. 
size_t new_size = column.size(); for (auto i : collections::range(0, elems.size())) @@ -148,6 +150,10 @@ MutableColumnPtr DataTypeTuple::createColumn() const MutableColumnPtr DataTypeTuple::createColumn(const ISerialization & serialization) const { + /// If we read subcolumn of nested Tuple, it may be wrapped to SerializationNamed + /// several times to allow to reconstruct the substream path name. + /// Here we don't need substream path name, so we drop first several wrapper serializations. + const auto * current_serialization = &serialization; while (const auto * serialization_named = typeid_cast(current_serialization)) current_serialization = serialization_named->getNested().get(); diff --git a/src/DataTypes/Serializations/SerializationObjectToFetch.cpp b/src/DataTypes/Serializations/SerializationObjectToFetch.cpp index 39ccfb5837b..e2ffd53af0c 100644 --- a/src/DataTypes/Serializations/SerializationObjectToFetch.cpp +++ b/src/DataTypes/Serializations/SerializationObjectToFetch.cpp @@ -63,19 +63,25 @@ void SerializationObjectToFetch::serializeBinary(const IColumn & column, size_t template static void addElementSafe(size_t num_elems, IColumn & column, F && impl) { + /// We use the assumption that ObjectToFetchs of zero size do not exist. size_t old_size = column.size(); try { impl(); + // Check that all columns now have the same size. size_t new_size = column.size(); for (size_t i = 1; i < num_elems; ++i) { const auto & element_column = extractElementColumn(column, i); if (element_column.size() != new_size) + { + // This is not a logical error because it may work with + // user-supplied data. throw Exception(ErrorCodes::SIZES_OF_COLUMNS_IN_TUPLE_DOESNT_MATCH, "Cannot read a ObjectToFetch because not all elements are present"); + } } } catch (...) 
@@ -131,9 +137,11 @@ void SerializationObjectToFetch::deserializeText(IColumn & column, ReadBuffer & } }); + // Special format for one element objecttofetch (1,) if (1 == elems.size()) { skipWhitespaceIfAny(istr); + // Allow both (1) and (1,) checkChar(',', istr); } skipWhitespaceIfAny(istr); diff --git a/src/DataTypes/Serializations/SerializationTuple.cpp b/src/DataTypes/Serializations/SerializationTuple.cpp index ac79fe02f2f..b963d35d785 100644 --- a/src/DataTypes/Serializations/SerializationTuple.cpp +++ b/src/DataTypes/Serializations/SerializationTuple.cpp @@ -65,12 +65,14 @@ void SerializationTuple::serializeBinary(const IColumn & column, size_t row_num, template static void addElementSafe(size_t num_elems, IColumn & column, F && impl) { + /// We use the assumption that tuples of zero size do not exist. size_t old_size = column.size(); try { impl(); + // Check that all columns now have the same size. size_t new_size = column.size(); for (size_t i = 1; i < num_elems; ++i) { diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h index bcdc97983be..fe213f23886 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h @@ -50,7 +50,8 @@ class S3ObjectStorage : public IObjectStorage String version_id_, const S3Capabilities & s3_capabilities_, String bucket_, - String connection_string) + String connection_string, + ContextPtr context = nullptr) : bucket(bucket_) , client(std::move(client_)) , s3_settings(std::move(s3_settings_)) @@ -61,6 +62,8 @@ class S3ObjectStorage : public IObjectStorage data_source_description.description = connection_string; data_source_description.is_cached = false; data_source_description.is_encrypted = false; + if (context) + applyRemoteThrottlingSettings(context); log = &Poco::Logger::get(logger_name); } diff --git a/src/Disks/ObjectStorages/S3/registerDiskS3.cpp b/src/Disks/ObjectStorages/S3/registerDiskS3.cpp index 
1c192a0d89c..b097cae7eb9 100644 --- a/src/Disks/ObjectStorages/S3/registerDiskS3.cpp +++ b/src/Disks/ObjectStorages/S3/registerDiskS3.cpp @@ -130,7 +130,7 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check) } else { - s3_storage = std::make_shared(std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint); + s3_storage = std::make_shared(std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint, context); auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context); metadata_storage = std::make_shared(metadata_disk, uri.key); } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 45963546081..1ca28743c3f 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -223,7 +223,7 @@ struct ContextSharedPart : boost::noncopyable String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function. String dictionaries_lib_path; /// Path to the directory with user provided binaries and libraries for external dictionaries. String user_scripts_path; /// Path to the directory with user provided scripts. - String vector_index_cache_path; /// Path to the directory of vector index cache for MSTG disk mode. + String vector_index_cache_path; /// Path to the directory of vector index cache for MyScale vector index disk mode. String tantivy_index_cache_path; /// Path to the directory of tantivy index cache. ConfigurationPtr config; /// Global configuration settings. 
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 2210465a1ee..822d4ee566c 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1173,6 +1173,7 @@ class Context: public std::enable_shared_from_this HybridSearchInfoPtr getHybridSearchInfo() const; void setHybridSearchInfo(HybridSearchInfoPtr hybrid_search_info) const; void resetHybridSearchInfo() const; + private: std::unique_lock getLock() const; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 4443b69be7a..190ea6f9e0b 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -159,11 +159,12 @@ inline void checkTantivyIndex([[maybe_unused]]const StorageSnapshotPtr & storage std::pair getVectorIndexTypeAndParameterCheck(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, String & search_column_name) { + auto log = getLogger(); String index_type = ""; /// Obtain the default value of the `use_parameter_check` in the MergeTreeSetting. std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings()); bool use_parameter_check = storage_settings->vector_index_parameter_check; - LOG_TRACE(getLogger(), "vector_index_parameter_check value in MergeTreeSetting: {}", use_parameter_check); + LOG_TRACE(log, "vector_index_parameter_check value in MergeTreeSetting: {}", use_parameter_check); /// Obtain the type of the vector index recorded in the meta_data. 
if (metadata_snapshot) @@ -175,7 +176,7 @@ std::pair getVectorIndexTypeAndParameterCheck(const StorageMetadat if (vec_index_desc.column == search_column_name) { index_type = vec_index_desc.type; - LOG_TRACE(getLogger(), "The vector index type used for the query is `{}`", Poco::toUpper(index_type)); + LOG_TRACE(log, "The vector index type used for the query is `{}`", Poco::toUpper(index_type)); break; } @@ -196,7 +197,7 @@ std::pair getVectorIndexTypeAndParameterCheck(const StorageMetadat { use_parameter_check = new_value.get(); LOG_TRACE( - getLogger(), "vector_index_parameter_check value in sql definition: {}", use_parameter_check); + log, "vector_index_parameter_check value in sql definition: {}", use_parameter_check); break; } } @@ -570,7 +571,7 @@ void ExpressionAnalyzer::analyzeVectorScan(ActionsDAGPtr & temp_actions) has_vector_scan = true; } } - + /// Fill in dim and recognize VectorSearchType from metadata if (has_vector_scan) { getAndCheckVectorScanInfoFromMetadata(syntax->storage_snapshot, vector_scan_descriptions[0], getContext()); @@ -1837,6 +1838,7 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere( const auto & node = step.actions()->findInOutputs(prewhere_column_name); auto filter_type = node.result_type; + LOG_DEBUG(getLogger(), "[appendPrewhere] filter_type: {}", filter_type->getName()); if (!filter_type->canBeUsedInBooleanContext()) throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, "Invalid type for filter in PREWHERE: {}", filter_type->getName()); @@ -1846,6 +1848,8 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere( /// Remove unused source_columns from prewhere actions. auto tmp_actions_dag = std::make_shared(sourceColumns()); getRootActions(select_query->prewhere(), only_types, tmp_actions_dag); + /// Constants cannot be removed since they can be used in other parts of the query. + /// And if they are not used anywhere, except PREWHERE, they will be removed on the next step. 
tmp_actions_dag->removeUnusedActions( NameSet{prewhere_column_name}, /* allow_remove_inputs= */ true, @@ -2116,7 +2120,6 @@ void SelectQueryExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain, const auto * select_query = getSelectQuery(); ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); - getRootActions(select_query->select(), only_types, step.actions()); for (const auto & child : select_query->select()->children) diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index b458bd35575..fbbf955ace7 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -42,7 +42,6 @@ namespace Search enum class DataType; } - namespace DB { diff --git a/src/Parsers/ParserCreateQuery.cpp b/src/Parsers/ParserCreateQuery.cpp index d872d8542a1..4c622013b0f 100644 --- a/src/Parsers/ParserCreateQuery.cpp +++ b/src/Parsers/ParserCreateQuery.cpp @@ -185,13 +185,6 @@ bool ParserVectorIndexDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected type = function_node; } - - // if (!s_granularity.ignore(pos, expected)) - // return false; - - // if (!granularity_p.parse(pos, granularity, expected)) - // return false; - auto index = std::make_shared(); index->name = name->as().name(); index->column = column->as().name(); diff --git a/src/Processors/QueryPlan/ReadWithVectorScan.cpp.rej b/src/Processors/QueryPlan/ReadWithVectorScan.cpp.rej deleted file mode 100644 index 4568fd96b6d..00000000000 --- a/src/Processors/QueryPlan/ReadWithVectorScan.cpp.rej +++ /dev/null @@ -1,41 +0,0 @@ -diff a/src/Processors/QueryPlan/ReadWithVectorScan.cpp b/src/Processors/QueryPlan/ReadWithVectorScan.cpp (rejected hunks) -@@ -4,9 +4,17 @@ - #include - #include - #include -+#include - #include - #include - -+namespace ProfileEvents -+{ -+ extern const Event SelectedParts; -+ extern const Event SelectedRanges; -+ extern const Event SelectedMarks; -+} -+ - namespace DB - { - -@@ 
-188,19 +255,15 @@ Pipe ReadWithVectorScan::readFromParts( - }; - } - -- MarkRanges ranges; -- if (part->index_granularity.getMarksCount()) -- ranges.emplace_back(0, part->index_granularity.getMarksCount()); -- - auto source = std::make_shared( - data, - storage_snapshot, -- part, -+ part.data_part, - max_block_size, - preferred_block_size_bytes, - preferred_max_column_in_block_size_bytes, - required_columns, -- ranges, -+ part.ranges, - use_uncompressed_cache, - prewhere_info, - actions_settings, diff --git a/src/Storages/AlterCommands.cpp b/src/Storages/AlterCommands.cpp index 004764746ed..24e0ea36156 100644 --- a/src/Storages/AlterCommands.cpp +++ b/src/Storages/AlterCommands.cpp @@ -110,23 +110,31 @@ String getColumnNameFromLengthCheck(const ASTPtr & constraint_decl) bool getParameterCheckStatus(StorageInMemoryMetadata & metadata, ContextPtr context) { - std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings()); - bool use_parameter_check = storage_settings->vector_index_parameter_check; - if (metadata.hasSettingsChanges()) +std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings()); +bool use_parameter_check = storage_settings->vector_index_parameter_check; +LOG_TRACE( + &Poco::Logger::get("AlterCommand"), + "[getParameterCheckStatus] vector_index_parameter_check value in MergeTreeSetting: {}", + use_parameter_check); +if (metadata.hasSettingsChanges()) +{ + const auto current_changes = metadata.getSettingsChanges()->as().changes; + for (const auto & changed_setting : current_changes) { - const auto current_changes = metadata.getSettingsChanges()->as().changes; - for (const auto & changed_setting : current_changes) + const auto & setting_name = changed_setting.name; + const auto & new_value = changed_setting.value; + if (setting_name == "vector_index_parameter_check") { - const auto & setting_name = changed_setting.name; - const auto & new_value = changed_setting.value; - if (setting_name == 
"vector_index_parameter_check") - { - use_parameter_check = new_value.get(); - break; - } + use_parameter_check = new_value.get(); + LOG_TRACE( + &Poco::Logger::get("AlterCommand"), + "[getParameterCheckStatus] vector_index_parameter_check value in sql definition: {}", + use_parameter_check); + break; } } - return use_parameter_check; +} +return use_parameter_check; } std::optional AlterCommand::parse(const ASTAlterCommand * command_ast) diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 3e9d583e1c6..c02a9ce1883 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -1338,8 +1338,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk( } catch (...) { - /// Remove the whole part directory if fetch of base - /// part or fetch of any projection was stopped. LOG_INFO(log, "Directory {} will be deleted due to an error during fetch part.", part_storage_for_loading->getRelativePath()); part_storage_for_loading->removeSharedRecursive(true); part_storage_for_loading->commitTransaction(); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 4e8b4b9e2e0..6f6f8830860 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1457,194 +1457,6 @@ bool IMergeTreeDataPart::isSmallPart() const return rows_count == 0 || rows_count < min_rows_to_build_vector_index; } -/// Used for upgrade -void IMergeTreeDataPart::convertIndexFileForUpgrade() -{ - IDataPartStorage & part_storage = const_cast(getDataPartStorage()); - - /// If checksums file needs to be generated. 
- bool has_intact_old_version_vector_index = false; - - /// Only supports either all index versions are in V1, or all versions are in V2 - String old_vector_index_ready_v1 = toString("vector_index_ready") + VECTOR_INDEX_FILE_OLD_SUFFIX; - String old_vector_index_ready_v2 = toString("vector_index_ready_v2") + VECTOR_INDEX_FILE_OLD_SUFFIX; - String old_index_description_v2 = toString(VECTOR_INDEX_DESCRIPTION) + VECTOR_INDEX_FILE_OLD_SUFFIX; - - auto metadata_snapshot = storage.getInMemoryMetadataPtr(); - auto vec_indices = metadata_snapshot->getVectorIndices(); - - /// Only one vector index is supported before upgrade to support multiple. - /// Use the first vector index description. - auto vector_index_desc = vec_indices[0]; - String current_index_description_name = toString(VECTOR_INDEX_DESCRIPTION) + VECTOR_INDEX_FILE_SUFFIX; - String current_checksums_file_name = VectorIndex::getVectorIndexChecksumsFileName(vector_index_desc.name); - String new_description_file_name = VectorIndex::getVectorIndexDescriptionFileName(vector_index_desc.name); - - /// Quick check the existence of new description file with index name. - bool from_checksums = false; - if (part_storage.exists(current_checksums_file_name)) - { - from_checksums = true; - if (part_storage.exists(new_description_file_name)) - { - /// The current version file already exists locally, no need to convert - LOG_DEBUG(storage.log, "The current version file already exists locally, does not need to convert"); - return; - } - } - - /// Used for upgrade from checksums version, need update checksums with new file names. - std::unordered_map converted_files_map; - for (auto it = part_storage.iterate(); it->isValid(); it->next()) - { - String file_name = it->name(); - - /// v1, v2 or checksums - if (from_checksums) - { - if (!endsWith(file_name, VECTOR_INDEX_FILE_SUFFIX)) - continue; - } - else if (!(endsWith(file_name, VECTOR_INDEX_FILE_OLD_SUFFIX))) - continue; - - /// vector index description file need to update name. 
- bool is_description = false; - - /// Check for checksums first - if (from_checksums) - { - if (endsWith(file_name, current_index_description_name)) - { - /// Lastest desciption file name with index name - if (endsWith(file_name, new_description_file_name)) - { - LOG_DEBUG(storage.log, "The current version file already exists locally, does not need to convert"); - return; - } - - has_intact_old_version_vector_index = true; - is_description = true; - } - else if (file_name == current_checksums_file_name) - continue; - } - else if (endsWith(file_name, old_vector_index_ready_v2)) /// v2 ready file - { - has_intact_old_version_vector_index = true; - - LOG_DEBUG(storage.log, "Delete ready file {}", file_name); - part_storage.removeFile(file_name); - - continue; - } - else if (endsWith(file_name, old_index_description_v2)) - { - is_description = true; - } - else if (endsWith(file_name, old_vector_index_ready_v1)) /// v1 ready file - { - has_intact_old_version_vector_index = true; - is_description = true; /// ready will be updated to description file. - } - - /// There are some common codes to get new description file name. - if (is_description) - { - String new_file_name = file_name; - if (endsWith(file_name, VECTOR_INDEX_FILE_OLD_SUFFIX)) - new_file_name = fs::path(file_name).replace_extension(VECTOR_INDEX_FILE_SUFFIX).string(); - - /// Replace vector_index_description to -vector_index_description - /// Replace merged---vector_index_description to merged----vector_index_description - new_file_name = std::regex_replace(new_file_name, std::regex("vector"), vector_index_desc.name + "-vector"); - converted_files_map[file_name] = new_file_name; - } - else - { - /// For other vector index files (exclude ready, checksum, or description files), update vector index file extension to latest. - /// Support multiple vector indices feature changes the vector index file name by removing column name. - /// e.g. 
--id_list will be updated to -id_list - - /// old vector index name - String old_vector_index_column_name = vector_index_desc.name + "-" + vector_index_desc.column; - if (file_name.find(old_vector_index_column_name) == std::string::npos) - { - /// Just check suffix - if (!from_checksums && endsWith(file_name, VECTOR_INDEX_FILE_OLD_SUFFIX)) - { - String new_file_name = fs::path(file_name).replace_extension(VECTOR_INDEX_FILE_SUFFIX).string(); - converted_files_map[file_name] = new_file_name; - } - continue; - } - - /// Replace "-" to "" - String new_file_name = std::regex_replace(file_name, std::regex(old_vector_index_column_name), vector_index_desc.name); - - if (!from_checksums && endsWith(new_file_name, VECTOR_INDEX_FILE_OLD_SUFFIX)) - new_file_name = fs::path(new_file_name).replace_extension(VECTOR_INDEX_FILE_SUFFIX).string(); - - converted_files_map[file_name] = new_file_name; - } - } - - /// Support multiple vector indice - /// No vector index files in part or incomplete vector index files - if (!has_intact_old_version_vector_index) - return; - - /// Here we collect converted files and upgrade is needed. - for (auto const & [old_name_, new_name_] : converted_files_map) - { - part_storage.moveFile(old_name_, new_name_); - LOG_DEBUG(storage.log, "Convert vector index file {} to {}", old_name_, new_name_); - } - - /// Old version vector index files have ready file, will generate checksums file. 
- Checksums vector_index_checksums; - if (!from_checksums) - vector_index_checksums = calculateVectorIndexChecksums(getDataPartStoragePtr(), part_storage.getRelativePath()); - else if (!converted_files_map.empty()) - { - if (!part_storage.exists(current_checksums_file_name)) - { - LOG_WARNING(storage.log, "checksums file '{}' doesn't exist in part {}, will not update it", current_checksums_file_name, name); - return; - } - - /// Multiple vector indices feature changes vector file names - MergeTreeDataPartChecksums old_checksums; - auto buf = part_storage.readFile(current_checksums_file_name, {}, std::nullopt, std::nullopt); - if (old_checksums.read(*buf)) - assertEOF(*buf); - - for (auto const & [name_, checksum_] : old_checksums.files) - { - String new_name_; - if (converted_files_map.contains(name_)) - { - new_name_ = converted_files_map[name_]; - } - else - { - new_name_ = name_; - } - vector_index_checksums.addFile(new_name_, checksum_.file_size, checksum_.file_hash); - } - - part_storage.removeFile(current_checksums_file_name); - } - - LOG_DEBUG(storage.log, "write vector index {} checksums file for part {}", vector_index_desc.name, name); - writeMetadata( - current_checksums_file_name, - {}, - [&vector_index_checksums](auto & buffer) { vector_index_checksums.write(buffer); }); - - /// Incomplete vector index files will be removed when loading checksums file. 
-} - void IMergeTreeDataPart::convertIndexFileForRestore() { auto metadata_snapshot = storage.getInMemoryMetadataPtr(); @@ -1848,6 +1660,7 @@ void IMergeTreeDataPart::onLightweightDelete(const String index_name) const /// Store deleted row ids std::vector del_row_ids; + /// Support multiple vector indices for (auto & vec_index_desc : metadata_snapshot->getVectorIndices()) { /// Only update the specified vector index diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp index 5c71804ef27..f795cb65ec1 100644 --- a/src/Storages/MergeTree/KeyCondition.cpp +++ b/src/Storages/MergeTree/KeyCondition.cpp @@ -736,12 +736,14 @@ KeyCondition::KeyCondition( const ExpressionActionsPtr & key_expr_, NameSet array_joined_column_names_, bool single_point_, - bool strict_) + bool strict_, + bool unknown_false_) : key_expr(key_expr_) , key_subexpr_names(getAllSubexpressionNames(*key_expr)) , array_joined_column_names(std::move(array_joined_column_names_)) , single_point(single_point_) , strict(strict_) + , unknown_false(unknown_false_) { for (const auto & name : key_column_names) if (!key_columns.contains(name)) @@ -779,7 +781,8 @@ KeyCondition::KeyCondition( const Names & key_column_names, const ExpressionActionsPtr & key_expr_, bool single_point_, - bool strict_) + bool strict_, + bool unknown_false_) : KeyCondition( query_info.query, query_info.filter_asts, @@ -790,7 +793,8 @@ KeyCondition::KeyCondition( key_expr_, query_info.syntax_analyzer_result ? 
query_info.syntax_analyzer_result->getArrayJoinSourceNameSet() : NameSet{}, single_point_, - strict_) + strict_, + unknown_false_) { } @@ -801,12 +805,14 @@ KeyCondition::KeyCondition( const ExpressionActionsPtr & key_expr_, NameSet array_joined_column_names_, bool single_point_, - bool strict_) + bool strict_, + bool unknown_false_) : key_expr(key_expr_) , key_subexpr_names(getAllSubexpressionNames(*key_expr)) , array_joined_column_names(std::move(array_joined_column_names_)) , single_point(single_point_) , strict(strict_) + , unknown_false(unknown_false_) { for (const auto & name : key_column_names) if (!key_columns.contains(name)) @@ -2219,7 +2225,10 @@ BoolMask KeyCondition::checkInHyperrectangle( { if (element.function == RPNElement::FUNCTION_UNKNOWN) { - rpn_stack.emplace_back(true, true); + if (unknown_false) + rpn_stack.emplace_back(false, true); + else + rpn_stack.emplace_back(true, true); } else if (element.function == RPNElement::FUNCTION_IN_RANGE || element.function == RPNElement::FUNCTION_NOT_IN_RANGE) diff --git a/src/Storages/MergeTree/KeyCondition.h b/src/Storages/MergeTree/KeyCondition.h index 0a4ac93b082..cda7392834b 100644 --- a/src/Storages/MergeTree/KeyCondition.h +++ b/src/Storages/MergeTree/KeyCondition.h @@ -228,7 +228,8 @@ class KeyCondition const ExpressionActionsPtr & key_expr, NameSet array_joined_column_names, bool single_point_ = false, - bool strict_ = false); + bool strict_ = false, + bool unknown_false_ = false); /** Construct key condition from AST SELECT query WHERE, PREWHERE and additional filters. * Select query, additional filters, prepared sets are initialized using query info. 
@@ -239,7 +240,8 @@ class KeyCondition const Names & key_column_names, const ExpressionActionsPtr & key_expr_, bool single_point_ = false, - bool strict_ = false); + bool strict_ = false, + bool unknown_false_ = false); /// Construct key condition from ActionsDAG nodes KeyCondition( @@ -249,7 +251,8 @@ class KeyCondition const ExpressionActionsPtr & key_expr, NameSet array_joined_column_names, bool single_point_ = false, - bool strict_ = false); + bool strict_ = false, + bool unknown_false_ = false); /// Whether the condition and its negation are feasible in the direct product of single column ranges specified by `hyperrectangle`. BoolMask checkInHyperrectangle( @@ -491,6 +494,10 @@ class KeyCondition // If true, do not use always_monotonic information to transform constants bool strict; + + /// If true, FUNCTION_UNKNOWN will be [false, true] in checkInHyperrectangle(). + /// Used for vector scan search to confirm if performPrefilter() is needed. + bool unknown_false; }; String extractFixedPrefixFromLikePattern(std::string_view like_pattern, bool requires_perfect_prefix); diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index b0f1193d224..5881467d5a3 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -616,7 +616,9 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::generateRowIdsMap() { const PaddedPODArray& col_data = checkAndGetColumn(*block.getByName("_part_offset").column)->getData(); for (size_t i = 0; i < block.rows(); ++i) + { part_offsets[part_num].emplace_back(col_data[i]); + } } } @@ -703,11 +705,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::generateRowIdsMap() int i = 0; while (old_row_id < partRowNum) { + UInt64 new_row_id = -1; if (parts_new_row_ids[source_num].count(old_row_id) > 0) { - UInt64 new_row_id = parts_new_row_ids[source_num][old_row_id]; - writeIntText(new_row_id, *global_ctx->row_ids_map_bufs[source_num]); - writeChar('\t', 
*global_ctx->row_ids_map_bufs[source_num]); + new_row_id = parts_new_row_ids[source_num][old_row_id]; } else { @@ -715,6 +716,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::generateRowIdsMap() deleteRowIds[i] = static_cast(old_row_id); i++; } + writeIntText(new_row_id, *global_ctx->row_ids_map_bufs[source_num]); + writeChar('\t', *global_ctx->row_ids_map_bufs[source_num]); ++old_row_id; } @@ -726,15 +729,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::generateRowIdsMap() const DataPartStorageOnDiskBase * part_storage = dynamic_cast(global_ctx->future_part->parts[source_num]->getDataPartStoragePtr().get()); if (part_storage == nullptr) - { throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported part storage."); - } VectorIndex::SegmentId segment_id( global_ctx->future_part->parts[source_num]->getDataPartStoragePtr(), global_ctx->future_part->parts[source_num]->name, vec_index_desc.name, vec_index_desc.column); + updateBitMap(segment_id, deleteRowIds); } } @@ -791,7 +793,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::generateRowIdsMap() } LOG_DEBUG(ctx->log, "After write row_source_pos: inverted_row_ids_map_buf size: {}", global_ctx->inverted_row_ids_map_buf->count()); - if (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Horizontal) { ctx->rows_sources_file.reset(); @@ -815,13 +816,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::generateRowIdsMap() { /// Release the buffer in advance to prevent fatal occurrences during subsequent buffer destruction. 
for (size_t i = 0; i < global_ctx->row_ids_map_bufs.size(); ++i) - { global_ctx->row_ids_map_bufs[i].reset(); - } + for (size_t i = 0; i < global_ctx->row_ids_map_uncompressed_bufs.size(); ++i) - { global_ctx->row_ids_map_uncompressed_bufs[i].reset(); - } global_ctx->inverted_row_ids_map_buf.reset(); global_ctx->inverted_row_ids_map_uncompressed_buf.reset(); @@ -1135,6 +1133,7 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const std::unordered_map vector_index_checksums_map_tmp; if (global_ctx->can_be_decouple) { + /// Support multiple vector indices for (auto & vec_index : global_ctx->metadata_snapshot->getVectorIndices()) { auto it = global_ctx->all_parts_have_vector_index.find(vec_index.name); @@ -1239,6 +1238,7 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const else if (global_ctx->only_one_vpart_merged) { /// In single one VPart case, move vector index files to new data part dir + /// Support multiple vector indices auto old_part = global_ctx->future_part->parts[global_ctx->first_part_with_data]; for (auto & vec_index : global_ctx->metadata_snapshot->getVectorIndices()) { @@ -1268,6 +1268,13 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const /// has no vector index, but should init index from local metadata. 
global_ctx->new_data_part->vector_index.loadVectorIndexFromLocalFile(); + /// Cancel source part build vector index + for (auto & old_part : global_ctx->future_part->parts) + { + old_part->vector_index.cancelAllIndexBuild(); + old_part->vector_index.waitAllIndexFinish(); + } + global_ctx->new_data_part->getDataPartStorage().precommitTransaction(); global_ctx->promise.set_value(global_ctx->new_data_part); diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index b8a11b636d0..2c9a5cd927c 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -182,6 +182,7 @@ class MergeTask scope_guard temporary_directory_lock; + /// Support multiple vector indices /// In multiple vector indices case, two replicas may have inconsistent vector indices, /// i.e. one replica has two vector indices built on two parts, while the slow replica has one part with v1 and v2, another part with only v1. /// Currently, ignore the incomplete vector index if not all merged parts contain it. diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 50db0f9f839..6a73eccb0d3 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -2374,6 +2374,7 @@ std::pair MergeTreeData::needClearVectorIndexCacheAndFile( return std::make_pair(true, false); } + /// Support multiple vector indices /// The active part contains the part in the cache_key. /// First make sure the vector index info in cache key is same with defined in metadata. If not exists or not same, remove cache and file. 
String index_name = cache_key.vector_index_name; @@ -2450,16 +2451,6 @@ void MergeTreeData::clearPKCache(const DataPartsVector & parts) } } -void MergeTreeData::clearVectorNvmeCache() const -{ - auto vector_nvme_cache_folder = fs::path(getContext()->getVectorIndexCachePath()) / VectorIndex::SegmentId::getPartRelativePath(getRelativeDataPath()); - if (fs::exists(vector_nvme_cache_folder)) - { - LOG_INFO(log, "Remove nvme cache folder: {}", vector_nvme_cache_folder); - fs::remove_all(vector_nvme_cache_folder); - } -} - void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool throw_on_error, NameSet * parts_failed_to_delete) { NameSet part_names_succeed; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 26df8326b70..a3f34446822 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -664,7 +664,6 @@ class MergeTreeData : public IStorage, public WithMutableContext void clearPartsFromFilesystem(const DataPartsVector & parts, bool throw_on_error = true, NameSet * parts_failed_to_delete = nullptr); void clearCachedVectorIndex(const DataPartsVector & parts, bool force = true); void clearPKCache(const DataPartsVector & parts); - void clearVectorNvmeCache() const; /// Check whether the cache and vector index file need to be deleted according to the part to which the cache belongs. 
std::pair needClearVectorIndexCacheAndFile( const DataPartPtr & part, const StorageMetadataPtr & metadata_snapshot, const VectorIndex::CacheKey & cache_key) const; @@ -1589,7 +1588,7 @@ class MergeTreeData : public IStorage, public WithMutableContext mutable TemporaryParts temporary_parts; mutable std::mutex currently_vector_index_status_mutex; - + /// Support multiple vector indices mutable std::unordered_map vector_indices_status; }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp b/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp index 78f68ea72fe..3ff697ab4eb 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp @@ -46,9 +46,11 @@ void MergeTreeDataPartChecksum::checkEqual(const MergeTreeDataPartChecksum & rhs void MergeTreeDataPartChecksum::checkSize(const IDataPartStorage & storage, const String & name) const { - /// Skip inverted index files, these have a default MergeTreeDataPartChecksum with file_size == 0 + /// Skip inverted and fts index files, these have a default MergeTreeDataPartChecksum with file_size == 0 if (name.ends_with(".gin_dict") || name.ends_with(".gin_post") || name.ends_with(".gin_seg") || name.ends_with(".gin_sid")) return; + if (name.ends_with(".data") || name.ends_with(".meta")) + return; if (!storage.exists(name)) throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "{} doesn't exist", fs::path(storage.getRelativePath()) / name); @@ -83,6 +85,10 @@ void MergeTreeDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & r if (name.ends_with(".gin_dict") || name.ends_with(".gin_post") || name.ends_with(".gin_seg") || name.ends_with(".gin_sid")) continue; + /// Exclude files written by fts index from check. No correct checksums are available for them currently. 
+ if (name.ends_with(".meta") || name.ends_with(".data")) + continue; + auto jt = rhs.files.find(name); if (jt == rhs.files.end()) throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No file {} in data part", name); diff --git a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index b0245109979..005bcfd1a3a 100644 --- a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -60,21 +60,6 @@ class StorageFromMergeTreeDataPart final : public IStorage return std::make_shared(*this, metadata_snapshot, std::move(object_columns)); } - StorageSnapshotPtr getStorageSnapshotForQuery( - const StorageMetadataPtr & metadata_snapshot, const ASTPtr & /*query*/, ContextPtr /*query_context*/) const override - { - const auto & storage_columns = metadata_snapshot->getColumns(); - if (!hasDynamicSubcolumns(storage_columns)) - return std::make_shared(*this, metadata_snapshot); - - auto data_parts = storage.getDataPartsVectorForInternalUsage(); - - auto object_columns = getConcreteObjectColumns( - data_parts.begin(), data_parts.end(), storage_columns, [](const auto & part) -> const auto & { return part->getColumns(); }); - - return std::make_shared(*this, metadata_snapshot, std::move(object_columns)); - } - void read( QueryPlan & query_plan, const Names & column_names, diff --git a/src/Storages/MergeTree/TantivyIndexStore.cpp b/src/Storages/MergeTree/TantivyIndexStore.cpp index 8388cd81a57..50b3fa40697 100644 --- a/src/Storages/MergeTree/TantivyIndexStore.cpp +++ b/src/Storages/MergeTree/TantivyIndexStore.cpp @@ -240,12 +240,11 @@ ChecksumPairs TantivyIndexStore::serialize() written_bytes); ChecksumPairs checksums; - checksums.emplace_back( - index_data_file_name, - DB::MergeTreeDataPartChecksums::Checksum(data_hashing_uncompressed_stream->count(), data_hashing_uncompressed_stream->getHash())); - checksums.emplace_back( - index_meta_file_name, - 
DB::MergeTreeDataPartChecksums::Checksum(meta_hashing_uncompressed_stream->count(), meta_hashing_uncompressed_stream->getHash())); + // To prevent inconsistency issues with FTS index file checksums across multiple replicas, an empty checksum is generated here. + checksums.emplace_back(index_data_file_name, DB::MergeTreeDataPartChecksums::Checksum()); + // DB::MergeTreeDataPartChecksums::Checksum(data_hashing_uncompressed_stream->count(), data_hashing_uncompressed_stream->getHash())); + checksums.emplace_back(index_meta_file_name, DB::MergeTreeDataPartChecksums::Checksum()); + // DB::MergeTreeDataPartChecksums::Checksum(meta_hashing_uncompressed_stream->count(), meta_hashing_uncompressed_stream->getHash())); return checksums; } diff --git a/src/Storages/MergeTree/checkDataPart.cpp b/src/Storages/MergeTree/checkDataPart.cpp index b44dd7dafdb..b7bd0d00ef3 100644 --- a/src/Storages/MergeTree/checkDataPart.cpp +++ b/src/Storages/MergeTree/checkDataPart.cpp @@ -173,6 +173,10 @@ IMergeTreeDataPart::Checksums checkDataPart( if (file_name.ends_with(".gin_dict") || file_name.ends_with(".gin_post") || file_name.ends_with(".gin_seg") || file_name.ends_with(".gin_sid")) continue; + /// Exclude files written by fts index from check. No correct checksums are available for them currently. + if (file_name.ends_with(".data") || file_name.ends_with(".meta")) + continue; + /// Exclude vector index files to prevent conflicts with newly built vector indexes if (file_name.ends_with(VECTOR_INDEX_FILE_SUFFIX)) continue; diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index b786feb02e4..c2a138e7a95 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -92,8 +92,6 @@ struct PrewhereInfo return prewhere_info; } - - mutable std::mutex prewhere_info_mutex; }; /// Helper struct to store all the information about the filter expression. 
diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index 0b3c4daa004..0f000690419 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -160,6 +160,7 @@ bool StorageInMemoryMetadata::hasVectorIndices() const bool StorageInMemoryMetadata::hasVectorIndexOnColumn(const String & column_name) const { + /// Support multiple vector indices for (auto & vec_index : vec_indices) { if (vec_index.column == column_name) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index d8415e8b1a9..93a1d477824 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -140,8 +140,6 @@ void StorageMergeTree::startup() /// expand cleanup folder vector_tmp clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_", "vector_tmp_"}); - /// clear nvme cache - clearVectorNvmeCache(); /// NOTE background task will also do the above cleanups periodically. 
time_after_previous_cleanup_parts.restart(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7559f1b84f1..9b50dda97d3 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3659,6 +3659,7 @@ void StorageReplicatedMergeTree::removeVecIndexBuildStatusForPartsFromZK(zkutil: { String part_name = part->name; + /// Support multiple vector indices for (const auto & vec_index_desc : getInMemoryMetadataPtr()->getVectorIndices()) { String vec_index_name = vec_index_desc.name; @@ -3719,6 +3720,7 @@ void StorageReplicatedMergeTree::cleanupVectorIndexBuildStatusFromZK(const Strin void StorageReplicatedMergeTree::startVectorIndexJob(const VIDescriptions & old_vec_indices, const VIDescriptions & new_vec_indices) { /// Compare old and new vector_indices to determine add or drop vector index + /// Support multiple vector indices /// Check drop vector index for (const auto & old_vec_index : old_vec_indices) { @@ -4892,10 +4894,6 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread) startBeingLeader(); - /// clear nvme cache - /// no need clear nvme cache in this field, reload vector index will reuse this cache. - clearVectorNvmeCache(); - /// Initilize vector index build status for each index for (const auto & vec_index_desc : getInMemoryMetadataPtr()->getVectorIndices()) addVectorIndexBuildStatus(vec_index_desc.name); @@ -5602,8 +5600,10 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer LOG_INFO(log, "Applied changes to the metadata of the table. 
Current metadata version: {}", metadata_version); if (metadata_diff.vector_indices_changed) + { + /// Support multiple vector indices startVectorIndexJob(old_vec_indices, getInMemoryMetadataPtr()->getVectorIndices()); - + } } { @@ -9792,7 +9792,7 @@ void StorageReplicatedMergeTree::attachRestoredParts(MutableDataPartsVector && p sink->writeExistingPart(part); } -void StorageReplicatedMergeTree::loadVectorIndexFromZookeeper() +std::unordered_map> StorageReplicatedMergeTree::getPreloadVectorIndicesFromZK() { auto zookeeper = getZooKeeper(); @@ -9801,8 +9801,10 @@ void StorageReplicatedMergeTree::loadVectorIndexFromZookeeper() if (!success || vector_index_info.empty()) { + /// try other replicas Strings replicas = zookeeper->getChildren(fs::path(zookeeper_path) / "replicas"); + /// Select replicas in uniformly random order. std::shuffle(replicas.begin(), replicas.end(), thread_local_rng); for (const String & replica : replicas) @@ -9824,7 +9826,7 @@ void StorageReplicatedMergeTree::loadVectorIndexFromZookeeper() if (vector_index_info.empty()) { LOG_INFO(log, "No vector index info found on zookeeper for table {}", getStorageID().getFullTableName()); - return; + return {}; } ReadBufferFromString in(vector_index_info); @@ -9846,6 +9848,14 @@ void StorageReplicatedMergeTree::loadVectorIndexFromZookeeper() } } + return vector_indices; +} + +void StorageReplicatedMergeTree::loadVectorIndexFromZookeeper() +{ + + std::unordered_map> vector_indices = getPreloadVectorIndicesFromZK(); + LOG_INFO(log, "Load {} vector indices from keeper", vector_indices.size()); Stopwatch watch; @@ -9907,6 +9917,7 @@ void StorageReplicatedMergeTree::writeVectorIndexInfoToZookeeper(bool force) std::unordered_map> cached_index_parts; std::unordered_map index_column_map; + /// get cached vector index & parts for current table for (const auto & cache_item : cache_list) { auto cache_key = cache_item.first; @@ -9929,6 +9940,7 @@ void StorageReplicatedMergeTree::writeVectorIndexInfoToZookeeper(bool 
force) WriteBufferFromOwnString out; int count = 0; + /// get active part name (without mutation) for cached vector index for (const auto & index_parts : cached_index_parts) { auto index_name = index_parts.first; diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 30a8c8a6932..1fe101f5619 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -951,6 +951,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData /// In create and drop vector index cases, do some operations. void startVectorIndexJob(const VIDescriptions & old_vec_indices, const VIDescriptions & new_vec_indices); + std::unordered_map> getPreloadVectorIndicesFromZK(); /// Get cached vector index info from zookeeper and load into cache. void loadVectorIndexFromZookeeper(); diff --git a/src/VectorIndex/CMakeLists.txt b/src/VectorIndex/CMakeLists.txt index 9b7f1da469a..f9388be504c 100644 --- a/src/VectorIndex/CMakeLists.txt +++ b/src/VectorIndex/CMakeLists.txt @@ -14,6 +14,7 @@ target_link_libraries(clickhouse_vector_index dbms PRIVATE ch_contrib::search_index + daemon ) if (TARGET ch_rust::supercrate) diff --git a/src/VectorIndex/Cache/VICacheManager.cpp b/src/VectorIndex/Cache/VICacheManager.cpp index 94f6be96df5..0623405bb80 100644 --- a/src/VectorIndex/Cache/VICacheManager.cpp +++ b/src/VectorIndex/Cache/VICacheManager.cpp @@ -175,10 +175,11 @@ std::list> VICacheManager::getAllItems() std::list>> cache_list = cache->getCacheList(); for (auto cache_item : cache_list) + { // key --- string // value --- std::shared_ptr result.emplace_back(std::make_pair(cache_item.first, cache_item.second->des)); - + } return result; } diff --git a/src/VectorIndex/Cache/VICacheManager.h b/src/VectorIndex/Cache/VICacheManager.h index 6cf6c677b10..6681b9cee61 100644 --- a/src/VectorIndex/Cache/VICacheManager.h +++ b/src/VectorIndex/Cache/VICacheManager.h @@ -82,6 +82,7 @@ class VectorIndexCache class 
VICacheManager { // cache manager manages a series of cache instance. + // these caches could either be cache in memory or cache on GPU device. // it privides a getInstance() method which returns a consistent view // of all caches to all classes trying to access cache. diff --git a/src/VectorIndex/Common/SegmentId.cpp b/src/VectorIndex/Common/SegmentId.cpp index faf180cb839..cab6dc7a532 100644 --- a/src/VectorIndex/Common/SegmentId.cpp +++ b/src/VectorIndex/Common/SegmentId.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace fs = std::filesystem; @@ -41,7 +42,6 @@ namespace ErrorCodes { extern const int BAD_ARGUMENTS; } - } namespace VectorIndex @@ -93,6 +93,12 @@ std::tuple, std::shared_ptr, std::shared_ptrpush_back(*row_source_pos); ++row_source_pos; } @@ -115,7 +121,6 @@ std::tuple, std::shared_ptr, std::shared_ptrignore(); inverted_row_ids_map->push_back(row_id); } - return std::make_tuple( row_ids_map, inverted_row_ids_map, inverted_row_sources_map); } diff --git a/src/VectorIndex/Common/VIMetadata.h b/src/VectorIndex/Common/VIMetadata.h index 26b637921f3..462b561049e 100644 --- a/src/VectorIndex/Common/VIMetadata.h +++ b/src/VectorIndex/Common/VIMetadata.h @@ -55,6 +55,8 @@ class VIMetadata const SegmentId & segment_id; + /// Current part name is not provided and is read from the description file. + /// This value is incorrect in some scenarios. 
DB::String version; VIType type; VIMetric metric; diff --git a/src/VectorIndex/Common/VIPartReader.h b/src/VectorIndex/Common/VIPartReader.h index 68e78548f2e..141fb4d63da 100644 --- a/src/VectorIndex/Common/VIPartReader.h +++ b/src/VectorIndex/Common/VIPartReader.h @@ -237,8 +237,7 @@ class VIPartReader : public VISourcePartReader vector_raw_data[row * dimension + i] = src_vec[vec_start_offset + i]; } ids[row] = current_round_start_row + row; - } - else + } else { /// Illegal vector empty_ids.emplace_back(current_round_start_row + row); diff --git a/src/VectorIndex/Common/VIWithDataPart.cpp b/src/VectorIndex/Common/VIWithDataPart.cpp index c075d544a65..5a972cd2bf9 100644 --- a/src/VectorIndex/Common/VIWithDataPart.cpp +++ b/src/VectorIndex/Common/VIWithDataPart.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -391,7 +392,7 @@ void VIWithColumnInPart::cancelBuild() build_index_info->cancel_build.store(true); } -VIVariantPtr VIWithColumnInPart::createIndex() const +VIVariantPtr VIWithColumnInPart::createIndex(bool is_dummy) const { auto metric = Search::getMetricType(metric_str, vector_search_type); VIVariantPtr index_variant; @@ -405,8 +406,11 @@ VIVariantPtr VIWithColumnInPart::createIndex() const return key != "metric_type"; }); - const String vector_index_cache_prefix - = getUniqueVectorIndexCachePrefix(part_storage->getRelativePath(), current_part_name, index_name); + String vector_index_cache_prefix + = getVectorIndexCachePrefix(part_storage->getRelativePath(), current_part_name, index_name); + + if (is_dummy) + vector_index_cache_prefix = String(fs::path(vector_index_cache_prefix).parent_path()) + "-dummy" + "/"; if (vector_search_type == Search::DataType::FloatVector) index_variant = Search::createVectorIndex( @@ -416,9 +420,14 @@ VIVariantPtr VIWithColumnInPart::createIndex() const dimension, total_vec, index_des, + max_threads, vector_index_cache_prefix, - true /* use_file_checksum */, - true /* manage_cache_folder */); + 
[base_daemon = BaseDaemon::tryGetInstance()]() + { + if (base_daemon.has_value()) + return base_daemon.value().get().isCancelled(); + return false; + }); else if (vector_search_type == Search::DataType::BinaryVector) index_variant = Search::createVectorIndex( index_name, @@ -427,21 +436,18 @@ VIVariantPtr VIWithColumnInPart::createIndex() const dimension, total_vec, index_des, + max_threads, vector_index_cache_prefix, - true /* use_file_checksum */, - true /* manage_cache_folder */); + [base_daemon = BaseDaemon::tryGetInstance()]() + { + if (base_daemon.has_value()) + return base_daemon.value().get().isCancelled(); + return false; + }); return index_variant; } -#ifdef ENABLE_SCANN -std::shared_ptr VIWithColumnInPart::getDiskIOManager() const -{ - return nullptr; - -} -#endif - void VIWithColumnInPart::serialize( VIVariantPtr & index, DiskPtr disk, @@ -569,7 +575,7 @@ IndexWithMetaHolderPtr VIWithColumnInPart::loadDecoupleCache(SegmentId & segment } /// cancel load vector index implement -IndexWithMetaHolderPtr VIWithColumnInPart::load(SegmentId & segment_id, bool is_active, const String & nvme_cache_path_uuid) +IndexWithMetaHolderPtr VIWithColumnInPart::load(SegmentId & segment_id, bool is_active) { OpenTelemetry::SpanHolder span("VIWithColumnInPart::load"); VICacheManager * mgr = VICacheManager::getInstance(); @@ -638,8 +644,12 @@ IndexWithMetaHolderPtr VIWithColumnInPart::load(SegmentId & segment_id, bool is_ } index_params.setParam("load_index_version", metadata.version); - String vector_index_cache_prefix = getUniqueVectorIndexCachePrefix( - part_storage->getRelativePath(), segment_id.getCacheKey().part_name_no_mutation, index_name, nvme_cache_path_uuid); + String vector_index_cache_prefix = getVectorIndexCachePrefix( + part_storage->getRelativePath(), segment_id.getCacheKey().part_name_no_mutation, index_name); + + if (segment_id.fromMergedParts()) + vector_index_cache_prefix = fs::path(vector_index_cache_prefix).parent_path().string() + String("-decouple/"); 
+ VIVariantPtr index_variant; if (vector_search_type == Search::DataType::FloatVector) @@ -650,9 +660,14 @@ IndexWithMetaHolderPtr VIWithColumnInPart::load(SegmentId & segment_id, bool is_ metadata.dimension, metadata.total_vec, index_params, + max_threads, vector_index_cache_prefix, - true /* use_file_checksum */, - true /* manage_cache_folder */); + [base_daemon = BaseDaemon::tryGetInstance()]() + { + if (base_daemon.has_value()) + return base_daemon.value().get().isCancelled(); + return false; + }); else if (vector_search_type == Search::DataType::BinaryVector) index_variant = Search::createVectorIndex( index_name, @@ -661,9 +676,14 @@ IndexWithMetaHolderPtr VIWithColumnInPart::load(SegmentId & segment_id, bool is_ metadata.dimension, metadata.total_vec, index_params, + max_threads, vector_index_cache_prefix, - true /* use_file_checksum */, - true /* manage_cache_folder */); + [base_daemon = BaseDaemon::tryGetInstance()]() + { + if (base_daemon.has_value()) + return base_daemon.value().get().isCancelled(); + return false; + }); auto file_reader = Search::IndexDataFileReader( segment_id.getFullPath(), @@ -767,7 +787,7 @@ bool VIWithColumnInPart::cache(VIVariantPtr index) auto delete_bitmap = std::make_shared(total_vec, true); convertBitmap(segment_id, part_deleted_row_ids, delete_bitmap, nullptr, nullptr, nullptr); - String vector_index_cache_prefix = getUniqueVectorIndexCachePrefix(part_storage->getRelativePath(), current_part_name, index_name); + String vector_index_cache_prefix = getVectorIndexCachePrefix(part_storage->getRelativePath(), current_part_name, index_name); /// when cacheIndexAndMeta() is called, related files should have already been loaded. 
VectorIndexWithMetaPtr cache_item = std::make_shared( index, total_vec, delete_bitmap, des, nullptr, nullptr, nullptr, 0, disk_mode, fallback_to_flat, vector_index_cache_prefix); @@ -936,6 +956,21 @@ SearchResultPtr VIWithColumnInPart::search( } } +void VIWithColumnInPart::waitBuildFinish(const size_t timeout) +{ + auto timeout_duration = std::chrono::milliseconds(timeout); + std::chrono::steady_clock::time_point end_of_timeout + = std::chrono::steady_clock::now() + std::chrono::duration_cast(timeout_duration); + while (std::chrono::steady_clock::now() < end_of_timeout) + { + if (build_index_info->state != VIState::BUILDING) + return; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + LOG_WARNING(log, "Wait index build finish timeout."); +} + + bool VIWithDataPart::isBuildCancelled(const String & index_name) { std::shared_lock lock(vector_indices_mutex); @@ -958,6 +993,13 @@ void VIWithDataPart::cancelAllIndexBuild() it.second->cancelBuild(); } +void VIWithDataPart::waitAllIndexFinish() +{ + std::shared_lock lock(vector_indices_mutex); + for (auto it : vector_indices) + it.second->waitBuildFinish(); +} + /// revert according std::optional std::optional VIWithDataPart::getColumnIndex(const String & index_name) { diff --git a/src/VectorIndex/Common/VIWithDataPart.h b/src/VectorIndex/Common/VIWithDataPart.h index bcfe4f6b57d..d1f8083ee90 100644 --- a/src/VectorIndex/Common/VIWithDataPart.h +++ b/src/VectorIndex/Common/VIWithDataPart.h @@ -37,6 +37,7 @@ #include +#define WAIT_BUILD_INDEX_TIMEOUT 10 * 1000 /// 10s namespace VectorIndex { @@ -216,7 +217,7 @@ class VIWithColumnInPart std::shared_ptr & vector_index_checksum, VIBuildMemoryUsageHelper & build_memory_lock); - IndexWithMetaHolderPtr load(SegmentId & segment_id, bool is_active = true, const String & nvme_cache_path_uuid = ""); + IndexWithMetaHolderPtr load(SegmentId & segment_id, bool is_active = true); IndexWithMetaHolderPtr loadDecoupleCache(SegmentId & segment_id); @@ -237,6 +238,8 @@ class 
VIWithColumnInPart bool hasUsableVectorIndex(); + void waitBuildFinish(const size_t timeout = WAIT_BUILD_INDEX_TIMEOUT); + static void transferToNewRowIds(const VIWithMeta & index_with_meta, SearchResultPtr result); static SearchResultPtr TransferToOldRowIds(const VIWithMeta & index_with_meta, const SearchResultPtr result); @@ -255,11 +258,7 @@ class VIWithColumnInPart static bool canMergeForColumnIndex(const MergeTreeDataPartPtr & left, const MergeTreeDataPartPtr & right, const String & vec_index_name); private: - VIVariantPtr createIndex() const; - -#ifdef ENABLE_SCANN - std::shared_ptr getDiskIOManager() const; -#endif + VIVariantPtr createIndex(bool is_dummy = true) const; static std::once_flag once; static int max_threads; @@ -311,7 +310,7 @@ void VIWithColumnInPart::buildIndex( if (num_threads == 0) num_threads = 1; - index_variant = createIndex(); + index_variant = createIndex(false); typename SearchIndexDataTypeMap::VectorIndexPtr index_ptr; if constexpr (T == Search::DataType::FloatVector) { @@ -452,6 +451,8 @@ class VIWithDataPart void cancelAllIndexBuild(); + void waitAllIndexFinish(); + void convertIndexFileForUpgrade(); void loadVectorIndexFromLocalFile(bool need_convert_index_file = false); diff --git a/src/VectorIndex/Interpreters/VIEventLog.h b/src/VectorIndex/Interpreters/VIEventLog.h index 5eea5114d40..051db9aba37 100644 --- a/src/VectorIndex/Interpreters/VIEventLog.h +++ b/src/VectorIndex/Interpreters/VIEventLog.h @@ -52,7 +52,7 @@ struct VIEventLogElement }; String database_name; String table_name; - String index_name; + String index_name; /// Support multiple vector indices mutable String part_name; mutable String current_part_name; mutable String partition_id; diff --git a/src/VectorIndex/Processors/ReadWithHybridSearch.cpp b/src/VectorIndex/Processors/ReadWithHybridSearch.cpp index 59a4db43401..8ceee581d75 100644 --- a/src/VectorIndex/Processors/ReadWithHybridSearch.cpp +++ b/src/VectorIndex/Processors/ReadWithHybridSearch.cpp @@ -263,32 
+263,53 @@ void ReadWithHybridSearch::initializePipeline(QueryPipelineBuilder & pipeline, c return; } -/* + if(isFinal(query_info)) { - std::vector add_columns = metadata_for_reading->getColumnsRequiredForSortingKey(); - column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end()); + std::set add_columns; + add_columns.insert(column_names_to_read.begin(),column_names_to_read.end()); + + for(auto temp_name : metadata_for_reading->getColumnsRequiredForSortingKey()) + { + if(!add_columns.contains(temp_name)) + { + column_names_to_read.push_back(temp_name); + add_columns.insert(temp_name); + } + } if (!data.merging_params.is_deleted_column.empty()) { - column_names_to_read.push_back(data.merging_params.is_deleted_column); + if(!add_columns.contains(data.merging_params.is_deleted_column)) + { + column_names_to_read.push_back(data.merging_params.is_deleted_column); + add_columns.insert(data.merging_params.is_deleted_column); + } + LOG_DEBUG(log, "merging_params.is_deleted_column is : {}", data.merging_params.is_deleted_column); } if (!data.merging_params.sign_column.empty()) { - column_names_to_read.push_back(data.merging_params.sign_column); + if(!add_columns.contains(data.merging_params.sign_column)) + { + column_names_to_read.push_back(data.merging_params.sign_column); + add_columns.insert(data.merging_params.sign_column); + } + LOG_DEBUG(log, "merging_params.sign_column is : {}", data.merging_params.sign_column); } if (!data.merging_params.version_column.empty()) { - column_names_to_read.push_back(data.merging_params.version_column); + if(!add_columns.contains(data.merging_params.version_column)) + { + column_names_to_read.push_back(data.merging_params.version_column); + add_columns.insert(data.merging_params.version_column); + } LOG_DEBUG(log, "merging_params.version_column is : {}", data.merging_params.version_column); } - ::sort(column_names_to_read.begin(), column_names_to_read.end()); - 
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); } -*/ + /// Reference spreadMarkRangesAmongStreams() pipe = createReadProcessorsAmongParts( @@ -439,7 +460,7 @@ Pipe ReadWithHybridSearch::createReadProcessorsAmongParts( }); } -/* + if(isFinal(query_info)) { /// Add generating sorting key processor @@ -474,7 +495,7 @@ Pipe ReadWithHybridSearch::createReadProcessorsAmongParts( max_block_size); } -*/ + return pipe; } @@ -489,14 +510,6 @@ Pipe ReadWithHybridSearch::readFromParts( if (!query_info.has_hybrid_search) return {}; - /// Prewhere info should not be changed, because it is shared by parts. - if (prewhere_info) - { - /// need_filter is false when both prewhere and where exist, prewhere will be delayed, all read rows with a prehwere_column returned. - /// In this case, we need only rows statisfied prewhere conditions. - prewhere_info->need_filter = true; - } - for (const auto & part : parts) { MergeTreeBaseSearchManagerPtr search_manager = nullptr; @@ -524,6 +537,8 @@ Pipe ReadWithHybridSearch::readFromParts( auto algorithm = std::make_unique( search_manager, + context, + requested_num_streams, data, storage_snapshot, part.data_part, diff --git a/src/VectorIndex/Storages/HybridSearchResult.h b/src/VectorIndex/Storages/HybridSearchResult.h index 5cc415676b7..dd8df6966a0 100644 --- a/src/VectorIndex/Storages/HybridSearchResult.h +++ b/src/VectorIndex/Storages/HybridSearchResult.h @@ -13,7 +13,6 @@ * limitations under the License. 
*/ - #pragma once #include diff --git a/src/VectorIndex/Storages/MergeTreeSelectWithHybridSearchProcessor.cpp b/src/VectorIndex/Storages/MergeTreeSelectWithHybridSearchProcessor.cpp index 0cf73353e19..58676117da0 100644 --- a/src/VectorIndex/Storages/MergeTreeSelectWithHybridSearchProcessor.cpp +++ b/src/VectorIndex/Storages/MergeTreeSelectWithHybridSearchProcessor.cpp @@ -14,12 +14,15 @@ */ #include +#include #include #include +#include #include #include #include #include +#include #include #include @@ -60,36 +63,207 @@ static bool isHybridSearchByPk(const std::vector & pk_col_names, const s return match; } -/// Referenced from MergeTreeSelectProcessor::initializeReaders() and MergeTreeBaseSelectProcessor::initializeMergeTreeReadersForPart() -void MergeTreeSelectWithHybridSearchProcessor::initializeReadersWithHybridSearch() +/// Referenced from MergeTreeSelectProcessor::initializeReaders() +void MergeTreeSelectWithHybridSearchProcessor::initializeReaders() { - OpenTelemetry::SpanHolder span("MergeTreeSelectWithHybridSearchProcessor::initializeReadersWithHybridSearch()"); - task_columns = getReadTaskColumns( - LoadedMergeTreeDataPartInfoForReader(data_part, alter_conversions), storage_snapshot, - required_columns, virt_column_names, nullptr, actions_settings, reader_settings, /*with_subcolumns=*/ true); + OpenTelemetry::SpanHolder span("MergeTreeSelectWithHybridSearchProcessor::initializeReaders()"); + + /// Special handling of partition key condition in prehwere condition + can_skip_peform_prefilter = canSkipPrewhereForPart(storage_snapshot->getMetadataForQuery()); + if (can_skip_peform_prefilter) + { + LOG_DEBUG(log, "Skip to call performPrefilter() for part {} due to a prewhere condition with partition key is true.", data_part->name); + + /// Normal as regular read + task_columns = getReadTaskColumns( + LoadedMergeTreeDataPartInfoForReader(data_part, alter_conversions), storage_snapshot, + required_columns, virt_column_names, prewhere_info, actions_settings, 
reader_settings, /*with_subcolumns=*/ true); + } + else + { + task_columns = getReadTaskColumns( + LoadedMergeTreeDataPartInfoForReader(data_part, alter_conversions), storage_snapshot, + required_columns, virt_column_names, /*prewhere_info*/ nullptr, actions_settings, reader_settings, /*with_subcolumns=*/ true); + } /// Will be used to distinguish between PREWHERE and WHERE columns when applying filter const auto & column_names = task_columns.columns.getNames(); column_name_set = NameSet{column_names.begin(), column_names.end()}; if (use_uncompressed_cache) - owned_uncompressed_cache = storage.getContext()->getUncompressedCache(); + owned_uncompressed_cache = context->getUncompressedCache(); + + owned_mark_cache = context->getMarkCache(); - owned_mark_cache = storage.getContext()->getMarkCache(); + /// Referenced from MergeTreeBaseSelectProcessor::initializeMergeTreeReadersForPart() + const auto & metadata_snapshot = storage_snapshot->getMetadataForQuery(); - reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(), + reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), alter_conversions, reader_settings, {}, {}); + /// Referenced from IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart() pre_reader_for_step.clear(); /// Add lightweight delete filtering step if (reader_settings.apply_deleted_mask && data_part->hasLightweightDelete()) { - pre_reader_for_step.push_back(data_part->getReader({LightweightDeleteDescription::FILTER_COLUMN}, storage_snapshot->getMetadataForQuery(), + pre_reader_for_step.push_back(data_part->getReader({LightweightDeleteDescription::FILTER_COLUMN}, metadata_snapshot, all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), alter_conversions, reader_settings, {}, {})); } + + /// Need to apply prewhere if performPrefilter() is skipped + if (prewhere_info && can_skip_peform_prefilter) + { + 
for (const auto & pre_columns_per_step : task_columns.pre_columns) + { + pre_reader_for_step.push_back( + data_part->getReader( + pre_columns_per_step, metadata_snapshot, all_mark_ranges, + owned_uncompressed_cache.get(), owned_mark_cache.get(), + alter_conversions, reader_settings, {}, {})); + } + } +} + +/// Referenced from IMergeTreeSelectAlgorithm::initializeRangeReaders() +void MergeTreeSelectWithHybridSearchProcessor::initializeRangeReadersWithHybridSearch(MergeTreeReadTask & current_task) +{ + bool has_lightweight_delete = current_task.data_part->hasLightweightDelete(); + + /// Initialize primary key cache + const auto & primary_key = storage_snapshot->metadata->getPrimaryKey(); + const bool enable_primary_key_cache = current_task.data_part->storage.canUsePrimaryKeyCache(); + LOG_DEBUG(log, "Reader setting: enable_primary_key_cache = {}", enable_primary_key_cache); + + /// consider cache if and only if + /// 1. this task is vector search and no prewhere info + /// 2. primary key is only a column, and select columns are (pk, hybrid_search_func) + /// 3. 
primary key's value is represented by number + use_primary_key_cache = enable_primary_key_cache && PKCacheManager::isSupportedPrimaryKey(primary_key) + && isHybridSearchByPk(primary_key.column_names, ordered_names); + + /// Add _part_offset to non_const_virtual_column_names if part has lightweight delete + /// prewhere info will be apply on read result + if (has_lightweight_delete || can_skip_peform_prefilter) + { + bool found = false; + for (const auto & column_name : non_const_virtual_column_names) + { + if (column_name == "_part_offset") + { + found = true; + break; + } + } + + if (!found) + { + non_const_virtual_column_names.emplace_back("_part_offset"); + need_remove_part_offset = true; + } + } + + auto & range_reader = current_task.range_reader; + auto & pre_range_readers = current_task.pre_range_readers; + + MergeTreeRangeReader* prev_reader = nullptr; + bool last_reader = false; + size_t pre_readers_shift = 0; + + /// Add filtering step with lightweight delete mask + if (reader_settings.apply_deleted_mask && has_lightweight_delete) + { + MergeTreeRangeReader pre_range_reader(pre_reader_for_step[0].get(), prev_reader, &lightweight_delete_filter_step, last_reader, non_const_virtual_column_names); + pre_range_readers.push_back(std::move(pre_range_reader)); + prev_reader = &pre_range_readers.back(); + pre_readers_shift++; + } + + if (prewhere_info && can_skip_peform_prefilter) + { + if (prewhere_actions->steps.size() + pre_readers_shift != pre_reader_for_step.size()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "PREWHERE steps count mismatch, actions: {}, readers: {}", + prewhere_actions->steps.size(), pre_reader_for_step.size()); + } + + for (size_t i = 0; i < prewhere_actions->steps.size(); ++i) + { + last_reader = reader->getColumns().empty() && (i + 1 == prewhere_actions->steps.size()); + + MergeTreeRangeReader current_reader(pre_reader_for_step[i + pre_readers_shift].get(), prev_reader, &prewhere_actions->steps[i], last_reader, 
non_const_virtual_column_names); + + pre_range_readers.push_back(std::move(current_reader)); + prev_reader = &pre_range_readers.back(); + } + } + + range_reader = MergeTreeRangeReader(reader.get(), prev_reader, nullptr, true, non_const_virtual_column_names); +} + +bool MergeTreeSelectWithHybridSearchProcessor::canSkipPrewhereForPart(const StorageMetadataPtr & metadata_snapshot) +{ + if (!prewhere_info || !metadata_snapshot->hasPartitionKey() || !prewhere_info->prewhere_actions) + return false; + + if (!context->getSettingsRef().optimize_prefilter_in_search) + return false; + + const auto & partition_key = metadata_snapshot->getPartitionKey(); + + /// Get column names required for partition key + auto minmax_columns_names = storage.getMinMaxColumnsNames(partition_key); + + /// Quick check if requried column names has any partition key column name + /// Get column names in prewhere condition + auto required_columns = prewhere_info->prewhere_actions->getRequiredColumnsNames(); + bool exists = false; + + for (const auto & required_column : required_columns) + { + if (std::find(minmax_columns_names.begin(), minmax_columns_names.end(), required_column) + != minmax_columns_names.end()) + { + exists = true; + break; + } + } + + /// Not found partition key column in prewhere, cannot skip. + if (!exists) + return false; + + /// Reference PartitionPrunner using KeyCondition, difference is that FUNCTION_UNKNOWN returns false. 
+ KeyCondition partition_prunner_condition( + prewhere_info->prewhere_actions, context, partition_key.column_names, + partition_key.expression, {}, true /* single_point */, false, true /* known_false */); + + const auto & partition_value = data_part->partition.value; + std::vector index_value(partition_value.begin(), partition_value.end()); + for (auto & field : index_value) + { + // NULL_LAST + if (field.isNull()) + field = POSITIVE_INFINITY; + } + + if (partition_prunner_condition.mayBeTrueInRange( + partition_value.size(), index_value.data(), index_value.data(), partition_key.data_types)) + return true; + + /// Try minmax idx on columns required by partition key + auto minmax_expression_actions = storage.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)); + DataTypes minmax_columns_types = storage.getMinMaxColumnsTypes(partition_key); + + KeyCondition minmax_idx_condition( + prewhere_info->prewhere_actions, context, minmax_columns_names, + minmax_expression_actions, {}, false /* single_point */, false, true /* known_false */); + + return minmax_idx_condition.checkInHyperrectangle(data_part->minmax_idx->hyperrectangle, minmax_columns_types).can_be_true; } VIBitmapPtr MergeTreeSelectWithHybridSearchProcessor::performPrefilter(MarkRanges & mark_ranges) @@ -128,33 +302,102 @@ VIBitmapPtr MergeTreeSelectWithHybridSearchProcessor::performPrefilter(MarkRange } } - /// No need to return prewhere column + /// Clone an prewhere_info for performPrefilter() + PrewhereInfoPtr prewhere_info_copy = prewhere_info->clone(); + prewhere_info_copy->need_filter = true; + prewhere_info_copy->remove_prewhere_column = true; + + /// Only one part + RangesInDataParts parts_with_ranges; + parts_with_ranges.emplace_back(data_part, std::make_shared(), 0, mark_ranges); + + /// spreadMarkRangesAmongStreams() + const auto & settings = context->getSettingsRef(); + const auto data_settings = storage.getSettings(); + + size_t sum_marks = data_part->getMarksCount(); + 
size_t min_marks_for_concurrent_read = 0; + min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead( + settings.merge_tree_min_rows_for_concurrent_read, settings.merge_tree_min_bytes_for_concurrent_read, + data_settings->index_granularity, data_settings->index_granularity_bytes, sum_marks); + + size_t num_streams = max_streamns_for_prewhere; + if (num_streams > 1) { - std::lock_guard lock(prewhere_info->prewhere_info_mutex); - if (!prewhere_info->remove_prewhere_column) - prewhere_info->remove_prewhere_column = true; + /// Reduce the number of num_streams if the data is small. + if (sum_marks < num_streams * min_marks_for_concurrent_read && parts_with_ranges.size() < num_streams) + num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts_with_ranges.size()); } - auto algorithm = std::make_unique( - storage, - storage_snapshot, - data_part, - alter_conversions, - max_block_size_rows, - preferred_block_size_bytes, - preferred_max_column_in_block_size_bytes, - requried_columns, - mark_ranges, - use_uncompressed_cache, - prewhere_info, - actions_settings, - reader_settings, - nullptr, - system_columns); + Pipe pipe; + + if (num_streams > 1) + { + Pipes pipes; + + if (max_block_size_rows && !storage.canUseAdaptiveGranularity()) + { + size_t fixed_index_granularity = storage.getSettings()->index_granularity; + min_marks_for_concurrent_read = (min_marks_for_concurrent_read * fixed_index_granularity + max_block_size_rows - 1) + / max_block_size_rows * max_block_size_rows / fixed_index_granularity; + } + + MergeTreeReadPoolPtr pool; + pool = std::make_shared( + num_streams, + sum_marks, + min_marks_for_concurrent_read, + std::move(parts_with_ranges), + storage_snapshot, + prewhere_info_copy, + actions_settings, + reader_settings, + required_columns, + system_columns, + context, + false); + + for (size_t i = 0; i < num_streams; ++i) + { + auto algorithm = std::make_unique( + i, pool, 
min_marks_for_concurrent_read, max_block_size_rows, + settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes, + storage, storage_snapshot, use_uncompressed_cache, + prewhere_info, actions_settings, reader_settings, system_columns); + + auto source = std::make_shared(std::move(algorithm)); + + if (i == 0) + source->addTotalRowsApprox(total_rows); + + pipes.emplace_back(std::move(source)); + } + + pipe = Pipe::unitePipes(std::move(pipes)); + } + else + { + auto algorithm = std::make_unique( + storage, + storage_snapshot, + data_part, + alter_conversions, + max_block_size_rows, + preferred_block_size_bytes, + preferred_max_column_in_block_size_bytes, + requried_columns, + mark_ranges, + use_uncompressed_cache, + prewhere_info_copy, + actions_settings, + reader_settings, + nullptr, + system_columns); - auto source = std::make_shared(std::move(algorithm)); + auto source = std::make_shared(std::move(algorithm)); - Pipe pipe(std::move(source)); + pipe = Pipe(std::move(source)); + } QueryPipeline filter_pipeline(std::move(pipe)); PullingPipelineExecutor filter_executor(filter_pipeline); @@ -211,7 +454,7 @@ bool MergeTreeSelectWithHybridSearchProcessor::readPrimaryKeyBin(Columns & out_c storage_snapshot->metadata, MarkRanges{MarkRange(0, task->data_part->getMarksCount())}, nullptr, - storage.getContext()->getMarkCache().get(), + context->getMarkCache().get(), alter_conversions, reader_settings, {}, @@ -287,52 +530,7 @@ IMergeTreeSelectAlgorithm::BlockAndProgress MergeTreeSelectWithHybridSearchProce { OpenTelemetry::SpanHolder span("MergeTreeSelectWithHybridSearchProcessor::readFromPart()"); if (!task->range_reader.isInitialized()) - { - MergeTreeRangeReader* prev_reader = nullptr; - bool last_reader = false; - - /// Initialize primary key cache - const auto & primary_key = storage_snapshot->metadata->getPrimaryKey(); - const bool enable_primary_key_cache = task->data_part->storage.canUsePrimaryKeyCache(); - LOG_DEBUG(log, "Reader setting: 
enable_primary_key_cache = {}", enable_primary_key_cache); - - /// consider cache if and only if - /// 1. this task is vector search and no prewhere info - /// 2. primary key is only a column, and select columns are (pk, hybrid_search_func) - /// 3. primary key's value is represented by number - use_primary_key_cache = enable_primary_key_cache && PKCacheManager::isSupportedPrimaryKey(primary_key) - && isHybridSearchByPk(primary_key.column_names, ordered_names); - - /// Add _part_offset to non_const_virtual_column_names if part has lightweight delete - if (task->data_part->hasLightweightDelete()) - { - bool found = false; - for (const auto & column_name : non_const_virtual_column_names) - { - if (column_name == "_part_offset") - { - found = true; - break; - } - } - - if (!found) - { - non_const_virtual_column_names.emplace_back("_part_offset"); - need_remove_part_offset = true; - } - } - - /// Add filtering step with lightweight delete mask - if (reader_settings.apply_deleted_mask && task->data_part->hasLightweightDelete()) - { - task->pre_range_readers.push_back( - MergeTreeRangeReader(pre_reader_for_step[0].get(), prev_reader, &lightweight_delete_filter_step, last_reader, non_const_virtual_column_names)); - prev_reader = &task->pre_range_readers.back(); - } - - task->range_reader = MergeTreeRangeReader(reader.get(), prev_reader, nullptr, true, non_const_virtual_column_names); - } + initializeRangeReadersWithHybridSearch(*task); /// original read logic, considering prewhere optimization return readFromPartWithHybridSearch(); @@ -442,8 +640,9 @@ IMergeTreeSelectAlgorithm::BlockAndProgress MergeTreeSelectWithHybridSearchProce Block res_block; - /// Add prewhere column name to avoid column not found error - if (prewhere_info && !original_remove_prewhere_column) + /// Add prewhere column name to avoid prewhere_column not found error + /// Used for vector scan to handle cases when both prewhere and where exist + if (!can_skip_peform_prefilter && prewhere_info && 
!prewhere_info->remove_prewhere_column) { ColumnWithTypeAndName prewhere_col; @@ -584,13 +783,13 @@ IMergeTreeSelectAlgorithm::BlockAndProgress MergeTreeSelectWithHybridSearchProce } } - Columns result_columns; - result_columns.assign( + Columns tmp_result_columns; + tmp_result_columns.assign( std::make_move_iterator(result_pk_cols.begin()), std::make_move_iterator(result_pk_cols.end()) ); - LOG_DEBUG(log, "Fetch from primary key cache size = {}", result_columns[0]->size()); + LOG_DEBUG(log, "Fetch from primary key cache size = {}", tmp_result_columns[0]->size()); /// Get _part_offset if exists. if (mutable_part_offset_col) @@ -598,8 +797,8 @@ IMergeTreeSelectAlgorithm::BlockAndProgress MergeTreeSelectWithHybridSearchProce /// _part_offset column exists in original select columns if (!need_remove_part_offset) { - result_columns.emplace_back(std::move(mutable_part_offset_col)); - part_offset = typeid_cast(result_columns.back().get()); + tmp_result_columns.emplace_back(std::move(mutable_part_offset_col)); + part_offset = typeid_cast(tmp_result_columns.back().get()); } else part_offset = typeid_cast(mutable_part_offset_col.get()); @@ -610,12 +809,22 @@ IMergeTreeSelectAlgorithm::BlockAndProgress MergeTreeSelectWithHybridSearchProce size_t result_row_num = 0; base_search_manager->mergeResult( - result_columns, /// _Inout_ + tmp_result_columns, /// _Inout_ result_row_num, /// _Out_ read_ranges, nullptr, part_offset); + Columns result_columns; + + if(!need_remove_part_offset){ + result_columns = tmp_result_columns; + }else{ + result_columns.emplace_back(tmp_result_columns[0]); + result_columns.emplace_back(tmp_result_columns.back()); + } + + task->mark_ranges.clear(); if (result_row_num > 0) { @@ -637,7 +846,7 @@ try return false; if (!reader) - initializeReadersWithHybridSearch(); + initializeReaders(); MarkRanges mark_ranges_for_task; mark_ranges_for_task = std::move(all_mark_ranges); @@ -647,11 +856,8 @@ try : getSizePredictor(data_part, task_columns, sample_block); 
/// perform vector scan, then filter mark ranges of read task - if (!prewhere_info) - { + if (!prewhere_info || can_skip_peform_prefilter) base_search_manager->executeSearchBeforeRead(data_part); - filterMarkRangesByVectorScanResult(data_part, base_search_manager, mark_ranges_for_task); - } else { /// try to process prewhere here, get part_offset columns @@ -663,18 +869,15 @@ try ReadRange read_range{0, data_part->rows_count, 0, data_part->index_granularity.getMarksCount()}; read_ranges.emplace_back(read_range); base_search_manager->executeSearchWithFilter(data_part, read_ranges, filter); - filterMarkRangesByVectorScanResult(data_part, base_search_manager, mark_ranges_for_task); } + filterMarkRangesByVectorScanResult(data_part, base_search_manager, mark_ranges_for_task); + for (const auto & range : mark_ranges_for_task) - { LOG_DEBUG(log, "Keep range: {} - {}", range.begin, range.end); - } if (mark_ranges_for_task.empty()) - { return false; - } task = std::make_unique( data_part, @@ -683,10 +886,7 @@ try part_index_in_query, column_name_set, task_columns, - std::move(size_predictor), - 0, - std::future(), - std::vector>()); + std::move(size_predictor)); return true; } diff --git a/src/VectorIndex/Storages/MergeTreeSelectWithHybridSearchProcessor.h b/src/VectorIndex/Storages/MergeTreeSelectWithHybridSearchProcessor.h index acf004012ab..f24bb267dd4 100644 --- a/src/VectorIndex/Storages/MergeTreeSelectWithHybridSearchProcessor.h +++ b/src/VectorIndex/Storages/MergeTreeSelectWithHybridSearchProcessor.h @@ -34,9 +34,11 @@ class MergeTreeSelectWithHybridSearchProcessor final : public MergeTreeSelectAlg using ReadRanges = MergeTreeRangeReader::ReadResult::ReadRangesInfo; template - explicit MergeTreeSelectWithHybridSearchProcessor(MergeTreeBaseSearchManagerPtr base_search_manager_, Args &&... args) + explicit MergeTreeSelectWithHybridSearchProcessor(MergeTreeBaseSearchManagerPtr base_search_manager_, ContextPtr context_, size_t max_streamns, Args &&... 
args) : MergeTreeSelectAlgorithm{std::forward(args)...} , base_search_manager(base_search_manager_) + , context(context_) + , max_streamns_for_prewhere(max_streamns) { LOG_TRACE( log, @@ -45,16 +47,15 @@ class MergeTreeSelectWithHybridSearchProcessor final : public MergeTreeSelectAlg data_part->name, total_rows, data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin)); - - /// Save original remove_prewhere_column, which will be changed to true in performPrefilter() - if (prewhere_info) - original_remove_prewhere_column = prewhere_info->remove_prewhere_column; } String getName() const override { return "MergeTreeReadWithHybridSearch"; } protected: BlockAndProgress readFromPart() override; - void initializeReadersWithHybridSearch(); + void initializeReaders(); + + /// Sets up range readers corresponding to data readers + void initializeRangeReadersWithHybridSearch(MergeTreeReadTask & task); bool readPrimaryKeyBin(Columns & out_columns); @@ -65,6 +66,9 @@ class MergeTreeSelectWithHybridSearchProcessor final : public MergeTreeSelectAlg BlockAndProgress readFromPartWithHybridSearch(); BlockAndProgress readFromPartWithPrimaryKeyCache(bool & success); + /// Evaluate the prehwere condition with the partition key value in part. Similar as PartitionPruner + bool canSkipPrewhereForPart(const StorageMetadataPtr & metadata_snapshot); + Search::DenseBitmapPtr performPrefilter(MarkRanges & mark_ranges); Poco::Logger * log = &Poco::Logger::get("MergeTreeSelectWithHybridSearchProcessor"); @@ -72,6 +76,9 @@ class MergeTreeSelectWithHybridSearchProcessor final : public MergeTreeSelectAlg /// Shared_ptr for base class, the dynamic type may be derived class TextSearch/VectorScan/HybridSearch MergeTreeBaseSearchManagerPtr base_search_manager = nullptr; + ContextPtr context; + size_t max_streamns_for_prewhere; + /// True if _part_offset column is added for vector scan, but should not exist in select result. 
bool need_remove_part_offset = false; @@ -81,9 +88,10 @@ class MergeTreeSelectWithHybridSearchProcessor final : public MergeTreeSelectAlg /// True if the query can use primary key cache. bool use_primary_key_cache = false; - /// Used for vector scan to handle cases when both prewhere and where exist - /// remove_prewhere_column is set to true when vector scan try to get _part_offset for rows satisfying prewhere conds. - bool original_remove_prewhere_column = false; + /// Used for cases when prewhere can be skipped before vector search + /// True when performPreFilter() is skipped, prewhere_info can be performed after vector search, during reading other columns. + /// False when performPreFilter() is executed before vector search + bool can_skip_peform_prefilter = false; }; } diff --git a/src/VectorIndex/Storages/MergeTreeVSManager.cpp b/src/VectorIndex/Storages/MergeTreeVSManager.cpp index a5b17841fdd..a4e16448b1f 100644 --- a/src/VectorIndex/Storages/MergeTreeVSManager.cpp +++ b/src/VectorIndex/Storages/MergeTreeVSManager.cpp @@ -676,7 +676,7 @@ void MergeTreeVSManager::mergeBatchVectorScanResult( prev_row_num += read_range.row_num; } } - else + else // part_offset != nullptr { /// when no filter, the prev read result should be continuous, so we just need to scan all result rows and /// keep results of which the row id is contained in label_column @@ -839,8 +839,28 @@ VectorScanResultPtr MergeTreeVSManager::vectorScanWithoutIndex( size_t current_rows_in_mark = 0; size_t current_rows_in_range = 0; + /// used to test filter passed in is correct + LOG_TRACE( + log, + "VectorScanManager with filter, Part: {}, Filter Size: {}, Filter Byte Size: {}, Count in Filter is:{}", + part->name, + filter->get_size(), + filter->byte_size(), + filter->count() + ); + for (const auto & single_range : read_ranges) { + LOG_TRACE( + log, + "VectorScanManager Part: {}, Range: {}, Row Numbers in Range: {}, Start Mark: {}, End Mark: {}, start_row: {}, ", + part->name, + range_num, + 
read_ranges[range_num].row_num, + read_ranges[range_num].start_mark, + read_ranges[range_num].end_mark, + read_ranges[range_num].start_row); + /// for each single_range, will only fetch data of one mark each time current_mark = single_range.start_mark; current_rows_in_range = 0; @@ -895,6 +915,11 @@ VectorScanResultPtr MergeTreeVSManager::vectorScanWithoutIndex( /// filter out the data we want to do ANN on using the filter size_t start_pos = filter_parsed; + LOG_TRACE( + get_logger(), + "filter_parsed:{}", + filter_parsed); + /// this outer for loop's i is the position of data relative to the filter /// imagine filter as a array of array:[0,1,0,0,0,1,...][0,1,0...][...] /// where actually all these arrays are concatenated into a single array and @@ -915,6 +940,13 @@ VectorScanResultPtr MergeTreeVSManager::vectorScanWithoutIndex( for (size_t offset = vec_start_offset; offset < vec_end_offset; ++offset) vector_raw_data.emplace_back(src_vec[offset]); + LOG_TRACE( + get_logger(), + "current_rows_in_range:{}, i:{}, src_vec[vec_start_offset]:{}", + current_rows_in_range, + i, + src_vec[vec_start_offset]); + actual_id_in_range.emplace_back(current_rows_in_range); mark_left_rows ++; } @@ -1045,12 +1077,43 @@ VectorScanResultPtr MergeTreeVSManager::vectorScanWithoutIndex( row_exists, 0); + if(mark_left_rows) + { + LOG_TRACE( + log, + "Part: {}, Range: {}, Mark: {}, Rows In Mark: {}, rows left in mark: {}, raw_data size: {}, vector index name: {}, path: {}", + part->name, + range_num, + current_mark, + current_rows_in_mark, + mark_left_rows, + vector_raw_data.size(), + "brute force", + part->getDataPartStorage().getFullPath()); + } range_left_rows += mark_left_rows; } + if(range_left_rows) + { + LOG_TRACE( + log, + "Part: {}, Range:{}, Total Marks:{}, Rows In Range: {}, filter read:{}, rows left in range:{}", + part->name, + range_num, + single_range.end_mark - single_range.start_mark, + current_rows_in_range, + filter_parsed, + range_left_rows); + } range_num ++; 
part_left_rows += range_left_rows; } + + if(part_left_rows) + { + LOG_TRACE(log, "Part:{}, rows left in part:{}", part->name, part_left_rows); + } } else { @@ -1058,6 +1121,12 @@ VectorScanResultPtr MergeTreeVSManager::vectorScanWithoutIndex( /// has no filter, will pass the vector data and the dense bitmap for deleted rows to search function while (num_rows_read < total_rows_in_part) { + LOG_TRACE( + log, + "VectorScanManager Part: {}, Row numbers in mark: {}, ", + part->name, + total_rows_in_part); + size_t max_read_row = std::min((total_rows_in_part - num_rows_read), default_read_num); Columns result; result.resize(cols.size()); @@ -1141,18 +1210,30 @@ VectorScanResultPtr MergeTreeVSManager::vectorScanWithoutIndex( throw DB::Exception(DB::ErrorCodes::ILLEGAL_COLUMN, "Vector column type for BinaryVector is not FixString(N) in column {}", search_column); } + LOG_TRACE( + log, + "Part: {}, " + "num_rows: {}, " + "raw_data size: {}", + part->name, + num_rows, + vector_raw_data.size()); + int deleted_row_num = 0; VIBitmapPtr row_exists = std::make_shared(total_rows, true); //make sure result contain lwd row_exists column if (result.size() == 2 && part->storage.hasLightweightDeletedMask()) { + LOG_TRACE( + log, + "Try to get row exists col, result size: {}", + result.size()); const auto& row_exists_col = result[1]; if (row_exists_col) { const ColumnUInt8 * col = checkAndGetColumn(row_exists_col.get()); const auto & col_data = col->getData(); - for (size_t i = 0; i < col_data.size(); i++) { if (!col_data[i]) diff --git a/src/VectorIndex/Storages/StorageSystemVIs.cpp b/src/VectorIndex/Storages/StorageSystemVIs.cpp index 56753a151c6..673ec71d8b6 100644 --- a/src/VectorIndex/Storages/StorageSystemVIs.cpp +++ b/src/VectorIndex/Storages/StorageSystemVIs.cpp @@ -192,12 +192,18 @@ class DataVectorIndicesSource : public ISource } // total data parts if (column_mask[src_index++]) + { res_columns[res_index++]->insert(data_parts.size()); + } // vector index built parts if 
(column_mask[src_index++]) + { res_columns[res_index++]->insert(getBuiltParts(data_parts, index)); + } if (column_mask[src_index++]) + { res_columns[res_index++]->insert(getSmallParts(data, data_parts, index)); + } // vector index status if (column_mask[src_index++]) { @@ -224,11 +230,16 @@ class DataVectorIndicesSource : public ISource res_columns[res_index++]->insert(host); } + /// latest failed part if (column_mask[src_index++]) + { res_columns[res_index++]->insert(fail_status.latest_failed_part); - + } + /// latest fail reason if (column_mask[src_index++]) + { res_columns[res_index++]->insert(fail_status.latest_fail_reason); + } } } } diff --git a/src/VectorIndex/Storages/StorageSystemVIsWithPart.h b/src/VectorIndex/Storages/StorageSystemVIsWithPart.h index cb34a55595d..dedb152a33b 100644 --- a/src/VectorIndex/Storages/StorageSystemVIsWithPart.h +++ b/src/VectorIndex/Storages/StorageSystemVIsWithPart.h @@ -14,7 +14,6 @@ */ #pragma once - #include namespace DB diff --git a/src/VectorIndex/Storages/VIBuilderUpdater.cpp b/src/VectorIndex/Storages/VIBuilderUpdater.cpp index 36d82fcd587..f56057346a9 100644 --- a/src/VectorIndex/Storages/VIBuilderUpdater.cpp +++ b/src/VectorIndex/Storages/VIBuilderUpdater.cpp @@ -130,6 +130,7 @@ void VIBuilderUpdater::removeDroppedVectorIndices(const StorageMetadataPtr & met LOG_DEBUG(log, "Find Vector Index in metadata"); VIParameter params = cache_item.second; + /// Support multiple vector indices /// TODO: Further check whether the paramters are the same. 
for (const auto & vec_index_desc : metadata_snapshot->getVectorIndices()) { @@ -262,6 +263,7 @@ VIEntryPtr VIBuilderUpdater::selectPartToBuildVI(const StorageMetadataPtr & meta continue; } + /// Support multiple vector indices for (const auto & vec_index : metadata_snapshot->getVectorIndices()) { auto column_index_opt = part->vector_index.getColumnIndex(vec_index.name); @@ -340,6 +342,7 @@ void VIBuilderUpdater::checkPartCanBuildVI( ErrorCodes::LOGICAL_ERROR, "Column index {} for part {} does not exits, it's a bug.", vec_desc.name, data_part->name); /// If we already have this vector index in this part, we do not need to do anything. + /// Support multiple vector indices if (data_part->vector_index.alreadyWithVIndexSegment(vec_desc.name)) throw Exception(ErrorCodes::VECTOR_INDEX_ALREADY_EXISTS, "No need to build vector index {} for part {}, because the part already has ready vector index.", vec_desc.name, data_part->name); } @@ -364,6 +367,7 @@ VIContextPtr VIBuilderUpdater::prepareBuildVIContext( bool has_vector_index_desc = false; for (auto & vec_index_desc : metadata_snapshot->getVectorIndices()) { + /// Support multiple vector indices if (vec_index_desc.name != vector_index_name) continue; @@ -712,7 +716,7 @@ VIBuiltStatus VIBuilderUpdater::TryMoveVIFiles(const VIContextPtr ctx) { LOG_INFO(log, "Will Move Index File to Part {}", future_part->name); /// lock part for move build vector index, avoid concurrently mutation - auto move_part_lock = future_part->vector_index.tryLockTimed(RWLockImpl::Type::Write, std::chrono::milliseconds(1000)); + auto move_index_lock = future_part->vector_index.tryLockTimed(RWLockImpl::Type::Write, std::chrono::milliseconds(1000)); /// check part is active if (future_part->getState() != MergeTreeDataPartState::Active) { @@ -848,13 +852,9 @@ void VIBuilderUpdater::moveVIFilesToFuturePartAndCache( auto column_index = dest_part->vector_index.getColumnIndex(vec_index_desc.name).value(); auto old_index_segment_metadata = 
column_index->getIndexSegmentMetadata(); - /// No need to Apply lightweight delete bitmap to index's bitmap, because load index func already update bitmap + /// FIXME: Release move index lock in advance to reduce blocking time if (!column_index->cache(build_index)) - { - auto segment_ids = getAllSegmentIds(dest_part, vector_index_segment_metadata); - for (auto & segment_id : segment_ids) - column_index->load(segment_id); - } + LOG_DEBUG(log, "Load vector index from index ptr error, index may be obtained by fetch index."); column_index->setIndexSegmentMetadata(vector_index_segment_metadata); diff --git a/src/VectorIndex/Storages/VIDescriptions.cpp b/src/VectorIndex/Storages/VIDescriptions.cpp index 96561aed67b..f9723b67a5e 100644 --- a/src/VectorIndex/Storages/VIDescriptions.cpp +++ b/src/VectorIndex/Storages/VIDescriptions.cpp @@ -87,7 +87,6 @@ VIDescription & VIDescription::operator=(const VIDescription & other) parameters = other.parameters; dim = other.dim; vector_search_type = other.vector_search_type; - return *this; } @@ -193,8 +192,8 @@ VIDescription VIDescription::getVectorIndexFromAST( size_t comma_index = 0; if ((comma_index = param_str.rfind(',')) != String::npos) param_str.erase(comma_index, 1); - } + LOG_TRACE(&Poco::Logger::get("get vector index from ast"), "after parameter check, param_str is {}", param_str); if (result.arguments.size() > 0) { try @@ -315,7 +314,7 @@ String VIDescription::parse_arg(String & input, const String verify_json, const ErrorCodes::BAD_ARGUMENTS, "{}: Value for parameter `{}` can't be string", Poco::toUpper(index_type), key); } } - + // Checking IVFPQ if (Search::getVectorIndexType(index_type, vector_search_type) == Search::IndexType::IVFPQ) { if (key == "M" && (std::stoi(value) == 0 || _dim == 0 || _dim % std::stoi(value) != 0)) diff --git a/src/VectorIndex/Storages/VIInfo.h b/src/VectorIndex/Storages/VIInfo.h index 2190e29e34f..608ee8b68fd 100644 --- a/src/VectorIndex/Storages/VIInfo.h +++ 
b/src/VectorIndex/Storages/VIInfo.h @@ -85,6 +85,7 @@ class VIInfo UInt64 dimension; /// Total number of vectors (including deleted ones) UInt64 total_vec; + // size of vector index in memory UInt64 memory_usage_bytes = 0; // Size of vector index on disk diff --git a/src/VectorIndex/Storages/VITaskBase.cpp b/src/VectorIndex/Storages/VITaskBase.cpp index 429cefc8214..eea921dd61a 100644 --- a/src/VectorIndex/Storages/VITaskBase.cpp +++ b/src/VectorIndex/Storages/VITaskBase.cpp @@ -33,17 +33,18 @@ void VITaskBase::recordBuildStatus() /// Check vector index exists in table's latest metadata auto & latest_vec_indices = storage.getInMemoryMetadataPtr()->getVectorIndices(); - VIDescription vec_desc; + std::optional vec_desc = std::nullopt; for (auto & vec_index_desc : metadata_snapshot->getVectorIndices()) if (vec_index_desc.name == vector_index_name) vec_desc = vec_index_desc; + if (!vec_desc.has_value() || !latest_vec_indices.has(vec_desc.value())) + return; + bool is_success = build_status.getStatus() == VIBuiltStatus::SUCCESS || build_status.getStatus() == VIBuiltStatus::BUILD_SKIPPED; - if (!latest_vec_indices.has(vec_desc)) - return; - + // storage.updateVectorIndexBuildStatus(part_name, vector_index_name, is_success, build_status.err_msg); bool record_build_status = true; if (ctx) { diff --git a/src/VectorIndex/Storages/VSDescription.h b/src/VectorIndex/Storages/VSDescription.h index 74db5016dce..5b34f176be4 100644 --- a/src/VectorIndex/Storages/VSDescription.h +++ b/src/VectorIndex/Storages/VSDescription.h @@ -51,6 +51,7 @@ struct VSDescription uint64_t search_column_dim{0}; int topk = -1; /// topK value extracted from limit N int direction = 1; /// 1 - ascending, -1 - descending. 
+ }; using VSDescriptions = std::vector; diff --git a/src/VectorIndex/Utils/VIUtils.cpp b/src/VectorIndex/Utils/VIUtils.cpp index ffd25abd76c..04e70437ccf 100644 --- a/src/VectorIndex/Utils/VIUtils.cpp +++ b/src/VectorIndex/Utils/VIUtils.cpp @@ -447,10 +447,8 @@ void convertBitmap( const std::shared_ptr & inverted_row_sources_map) { if (segment_id.fromMergedParts()) - { if (!inverted_row_sources_map || !inverted_row_ids_map) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Inverted Row Id Data corruption"); - } if (segment_id.fromMergedParts()) { @@ -524,23 +522,17 @@ getMergedSourcePartsFromFileName(const String & index_name, const DB::MergeTreeD return old_part_names; } -String getUniqueVectorIndexCachePrefix( +String getVectorIndexCachePrefix( const String & table_relative_path, const String & part_name, - const String & index_name, - const String & path_uuid) + const String & index_name) { auto global_context = DB::Context::getGlobalContextInstance(); - String uuid_str; - if (path_uuid.empty()) - uuid_str = generateUUIDv4(); - else - uuid_str = path_uuid; if (global_context) { return fs::path(global_context->getVectorIndexCachePath()) / fs::path(table_relative_path).parent_path().parent_path() - / String(cutMutVer(part_name) + "-" + index_name + "-" + uuid_str) + / String(cutMutVer(part_name) + "-" + index_name) / ""; } else diff --git a/src/VectorIndex/Utils/VIUtils.h b/src/VectorIndex/Utils/VIUtils.h index f574ab4c1a3..d8765a701c3 100644 --- a/src/VectorIndex/Utils/VIUtils.h +++ b/src/VectorIndex/Utils/VIUtils.h @@ -112,11 +112,10 @@ VIBitmapPtr getRealBitmap( std::vector getMergedSourcePartsFromFileName(const String & index_name, const DB::MergeTreeDataPartChecksums vector_index_checksums); -String getUniqueVectorIndexCachePrefix( +String getVectorIndexCachePrefix( const String & table_relative_path, const String & part_name, - const String & index_name, - const String & path_uuid = ""); + const String & index_name); std::pair 
getPartNameUUIDFromNvmeCachePath(const String & path_with_uuid); @@ -124,7 +123,7 @@ String generateUUIDv4(); void printMemoryInfo(const Poco::Logger * log, std::string msg); -/// Update part's single delete bitmap after lightweight delete on cache if exists. +/// Update part's single delete bitmap after lightweight delete on disk and cache if exists. void updateBitMap(SegmentId & segment_id, const std::vector & deleted_row_ids); /// Update part's single delete bitmap after lightweight delete on disk and cache if exists. diff --git a/tests/integration/test_mqvs_cancel_building_vector_index/test.py b/tests/integration/test_mqvs_cancel_building_vector_index/test.py index 35bb7ea0a36..acb7044b422 100644 --- a/tests/integration/test_mqvs_cancel_building_vector_index/test.py +++ b/tests/integration/test_mqvs_cancel_building_vector_index/test.py @@ -72,40 +72,40 @@ def test_drop_table_release_index_cache(started_cluster): assert instance.contains_in_log("Num of cache items after forceExpire 0") -def test_drop_index_cancel_building_mstg_index(started_cluster): +def test_drop_index_cancel_building_scann_index(started_cluster): instance.query( """ - DROP TABLE IF EXISTS test_drop_mstg_index; - CREATE TABLE test_drop_mstg_index(id UInt32, text String, vector Array(Float32), CONSTRAINT vector_len CHECK length(vector) = 512) Engine MergeTree ORDER BY id; - INSERT INTO test_drop_mstg_index SELECT number, randomPrintableASCII(80), range(512) FROM numbers(500000); - optimize table test_drop_mstg_index final; - ALTER TABLE test_drop_mstg_index ADD VECTOR INDEX v1 vector TYPE MSTG; + DROP TABLE IF EXISTS test_drop_scann_index; + CREATE TABLE test_drop_scann_index(id UInt32, text String, vector Array(Float32), CONSTRAINT vector_len CHECK length(vector) = 512) Engine MergeTree ORDER BY id; + INSERT INTO test_drop_scann_index SELECT number, randomPrintableASCII(80), range(512) FROM numbers(500000); + optimize table test_drop_scann_index final; + ALTER TABLE test_drop_scann_index ADD 
VECTOR INDEX v1 vector TYPE SCANN; """ ) - instance.wait_for_log_line("MSTG adding data finished") + instance.wait_for_log_line("SCANN adding data finished") - instance.query("ALTER TABLE test_drop_mstg_index DROP VECTOR INDEX v1;") + instance.query("ALTER TABLE test_drop_scann_index DROP VECTOR INDEX v1;") instance.wait_for_log_line("Aborting the SearchIndex now") assert instance.contains_in_log("Aborting the SearchIndex now") - instance.query("DROP TABLE IF EXISTS test_drop_mstg_index") + instance.query("DROP TABLE IF EXISTS test_drop_scann_index") -def test_drop_table_cancel_building_mstg_index(started_cluster): +def test_drop_table_cancel_building_scann_index(started_cluster): instance.query( """ - DROP TABLE IF EXISTS test_drop_mstg_table; - CREATE TABLE test_drop_mstg_table(id UInt32, text String, vector Array(Float32), CONSTRAINT vector_len CHECK length(vector) = 768) Engine MergeTree ORDER BY id; - INSERT INTO test_drop_mstg_table SELECT number, randomPrintableASCII(80), range(768) FROM numbers(500000); - optimize table test_drop_mstg_table final; - ALTER TABLE test_drop_mstg_table ADD VECTOR INDEX v1 vector TYPE MSTG; + DROP TABLE IF EXISTS test_drop_scann_table; + CREATE TABLE test_drop_scann_table(id UInt32, text String, vector Array(Float32), CONSTRAINT vector_len CHECK length(vector) = 768) Engine MergeTree ORDER BY id; + INSERT INTO test_drop_scann_table SELECT number, randomPrintableASCII(80), range(768) FROM numbers(500000); + optimize table test_drop_scann_table final; + ALTER TABLE test_drop_scann_table ADD VECTOR INDEX v1 vector TYPE SCANN; """ ) - instance.wait_for_log_line("MSTG adding data finished") + instance.wait_for_log_line("SCANN adding data finished") - instance.query("DROP TABLE test_drop_mstg_table SYNC;") + instance.query("DROP TABLE test_drop_scann_table SYNC;") assert instance.contains_in_log("Aborting the SearchIndex now") diff --git a/tests/integration/test_mqvs_control_decouple_flag/test.py 
b/tests/integration/test_mqvs_control_decouple_flag/test.py index 67c6d79b583..49548602528 100644 --- a/tests/integration/test_mqvs_control_decouple_flag/test.py +++ b/tests/integration/test_mqvs_control_decouple_flag/test.py @@ -23,7 +23,7 @@ def test_disable_decouple_vector(started_cluster): DROP TABLE IF EXISTS test_disable_decouple_vector; CREATE TABLE test_disable_decouple_vector(id UInt32, text String, vector Array(Float32), CONSTRAINT vector_len CHECK length(vector) = 768) Engine MergeTree ORDER BY id settings enable_decouple_vector_index=0; INSERT INTO test_disable_decouple_vector SELECT number, randomPrintableASCII(80), range(768) FROM numbers(10000); - ALTER TABLE test_disable_decouple_vector ADD VECTOR INDEX v1 vector TYPE MSTG; + ALTER TABLE test_disable_decouple_vector ADD VECTOR INDEX v1 vector TYPE SCANN; """ ) @@ -79,7 +79,7 @@ def test_disable_decouple_vector(started_cluster): time.sleep(5) node_disable.query("SELECT id, distance(vector, [0.,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,220,221,222,223,224,225,226,227,228,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,249,250,251,252,253,254,255,256,257,258,259,260,261,262,263,264,265,266,267,268,269,270,271,272,273,274,275,276,277,278,279,280,281,282,283,284
,285,286,287,288,289,290,291,292,293,294,295,296,297,298,299,300,301,302,303,304,305,306,307,308,309,310,311,312,313,314,315,316,317,318,319,320,321,322,323,324,325,326,327,328,329,330,331,332,333,334,335,336,337,338,339,340,341,342,343,344,345,346,347,348,349,350,351,352,353,354,355,356,357,358,359,360,361,362,363,364,365,366,367,368,369,370,371,372,373,374,375,376,377,378,379,380,381,382,383,384,385,386,387,388,389,390,391,392,393,394,395,396,397,398,399,400,401,402,403,404,405,406,407,408,409,410,411,412,413,414,415,416,417,418,419,420,421,422,423,424,425,426,427,428,429,430,431,432,433,434,435,436,437,438,439,440,441,442,443,444,445,446,447,448,449,450,451,452,453,454,455,456,457,458,459,460,461,462,463,464,465,466,467,468,469,470,471,472,473,474,475,476,477,478,479,480,481,482,483,484,485,486,487,488,489,490,491,492,493,494,495,496,497,498,499,500,501,502,503,504,505,506,507,508,509,510,511,512,513,514,515,516,517,518,519,520,521,522,523,524,525,526,527,528,529,530,531,532,533,534,535,536,537,538,539,540,541,542,543,544,545,546,547,548,549,550,551,552,553,554,555,556,557,558,559,560,561,562,563,564,565,566,567,568,569,570,571,572,573,574,575,576,577,578,579,580,581,582,583,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599,600,601,602,603,604,605,606,607,608,609,610,611,612,613,614,615,616,617,618,619,620,621,622,623,624,625,626,627,628,629,630,631,632,633,634,635,636,637,638,639,640,641,642,643,644,645,646,647,648,649,650,651,652,653,654,655,656,657,658,659,660,661,662,663,664,665,666,667,668,669,670,671,672,673,674,675,676,677,678,679,680,681,682,683,684,685,686,687,688,689,690,691,692,693,694,695,696,697,698,699,700,701,702,703,704,705,706,707,708,709,710,711,712,713,714,715,716,717,718,719,720,721,722,723,724,725,726,727,728,729,730,731,732,733,734,735,736,737,738,739,740,741,742,743,744,745,746,747,748,749,750,751,752,753,754,755,756,757,758,759,760,761,762,763,764,765,766,767]) AS d FROM test_disable_decouple_vector ORDER BY d ASC LIMIT 10") 
- assert node_disable.contains_in_log("SearchIndex: MSTG::searchImpl with queries of size 1") + assert node_disable.contains_in_log("SearchIndex: SCANN::searchImpl with queries of size 1") node_disable.query("DROP TABLE IF EXISTS test_disable_decouple_vector;") @@ -90,7 +90,7 @@ def test_enable_decouple_vector(started_cluster): DROP TABLE IF EXISTS test_enable_decouple_vector; CREATE TABLE test_enable_decouple_vector(id UInt32, text String, vector Array(Float32), CONSTRAINT vector_len CHECK length(vector) = 768) Engine MergeTree ORDER BY id settings enable_decouple_vector_index=1; INSERT INTO test_enable_decouple_vector SELECT number, randomPrintableASCII(80), range(768) FROM numbers(10000); - ALTER TABLE test_enable_decouple_vector ADD VECTOR INDEX v1 vector TYPE MSTG; + ALTER TABLE test_enable_decouple_vector ADD VECTOR INDEX v1 vector TYPE SCANN; """ ) @@ -150,6 +150,6 @@ def test_enable_decouple_vector(started_cluster): time.sleep(5) node_enable.query("SELECT id, distance(vector, [0.,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,220,221,222,223,224,225,226,227,228,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,249,250,251,252,253,254,255,256,257,258,259,260,261,262,263,264,265,266,267,268,269,270,271,272,273,274,275,276,2
77,278,279,280,281,282,283,284,285,286,287,288,289,290,291,292,293,294,295,296,297,298,299,300,301,302,303,304,305,306,307,308,309,310,311,312,313,314,315,316,317,318,319,320,321,322,323,324,325,326,327,328,329,330,331,332,333,334,335,336,337,338,339,340,341,342,343,344,345,346,347,348,349,350,351,352,353,354,355,356,357,358,359,360,361,362,363,364,365,366,367,368,369,370,371,372,373,374,375,376,377,378,379,380,381,382,383,384,385,386,387,388,389,390,391,392,393,394,395,396,397,398,399,400,401,402,403,404,405,406,407,408,409,410,411,412,413,414,415,416,417,418,419,420,421,422,423,424,425,426,427,428,429,430,431,432,433,434,435,436,437,438,439,440,441,442,443,444,445,446,447,448,449,450,451,452,453,454,455,456,457,458,459,460,461,462,463,464,465,466,467,468,469,470,471,472,473,474,475,476,477,478,479,480,481,482,483,484,485,486,487,488,489,490,491,492,493,494,495,496,497,498,499,500,501,502,503,504,505,506,507,508,509,510,511,512,513,514,515,516,517,518,519,520,521,522,523,524,525,526,527,528,529,530,531,532,533,534,535,536,537,538,539,540,541,542,543,544,545,546,547,548,549,550,551,552,553,554,555,556,557,558,559,560,561,562,563,564,565,566,567,568,569,570,571,572,573,574,575,576,577,578,579,580,581,582,583,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599,600,601,602,603,604,605,606,607,608,609,610,611,612,613,614,615,616,617,618,619,620,621,622,623,624,625,626,627,628,629,630,631,632,633,634,635,636,637,638,639,640,641,642,643,644,645,646,647,648,649,650,651,652,653,654,655,656,657,658,659,660,661,662,663,664,665,666,667,668,669,670,671,672,673,674,675,676,677,678,679,680,681,682,683,684,685,686,687,688,689,690,691,692,693,694,695,696,697,698,699,700,701,702,703,704,705,706,707,708,709,710,711,712,713,714,715,716,717,718,719,720,721,722,723,724,725,726,727,728,729,730,731,732,733,734,735,736,737,738,739,740,741,742,743,744,745,746,747,748,749,750,751,752,753,754,755,756,757,758,759,760,761,762,763,764,765,766,767]) AS d FROM 
test_enable_decouple_vector ORDER BY d ASC LIMIT 10") - assert node_enable.contains_in_log("SearchIndex: MSTG::searchImpl with queries of size 1") + assert node_enable.contains_in_log("SearchIndex: SCANN::searchImpl with queries of size 1") node_enable.query("DROP TABLE IF EXISTS test_enable_decouple_vector;") diff --git a/tests/integration/test_mqvs_vector_index_upgrade/test.py b/tests/integration/test_mqvs_vector_index_upgrade/test.py index ab4ac9b352f..f1a2ab89dfb 100644 --- a/tests/integration/test_mqvs_vector_index_upgrade/test.py +++ b/tests/integration/test_mqvs_vector_index_upgrade/test.py @@ -22,7 +22,7 @@ def test_decouple_index_upgrade(started_cluster): CREATE TABLE test_decouple_upgrade(id UInt32, text String, vector Array(Float32), CONSTRAINT vector_len CHECK length(vector) = 768) Engine MergeTree ORDER BY id settings disable_rebuild_for_decouple=1; INSERT INTO test_decouple_upgrade SELECT number, randomPrintableASCII(80), range(768) FROM numbers(5000); INSERT INTO test_decouple_upgrade SELECT number + 5000, randomPrintableASCII(80), range(768) FROM numbers(5000); - ALTER TABLE test_decouple_upgrade ADD VECTOR INDEX v1 vector TYPE MSTG; + ALTER TABLE test_decouple_upgrade ADD VECTOR INDEX v1 vector TYPE SCANN; """ ) @@ -77,7 +77,7 @@ def test_single_vector_index_upgrade(started_cluster): """ CREATE TABLE test_single_index_upgrade(id UInt32, text String, vector Array(Float32), CONSTRAINT vector_len CHECK length(vector) = 768) Engine MergeTree ORDER BY id; INSERT INTO test_single_index_upgrade SELECT number, randomPrintableASCII(80), range(768) FROM numbers(5000); - ALTER TABLE test_single_index_upgrade ADD VECTOR INDEX v1 vector TYPE MSTG; + ALTER TABLE test_single_index_upgrade ADD VECTOR INDEX v1 vector TYPE SCANN; """ ) @@ -120,7 +120,7 @@ def test_reuse_vector_index(started_cluster): """ CREATE TABLE test_reuse_index(id UInt32, text String, vector Array(Float32), CONSTRAINT vector_len CHECK length(vector) = 768) Engine MergeTree ORDER BY id; INSERT 
INTO test_reuse_index SELECT number, randomPrintableASCII(80), range(768) FROM numbers(5000); - ALTER TABLE test_reuse_index ADD VECTOR INDEX v1 vector TYPE MSTG; + ALTER TABLE test_reuse_index ADD VECTOR INDEX v1 vector TYPE SCANN; """ ) From 1d7768b98119aa763a6a49c5f2e57fc84c96b9e1 Mon Sep 17 00:00:00 2001 From: Shanfeng Pang Date: Wed, 12 Jun 2024 15:05:59 +0800 Subject: [PATCH 2/2] update search index --- contrib/search-index | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/search-index b/contrib/search-index index 385f379dca5..09f6bbf887f 160000 --- a/contrib/search-index +++ b/contrib/search-index @@ -1 +1 @@ -Subproject commit 385f379dca5e629bf525ac64d20bb53ba3487ea1 +Subproject commit 09f6bbf887f2752d2bcb93aa7d578204ebd88e2e