Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Abort vector index building as soon as possible #9443

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,6 @@
[submodule "contrib/simsimd"]
path = contrib/simsimd
url = https://github.com/ashvardanian/SimSIMD
[submodule "contrib/highfive"]
path = contrib/highfive
url = https://github.com/BlueBrain/HighFive
5 changes: 5 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,8 @@ add_subdirectory(fastpforlib)
add_subdirectory(usearch-cmake)

add_subdirectory(simsimd-cmake)

if (ENABLE_TESTS AND NOT CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
add_subdirectory(hdf5-cmake)
add_subdirectory(highfive-cmake)
endif ()
1 change: 1 addition & 0 deletions contrib/hdf5-cmake/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/download/*
41 changes: 41 additions & 0 deletions contrib/hdf5-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
include(ExternalProject)

# hdf5 is too large. Instead of adding as a submodule, let's simply download from GitHub.
ExternalProject_Add(hdf5-external
PREFIX ${CMAKE_CURRENT_BINARY_DIR}
DOWNLOAD_DIR ${TiFlash_SOURCE_DIR}/contrib/hdf5-cmake/download
URL https://github.com/HDFGroup/hdf5/archive/refs/tags/hdf5_1.14.4.3.zip
URL_HASH MD5=bc987d22e787290127aacd7b99b4f31e
CMAKE_ARGS
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_INSTALL_PREFIX=<INSTALL_DIR>
-DBUILD_STATIC_LIBS=ON
-DBUILD_SHARED_LIBS=OFF
-DBUILD_TESTING=OFF
-DHDF5_BUILD_HL_LIB=OFF
-DHDF5_BUILD_TOOLS=OFF
-DHDF5_BUILD_CPP_LIB=ON
-DHDF5_BUILD_EXAMPLES=OFF
-DHDF5_ENABLE_Z_LIB_SUPPORT=OFF
-DHDF5_ENABLE_SZIP_SUPPORT=OFF
BUILD_BYPRODUCTS <INSTALL_DIR>/lib/${CMAKE_FIND_LIBRARY_PREFIXES}hdf5.a # Workaround for Ninja
USES_TERMINAL_DOWNLOAD TRUE
USES_TERMINAL_CONFIGURE TRUE
USES_TERMINAL_BUILD TRUE
USES_TERMINAL_INSTALL TRUE
EXCLUDE_FROM_ALL TRUE
DOWNLOAD_EXTRACT_TIMESTAMP TRUE
)

ExternalProject_Get_Property(hdf5-external INSTALL_DIR)

add_library(tiflash_contrib::hdf5 STATIC IMPORTED GLOBAL)
set_target_properties(tiflash_contrib::hdf5 PROPERTIES
IMPORTED_LOCATION ${INSTALL_DIR}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}hdf5.a
)
add_dependencies(tiflash_contrib::hdf5 hdf5-external)

file(MAKE_DIRECTORY ${INSTALL_DIR}/include)
target_include_directories(tiflash_contrib::hdf5 SYSTEM INTERFACE
${INSTALL_DIR}/include
)
1 change: 1 addition & 0 deletions contrib/highfive
Submodule highfive added at 0d0259
18 changes: 18 additions & 0 deletions contrib/highfive-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
set(HIGHFIVE_PROJECT_DIR "${TiFlash_SOURCE_DIR}/contrib/highfive")
set(HIGHFIVE_SOURCE_DIR "${HIGHFIVE_PROJECT_DIR}/include")

if (NOT EXISTS "${HIGHFIVE_SOURCE_DIR}/highfive/highfive.hpp")
message (FATAL_ERROR "submodule contrib/highfive not found")
endif()

add_library(_highfive INTERFACE)

target_include_directories(_highfive SYSTEM INTERFACE
${HIGHFIVE_SOURCE_DIR}
)

target_link_libraries(_highfive INTERFACE
tiflash_contrib::hdf5
)

add_library(tiflash_contrib::highfive ALIAS _highfive)
2 changes: 1 addition & 1 deletion contrib/usearch
Submodule usearch updated 64 files
+39 −7 .github/workflows/prerelease.yml
+37 −19 .github/workflows/release.yml
+0 −19 .github/workflows/update_version.sh
+6 −4 .gitignore
+13 −0 .vscode/launch.json
+11 −1 .vscode/settings.json
+2 −2 .vscode/tasks.json
+14 −14 BENCHMARKS.md
+1 −1 CITATION.cff
+8 −8 CMakeLists.txt
+59 −27 CONTRIBUTING.md
+15 −15 Cargo.lock
+1 −1 Cargo.toml
+1 −0 MANIFEST.in
+98 −28 README.md
+1 −1 VERSION
+11 −12 binding.gyp
+5 −2 build.gradle
+12 −1 build.rs
+3 −0 c/README.md
+50 −13 c/lib.cpp
+60 −55 c/test.c
+23 −4 c/usearch.h
+1 −1 conanfile.py
+34 −13 cpp/README.md
+99 −47 cpp/bench.cpp
+649 −176 cpp/test.cpp
+1 −1 csharp/nuget/nuget-package.props
+10 −10 csharp/src/Cloud.Unum.USearch.Tests/USearchIndexTests.cs
+6 −44 csharp/src/Cloud.Unum.USearch/NativeMethods.cs
+295 −60 csharp/src/Cloud.Unum.USearch/USearchIndex.cs
+117 −8 csharp/src/Cloud.Unum.USearch/USearchTypes.cs
+1 −1 docs/index.rst
+0 −5 docs/java/index.rst
+0 −5 docs/javascript/index.rst
+35 −14 golang/README.md
+32 −24 golang/lib.go
+857 −264 include/usearch/index.hpp
+412 −156 include/usearch/index_dense.hpp
+84 −39 include/usearch/index_plugins.hpp
+56 −5 java/README.md
+126 −36 java/cloud/unum/usearch/Index.java
+3 −0 java/cloud/unum/usearch/NativeUtils.java
+38 −2 java/cloud/unum/usearch/cloud_unum_usearch_Index.cpp
+16 −0 java/cloud/unum/usearch/cloud_unum_usearch_Index.h
+57 −3 java/test/IndexTest.java
+3 −2 javascript/tsconfig-esm.json
+8 −7 javascript/usearch.ts
+6 −2 objc/USearchObjective.mm
+0 −10 package-ci.json
+2 −2 package.json
+12 −7 python/lib.cpp
+8 −0 python/scripts/test_index.py
+119 −0 python/scripts/test_jit.py
+0 −0 python/usearch/py.typed
+42 −29 rust/lib.cpp
+1 −1 rust/lib.hpp
+53 −5 rust/lib.rs
+10 −0 setup.py
+1 −1 simsimd
+30 −0 swift/Test.swift
+1 −1 wasmer.toml
+44 −4 wolfram/CMakeLists.txt
+30 −19 wolfram/lib.cpp
17 changes: 15 additions & 2 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ if (ENABLE_TESTS)
add_check(gtests_dbms)

add_target_pch("pch-dbms.h" gtests_dbms)

grep_bench_sources(${TiFlash_SOURCE_DIR}/dbms dbms_bench_sources)
add_executable(bench_dbms EXCLUDE_FROM_ALL
${dbms_bench_sources}
Expand All @@ -388,7 +387,21 @@ if (ENABLE_TESTS)
)
target_include_directories(bench_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR} ${benchmark_SOURCE_DIR}/include)
target_compile_definitions(bench_dbms PUBLIC DBMS_PUBLIC_GTEST)
target_link_libraries(bench_dbms gtest dbms test_util_bench_main benchmark tiflash_functions server_for_test delta_merge kvstore tiflash_aggregate_functions)
target_link_libraries(bench_dbms
gtest
benchmark

dbms
test_util_bench_main
tiflash_functions
server_for_test
delta_merge
tiflash_aggregate_functions
kvstore)

if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
target_link_libraries(bench_dbms tiflash_contrib::highfive)
endif()

add_check(bench_dbms)
endif ()
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,19 @@ class BitmapFilterView
RUNTIME_CHECK(filter_offset + filter_size <= filter->size(), filter_offset, filter_size, filter->size());
}

/**
* @brief Create a BitmapFilter and construct a BitmapFilterView with it.
* Should be only used in tests.
*/
static BitmapFilterView createWithFilter(UInt32 size, bool default_value)
{
return BitmapFilterView(std::make_shared<BitmapFilter>(size, default_value), 0, size);
}

// Caller should ensure n in [0, size).
inline bool get(UInt32 n) const { return filter->get(filter_offset + n); }

inline bool operator[](UInt32 n) const { return get(n); }

inline UInt32 size() const { return filter_size; }

inline UInt32 offset() const { return filter_offset; }
Expand Down
51 changes: 40 additions & 11 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ extern const Metric DT_SnapshotOfSegmentIngest;
extern const Metric DT_SnapshotOfSegmentIngestIndex;
} // namespace CurrentMetrics

namespace DB::ErrorCodes
{
extern const int ABORTED;
}

namespace DB::DM
{

Expand Down Expand Up @@ -673,18 +678,20 @@ void DeltaMergeStore::segmentEnsureStableIndex(
RUNTIME_CHECK(dm_files.size() == 1); // size > 1 is currently not supported.
const auto & dm_file = dm_files[0];

// 2. Check whether the DMFile has been referenced by any valid segment.
{
auto is_file_valid = [this, dm_file] {
std::shared_lock lock(read_write_mutex);
auto segment_ids = dmfile_id_to_segment_ids.get(dm_file->fileId());
if (segment_ids.empty())
{
LOG_DEBUG(
log,
"EnsureStableIndex - Give up because no segment to update, source_segment={}",
source_segment_info);
return;
}
return !segment_ids.empty();
};

// 2. Check whether the DMFile has been referenced by any valid segment.
if (!is_file_valid())
{
LOG_DEBUG(
log,
"EnsureStableIndex - Give up because no segment to update, source_segment={}",
source_segment_info);
return;
}

LOG_INFO(
Expand All @@ -700,7 +707,29 @@ void DeltaMergeStore::segmentEnsureStableIndex(
.dm_files = dm_files,
.dm_context = dm_context,
});
auto new_dmfiles = iw.build();

DMFiles new_dmfiles{};

try
{
// When file is not valid we need to abort the index build.
new_dmfiles = iw.build(is_file_valid);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(
log,
"EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={} "
"source_segment={}",
DMFile::info(dm_files),
source_segment_info);
return;
}
throw;
}

RUNTIME_CHECK(!new_dmfiles.empty());

LOG_INFO(
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFile.h>
Expand All @@ -22,6 +23,11 @@
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/PathPool.h>

namespace DB::ErrorCodes
{
extern const int ABORTED;
}

namespace DB::DM
{

Expand Down Expand Up @@ -63,7 +69,7 @@ DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo
return build;
}

size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) const
size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, ProceedCheckFn should_proceed) const
{
const auto column_defines = dm_file_mutable->getColumnDefines();
const auto del_cd_iter = std::find_if(column_defines.cbegin(), column_defines.cend(), [](const ColumnDefine & cd) {
Expand Down Expand Up @@ -128,6 +134,9 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) c
// Read all blocks and build index
while (true)
{
if (!should_proceed())
throw Exception(ErrorCodes::ABORTED, "Index build is interrupted");

auto block = read_stream->read();
if (!block)
break;
Expand All @@ -146,7 +155,7 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) c
const auto & col_with_type_and_name = block.safeGetByPosition(col_idx + 1);
RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns[col_idx + 1].id);
const auto & col = col_with_type_and_name.column;
index_builder->addBlock(*col, del_mark);
index_builder->addBlock(*col, del_mark, should_proceed);
}
}

Expand Down Expand Up @@ -187,7 +196,7 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) c
return total_built_index_bytes;
}

DMFiles DMFileIndexWriter::build() const
DMFiles DMFileIndexWriter::build(ProceedCheckFn should_proceed) const
{
RUNTIME_CHECK(!built);
// Create a clone of existing DMFile instances by using DMFile::restore,
Expand All @@ -214,7 +223,7 @@ DMFiles DMFileIndexWriter::build() const

for (const auto & cloned_dmfile : cloned_dm_files)
{
auto index_bytes = buildIndexForFile(cloned_dmfile);
auto index_bytes = buildIndexForFile(cloned_dmfile, should_proceed);
if (auto data_store = options.dm_context.global_context.getSharedContextDisagg()->remote_data_store;
!data_store)
{
Expand Down
13 changes: 10 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,23 @@ class DMFileIndexWriter
const DMContext & dm_context;
};

using ProceedCheckFn = std::function<bool()>;

explicit DMFileIndexWriter(const Options & options)
: logger(Logger::get())
, options(options)
{}

// Note: This method can only be called once.
DMFiles build() const;
// Note: You cannot call build() multiple times, as duplicate meta version will result in exceptions.
DMFiles build(ProceedCheckFn should_proceed) const;

DMFiles build() const
{
return build([]() { return true; });
}

private:
size_t buildIndexForFile(const DMFilePtr & dm_file_mutable) const;
size_t buildIndexForFile(const DMFilePtr & dm_file_mutable, ProceedCheckFn should_proceed) const;

private:
const LoggerPtr logger;
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/Index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class VectorIndexBuilder
/// The key is the row's offset in the DMFile.
using Key = UInt32;

using ProceedCheckFn = std::function<bool()>;

public:
static VectorIndexBuilderPtr create(const TiDB::VectorIndexDefinitionPtr & definition);

Expand All @@ -47,7 +49,11 @@ class VectorIndexBuilder

virtual ~VectorIndexBuilder() = default;

virtual void addBlock(const IColumn & column, const ColumnVector<UInt8> * del_mark) = 0;
virtual void addBlock( //
const IColumn & column,
const ColumnVector<UInt8> * del_mark,
ProceedCheckFn should_proceed)
= 0;

virtual void save(std::string_view path) const = 0;

Expand Down Expand Up @@ -80,6 +86,8 @@ class VectorIndexViewer
// Invalid rows in `valid_rows` will be discared when applying the search
virtual std::vector<Key> search(const ANNQueryInfoPtr & queryInfo, const RowFilter & valid_rows) const = 0;

virtual size_t size() const = 0;

// Get the value (i.e. vector content) of a Key.
virtual void get(Key key, std::vector<Float32> & out) const = 0;

Expand Down
Loading