Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: Support drop vector index defined on ColumnInfo when column is dropped #9475

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ namespace DB
M(force_remote_read_for_batch_cop_once) \
M(exception_new_dynamic_thread) \
M(force_wait_index_timeout) \
M(force_not_support_vector_index) \
M(sync_schema_request_failure)

#define APPLY_FOR_FAILPOINTS(M) \
Expand Down
33 changes: 20 additions & 13 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2020,25 +2020,32 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info)
original_table_columns.swap(new_original_table_columns);
store_columns.swap(new_store_columns);

// Get a snapshot on the local_index_infos to check whether any new index is created
LocalIndexInfosSnapshot local_index_infos_snap = getLocalIndexInfosSnapshot();

std::atomic_store(&original_table_header, std::make_shared<Block>(toEmptyBlock(original_table_columns)));

// release the lock because `checkAllSegmentsLocalIndex` will try to acquire the lock
// release the lock because `applyLocalIndexChange ` will try to acquire the lock
// and generate tasks on segments
lock.unlock();

auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_snap, table_info, log);
if (new_local_index_infos)
applyLocalIndexChange(table_info);
}

void DeltaMergeStore::applyLocalIndexChange(const TiDB::TableInfo & new_table_info)
{
// Get a snapshot on the local_index_infos to check whether any new index is created
auto new_local_index_infos = generateLocalIndexInfos(getLocalIndexInfosSnapshot(), new_table_info, log);

// no index is created or dropped
if (!new_local_index_infos)
return;

{
{
// new index created, update the info in-memory thread safety between `getLocalIndexInfosSnapshot`
std::unique_lock index_write_lock(mtx_local_index_infos);
local_index_infos.swap(new_local_index_infos);
}
checkAllSegmentsLocalIndex();
} // else no new index is created
// new index created, update the info in-memory thread safety between `getLocalIndexInfosSnapshot`
std::unique_lock index_write_lock(mtx_local_index_infos);
local_index_infos.swap(new_local_index_infos);
}

// generate async tasks for building local index for all segments
checkAllSegmentsLocalIndex();
}

SortDescription DeltaMergeStore::getPrimarySortDescription() const
Expand Down
27 changes: 15 additions & 12 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,18 @@ class DeltaMergeStore
bool keep_order,
const PushDownFilterPtr & filter);

// Get a snap of local_index_infos for checking.
// Note that this is just a shallow copy of `local_index_infos`, do not
// modify the local indexes inside the snapshot.
LocalIndexInfosSnapshot getLocalIndexInfosSnapshot() const
{
std::shared_lock index_read_lock(mtx_local_index_infos);
if (!local_index_infos || local_index_infos->empty())
return nullptr;
// only make a shallow copy on the shared_ptr is OK
return local_index_infos;
}

public:
/// Methods mainly used by region split.

Expand Down Expand Up @@ -863,18 +875,6 @@ class DeltaMergeStore
const SegmentPtr & segment,
const DMFiles & new_dm_files);

// Get a snap of local_index_infos for checking.
// Note that this is just a shallow copy of `local_index_infos`, do not
// modify the local indexes inside the snapshot.
LocalIndexInfosSnapshot getLocalIndexInfosSnapshot() const
{
std::shared_lock index_read_lock(mtx_local_index_infos);
if (!local_index_infos || local_index_infos->empty())
return nullptr;
// only make a shallow copy on the shared_ptr is OK
return local_index_infos;
}

/**
* Check whether there are new local indexes should be built for all segments.
*/
Expand All @@ -894,6 +894,9 @@ class DeltaMergeStore
#else
public:
#endif

void applyLocalIndexChange(const TiDB::TableInfo & new_table_info);

/**
* Wait until the segment has stable index.
* If the index is ready or no need to build, it will return immediately.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
return fallback();

auto ann_query_info = filter_with_ann->ann_query_info;
if (!ann_query_info)
if (!ann_query_info || ann_query_info->top_k() == std::numeric_limits<UInt32>::max())
return fallback();

if (!bitmap_filter.has_value())
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo
assert(index_infos != nullptr);
static constexpr double VECTOR_INDEX_SIZE_FACTOR = 1.2;

// TODO(vector-index): Now we only generate the build info when new index is added.
// The built indexes will be dropped (lazily) after the segment instance is updated.
// We can support dropping the vector index more quickly later.
LocalIndexBuildInfo build;
build.indexes_to_build = std::make_shared<LocalIndexInfos>();
build.file_ids.reserve(dm_files.size());
Expand Down
Loading