Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Support vector index and ANN hint #9261

Merged
merged 12 commits into from
Aug 12, 2024
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,6 @@
[submodule "contrib/not_null"]
path = contrib/not_null
url = https://github.com/bitwizeshift/not_null.git
[submodule "contrib/usearch"]
path = contrib/usearch
url = https://github.com/unum-cloud/usearch.git
2 changes: 2 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,5 @@ add_subdirectory(aws-cmake)
add_subdirectory(simdjson)

add_subdirectory(fastpforlib)

add_subdirectory(usearch-cmake)
1 change: 1 addition & 0 deletions contrib/usearch
Submodule usearch added at 5ea48c
15 changes: 15 additions & 0 deletions contrib/usearch-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
set(USEARCH_PROJECT_DIR "${TiFlash_SOURCE_DIR}/contrib/usearch")
set(USEARCH_SOURCE_DIR "${USEARCH_PROJECT_DIR}/include")

add_library(_usearch INTERFACE)

if (NOT EXISTS "${USEARCH_SOURCE_DIR}/usearch/index.hpp")
message (FATAL_ERROR "submodule contrib/usearch not found")
endif ()

target_include_directories(_usearch SYSTEM INTERFACE
${USEARCH_PROJECT_DIR}/simsimd/include
${USEARCH_PROJECT_DIR}/fp16/include
${USEARCH_SOURCE_DIR})

add_library(tiflash_contrib::usearch ALIAS _usearch)
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ target_link_libraries (dbms
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
absl::synchronization
tiflash_contrib::usearch
tiflash_contrib::aws_s3

etcdpb
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,16 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>

std::pair<UInt32, StringRef> getElementRef(size_t element_idx) const;

private:
ColumnPtr data;
ColumnPtr offsets;

size_t ALWAYS_INLINE offsetAt(size_t i) const { return i == 0 ? 0 : getOffsets()[i - 1]; }
size_t ALWAYS_INLINE sizeAt(size_t i) const
{
return i == 0 ? getOffsets()[0] : (getOffsets()[i] - getOffsets()[i - 1]);
}

private:
ColumnPtr data;
ColumnPtr offsets;

size_t ALWAYS_INLINE offsetAt(size_t i) const { return i == 0 ? 0 : getOffsets()[i - 1]; }

/// Multiply values if the nested column is ColumnVector<T>.
template <typename T>
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
tipb::ANNQueryInfo(),
empty_pushed_down_filters, // Not care now
scan_column_infos,
runtime_filter_ids,
Expand Down Expand Up @@ -231,6 +232,7 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
tipb::ANNQueryInfo(),
empty_pushed_down_filters, // Not care now
scan_column_infos,
runtime_filter_ids,
Expand Down Expand Up @@ -262,6 +264,7 @@ void MockStorage::buildExecFromDeltaMerge(
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
tipb::ANNQueryInfo(),
empty_pushed_down_filters, // Not care now
scan_column_infos,
runtime_filter_ids,
Expand Down Expand Up @@ -295,6 +298,7 @@ void MockStorage::buildExecFromDeltaMerge(
auto scan_column_infos = mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
empty_filters,
tipb::ANNQueryInfo(),
empty_pushed_down_filters, // Not care now
scan_column_infos,
runtime_filter_ids,
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Interpreters/TimezoneInfo.h>
#include <Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.h>
#include <google/protobuf/repeated_ptr_field.h>
#include <tipb/executor.pb.h>
#include <tipb/expression.pb.h>


Expand All @@ -28,13 +29,15 @@ struct DAGQueryInfo
{
DAGQueryInfo(
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters_,
const tipb::ANNQueryInfo & ann_query_info_,
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters_,
const ColumnInfos & source_columns_,
const std::vector<int> & runtime_filter_ids_,
const int rf_max_wait_time_ms_,
const TimezoneInfo & timezone_info_)
: source_columns(source_columns_)
, filters(filters_)
, ann_query_info(ann_query_info_)
, pushed_down_filters(pushed_down_filters_)
, runtime_filter_ids(runtime_filter_ids_)
, rf_max_wait_time_ms(rf_max_wait_time_ms_)
Expand All @@ -44,6 +47,8 @@ struct DAGQueryInfo
const ColumnInfos & source_columns;
// filters in dag request
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters;
// filters for approximate nearest neighbor (ann) vector search
const tipb::ANNQueryInfo & ann_query_info;
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
// filters have been push down to storage engine in dag request
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
query_info.query = dagContext().dummy_ast;
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions.conditions,
table_scan.getANNQueryInfo(),
table_scan.getPushedDownFilters(),
table_scan.getColumns(),
table_scan.getRuntimeFilterIDs(),
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ TiDBTableScan::TiDBTableScan(
is_partition_table_scan ? std::move(TiDB::toTiDBColumnInfos(table_scan->partition_table_scan().columns()))
: std::move(TiDB::toTiDBColumnInfos(table_scan->tbl_scan().columns())))
, pushed_down_filters(
is_partition_table_scan ? std::move(table_scan->partition_table_scan().pushed_down_filter_conditions())
: std::move(table_scan->tbl_scan().pushed_down_filter_conditions()))
is_partition_table_scan ? table_scan->partition_table_scan().pushed_down_filter_conditions()
: table_scan->tbl_scan().pushed_down_filter_conditions())
, ann_query_info(
is_partition_table_scan ? table_scan->partition_table_scan().ann_query() : table_scan->tbl_scan().ann_query())
// Only No-partition table need keep order when tablescan executor required keep order.
// If keep_order is not set, keep order for safety.
, keep_order(
Expand Down Expand Up @@ -105,6 +107,8 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table
tipb_table_scan->add_primary_prefix_column_ids(id);
tipb_table_scan->set_is_fast_scan(partition_table_scan.is_fast_scan());
tipb_table_scan->set_keep_order(false);
if (partition_table_scan.has_ann_query())
tipb_table_scan->mutable_ann_query()->CopyFrom(partition_table_scan.ann_query());
}
else
{
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class TiDBTableScan

const google::protobuf::RepeatedPtrField<tipb::Expr> & getPushedDownFilters() const { return pushed_down_filters; }

const tipb::ANNQueryInfo & getANNQueryInfo() const { return ann_query_info; }

private:
const tipb::Executor * table_scan;
String executor_id;
Expand All @@ -65,6 +67,8 @@ class TiDBTableScan
/// They will be executed on Storage layer.
const google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters;

const tipb::ANNQueryInfo ann_query_info;

bool keep_order;
bool is_fast_scan;
std::vector<Int32> runtime_filter_ids;
Expand Down
24 changes: 24 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
Expand Down Expand Up @@ -148,6 +149,7 @@ struct ContextShared
mutable DBGInvoker dbg_invoker; /// Execute inner functions, debug only.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable DM::MinMaxIndexCachePtr minmax_index_cache; /// Cache of minmax index in compressed files.
mutable DM::VectorIndexCachePtr vector_index_cache;
mutable DM::DeltaIndexManagerPtr delta_index_manager; /// Manage the Delta Indies of Segments.
ProcessList process_list; /// Executing queries at the moment.
ViewDependencies view_dependencies; /// Current dependencies
Expand Down Expand Up @@ -1386,6 +1388,28 @@ void Context::dropMinMaxIndexCache() const
shared->minmax_index_cache->reset();
}

void Context::setVectorIndexCache(size_t cache_size_in_bytes)
{
auto lock = getLock();

RUNTIME_CHECK(!shared->vector_index_cache);

shared->vector_index_cache = std::make_shared<DM::VectorIndexCache>(cache_size_in_bytes);
}

DM::VectorIndexCachePtr Context::getVectorIndexCache() const
{
auto lock = getLock();
return shared->vector_index_cache;
}

void Context::dropVectorIndexCache() const
{
auto lock = getLock();
if (shared->vector_index_cache)
shared->vector_index_cache->reset();
}

bool Context::isDeltaIndexLimited() const
{
// Don't need to use a lock here, as delta_index_manager should be set at starting up.
Expand Down
13 changes: 9 additions & 4 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ enum class PageStorageRunMode : UInt8;
namespace DM
{
class MinMaxIndexCache;
class VectorIndexCache;
class DeltaIndexManager;
class GlobalStoragePool;
class SharedBlockSchemas;
Expand Down Expand Up @@ -237,8 +238,8 @@ class Context
/// Compute and set actual user settings, client_info.current_user should be set
void calculateUserSettings();

ClientInfo & getClientInfo() { return client_info; };
const ClientInfo & getClientInfo() const { return client_info; };
ClientInfo & getClientInfo() { return client_info; }
const ClientInfo & getClientInfo() const { return client_info; }

void setQuota(
const String & name,
Expand Down Expand Up @@ -391,6 +392,10 @@ class Context
std::shared_ptr<DM::MinMaxIndexCache> getMinMaxIndexCache() const;
void dropMinMaxIndexCache() const;

void setVectorIndexCache(size_t cache_size_in_bytes);
std::shared_ptr<DM::VectorIndexCache> getVectorIndexCache() const;
void dropVectorIndexCache() const;

bool isDeltaIndexLimited() const;
void setDeltaIndexManager(size_t cache_size_in_bytes);
std::shared_ptr<DM::DeltaIndexManager> getDeltaIndexManager() const;
Expand Down Expand Up @@ -505,8 +510,8 @@ class Context

SharedQueriesPtr getSharedQueries();

const TimezoneInfo & getTimezoneInfo() const { return timezone_info; };
TimezoneInfo & getTimezoneInfo() { return timezone_info; };
const TimezoneInfo & getTimezoneInfo() const { return timezone_info; }
TimezoneInfo & getTimezoneInfo() { return timezone_info; }

/// User name and session identifier. Named sessions are local to users.
using SessionKey = std::pair<String, String>;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (minmax_index_cache_size)
global_context->setMinMaxIndexCache(minmax_index_cache_size);

// 1GiB vector index cache.
size_t vec_index_cache_size = config().getUInt64("vec_index_cache_size", 1ULL * 1024 * 1024 * 1024);
if (vec_index_cache_size)
global_context->setVectorIndexCache(vec_index_cache_size);

/// Size of max memory usage of DeltaIndex, used by DeltaMerge engine.
/// - In non-disaggregated mode, its default value is 0, means unlimited, and it
/// controls the number of total bytes keep in the memory.
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ class BitmapFilter
void set(UInt32 start, UInt32 limit, bool value = true);
// If return true, all data is match and do not fill the filter.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;
// Caller should ensure n in [0, size).
inline bool get(UInt32 n) const { return filter[n]; }
// filter[start, satrt+limit) & f -> f
void rangeAnd(IColumn::Filter & f, UInt32 start, UInt32 limit) const;

void runOptimize();

String toDebugString() const;
size_t count() const;
inline size_t size() const { return filter.size(); }

friend class BitmapFilterView;

private:
void set(std::span<const UInt32> row_ids, const FilterPtr & f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,34 @@ BitmapFilterBlockInputStream::BitmapFilterBlockInputStream(

Block BitmapFilterBlockInputStream::read(FilterPtr & res_filter, bool return_filter)
{
auto [block, from_delta] = readBlock(stable, delta);
if (return_filter)
return readImpl(res_filter);

// The caller want a filtered resut, so let's filter by ourselves.

FilterPtr block_filter;
auto block = readImpl(block_filter);
if (!block)
return {};

// all rows in block are not filtered out, simply do nothing.
if (!block_filter) // NOLINT
return block;

// some rows should be filtered according to `block_filter`:
size_t passed_count = countBytesInFilter(*block_filter);
for (auto & col : block)
{
col.column = col.column->filter(*block_filter, passed_count);
}
return block;
}

Block BitmapFilterBlockInputStream::readImpl(FilterPtr & res_filter)
{
FilterPtr block_filter = nullptr;
auto [block, from_delta] = readBlockWithReturnFilter(stable, delta, block_filter);

if (block)
{
if (from_delta)
Expand All @@ -47,25 +74,36 @@ Block BitmapFilterBlockInputStream::read(FilterPtr & res_filter, bool return_fil

filter.resize(block.rows());
bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows());
if (!all_match)

if (!block_filter)
{
if (all_match)
res_filter = nullptr;
else
res_filter = &filter;
}
else
{
if (return_filter)
RUNTIME_CHECK(filter.size() == block_filter->size(), filter.size(), block_filter->size());
if (!all_match)
{
// We have a `block_filter`, and have a bitmap filter in `filter`.
// filter ← filter & block_filter.
std::transform( //
filter.begin(),
filter.end(),
block_filter->begin(),
filter.begin(),
[](UInt8 a, UInt8 b) { return a && b; });
res_filter = &filter;
}
else
{
size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
{
col.column = col.column->filter(filter, passed_count);
}
// We only have a `block_filter`.
// res_filter ← block_filter.
res_filter = block_filter;
}
}
else
{
res_filter = nullptr;
}
}
return block;
}
Expand Down
Loading