Skip to content

Commit

Permalink
*: support vector index and adding/dropping vector index when doing s…
Browse files Browse the repository at this point in the history
…yncTableSchema (pingcap#9451)

ref pingcap#9032

*: support vector index and adding/dropping vector index when doing syncTableSchema

Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: JaySon <[email protected]>
  • Loading branch information
Lloyd-Pottiger and JaySon-Huang authored Sep 25, 2024
1 parent 5a41ce2 commit 1a781c7
Show file tree
Hide file tree
Showing 22 changed files with 958 additions and 76 deletions.
83 changes: 83 additions & 0 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <Poco/StringTokenizer.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Types.h>
#include <TiDB/Decode/TypeMapping.h>
#include <TiDB/Schema/TiDB.h>

Expand All @@ -46,6 +47,7 @@ extern const int UNKNOWN_TABLE;
} // namespace ErrorCodes

using ColumnInfo = TiDB::ColumnInfo;
using IndexInfo = TiDB::IndexInfo;
using TableInfo = TiDB::TableInfo;
using PartitionInfo = TiDB::PartitionInfo;
using PartitionDefinition = TiDB::PartitionDefinition;
Expand Down Expand Up @@ -546,6 +548,87 @@ void MockTiDB::dropPartition(const String & database_name, const String & table_
version_diff[version] = diff;
}

IndexInfo reverseGetIndexInfo(
IndexID id,
const NameAndTypePair & column,
Int32 offset,
TiDB::VectorIndexDefinitionPtr vector_index)
{
IndexInfo index_info;
index_info.id = id;
index_info.state = TiDB::StatePublic;

std::vector<TiDB::IndexColumnInfo> idx_cols;
Poco::JSON::Object::Ptr idx_col_json = new Poco::JSON::Object();
Poco::JSON::Object::Ptr name_json = new Poco::JSON::Object();
name_json->set("O", column.name);
name_json->set("L", column.name);
idx_col_json->set("name", name_json);
idx_col_json->set("length", -1);
idx_col_json->set("offset", offset);
TiDB::IndexColumnInfo idx_col(idx_col_json);
index_info.idx_cols.push_back(idx_col);
index_info.vector_index = vector_index;

return index_info;
}

void MockTiDB::addVectorIndexToTable(
const String & database_name,
const String & table_name,
const IndexID index_id,
const NameAndTypePair & column_name,
Int32 offset,
TiDB::VectorIndexDefinitionPtr vector_index)
{
std::lock_guard lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
String qualified_name = database_name + "." + table_name;
auto & indexes = table->table_info.index_infos;
if (std::find_if(indexes.begin(), indexes.end(), [&](const IndexInfo & index_) { return index_.id == index_id; })
!= indexes.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Index {} already exists in TiDB table {}",
index_id,
qualified_name);
IndexInfo index_info = reverseGetIndexInfo(index_id, column_name, offset, vector_index);
indexes.push_back(index_info);

version++;

SchemaDiff diff;
diff.type = SchemaActionType::ActionAddVectorIndex;
diff.schema_id = table->database_id;
diff.table_id = table->id();
diff.version = version;
version_diff[version] = diff;
}

void MockTiDB::dropVectorIndexFromTable(const String & database_name, const String & table_name, IndexID index_id)
{
std::lock_guard lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
String qualified_name = database_name + "." + table_name;

auto & indexes = table->table_info.index_infos;
auto it
= std::find_if(indexes.begin(), indexes.end(), [&](const IndexInfo & index_) { return index_.id == index_id; });
RUNTIME_CHECK_MSG(it != indexes.end(), "Index {} does not exist in TiDB table {}", index_id, qualified_name);
indexes.erase(it);

version++;

SchemaDiff diff;
diff.type = SchemaActionType::DropIndex;
diff.schema_id = table->database_id;
diff.table_id = table->id();
diff.version = version;
version_diff[version] = diff;
}

void MockTiDB::addColumnToTable(
const String & database_name,
const String & table_name,
Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ class MockTiDB : public ext::Singleton<MockTiDB>

void dropDB(Context & context, const String & database_name, bool drop_regions);

void addVectorIndexToTable(
const String & database_name,
const String & table_name,
IndexID index_id,
const NameAndTypePair & column_name,
Int32 offset,
TiDB::VectorIndexDefinitionPtr vector_index);

void dropVectorIndexFromTable(const String & database_name, const String & table_name, IndexID index_id);

void addColumnToTable(
const String & database_name,
const String & table_name,
Expand Down
30 changes: 27 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/Index/IndexInfo.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/UnorderedInputStream.h>
Expand All @@ -62,6 +63,8 @@
#include <ext/scope_guard.h>
#include <magic_enum.hpp>
#include <memory>
#include <mutex>
#include <shared_mutex>


namespace ProfileEvents
Expand Down Expand Up @@ -219,7 +222,7 @@ DeltaMergeStore::DeltaMergeStore(
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
LocalIndexInfosPtr local_index_infos_,
const Settings & settings_,
ThreadPool * thread_pool)
: global_context(db_context.getGlobalContext())
Expand Down Expand Up @@ -339,7 +342,7 @@ DeltaMergeStorePtr DeltaMergeStore::create(
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
LocalIndexInfosPtr local_index_infos_,
const Settings & settings_,
ThreadPool * thread_pool)
{
Expand Down Expand Up @@ -2017,8 +2020,29 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info)
original_table_columns.swap(new_original_table_columns);
store_columns.swap(new_store_columns);

// TODO(local index): There could be some local indexes added/dropped after DDL
// copy the local_index_infos to check whether any new index is created
LocalIndexInfosPtr local_index_infos_copy = nullptr;
{
std::shared_lock index_read_lock(mtx_local_index_infos);
local_index_infos_copy = std::shared_ptr<LocalIndexInfos>(local_index_infos);
}

std::atomic_store(&original_table_header, std::make_shared<Block>(toEmptyBlock(original_table_columns)));

// release the lock because `checkAllSegmentsLocalIndex` will try to acquire the lock
// and generate tasks on segments
lock.unlock();

auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_copy, table_info, log);
if (new_local_index_infos)
{
{
// new index created, update the info in-memory
std::unique_lock index_write_lock(mtx_local_index_infos);
local_index_infos.swap(new_local_index_infos);
}
checkAllSegmentsLocalIndex();
} // else no new index is created
}

SortDescription DeltaMergeStore::getPrimarySortDescription() const
Expand Down
19 changes: 15 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ struct LocalIndexStats
{
String column_name{};
UInt64 column_id{};
UInt64 index_id{};
String index_kind{};

UInt64 rows_stable_indexed{}; // Total rows
Expand Down Expand Up @@ -296,7 +297,7 @@ class DeltaMergeStore
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
LocalIndexInfosPtr local_index_infos_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

Expand All @@ -314,7 +315,7 @@ class DeltaMergeStore
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
LocalIndexInfosPtr local_index_infos_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

Expand Down Expand Up @@ -720,7 +721,7 @@ class DeltaMergeStore

void segmentEnsureStableIndex(
DMContext & dm_context,
const IndexInfosPtr & index_info,
const LocalIndexInfosPtr & index_info,
const DMFiles & dm_files,
const String & source_segment_info);

Expand Down Expand Up @@ -863,6 +864,15 @@ class DeltaMergeStore
const SegmentPtr & segment,
const DMFiles & new_dm_files);

// Get a snap of local_index_infos to check whether any new index is created.
LocalIndexInfosPtr getLocalIndexInfosSnapshot() const
{
std::shared_lock index_read_lock(mtx_local_index_infos);
if (!local_index_infos || local_index_infos->empty())
return nullptr;
return std::make_shared<LocalIndexInfos>(*local_index_infos);
}

/**
* Check whether there are new local indexes should be built for all segments.
*/
Expand Down Expand Up @@ -950,7 +960,8 @@ class DeltaMergeStore
// Some indexes are built in TiFlash locally. For example, Vector Index.
// Compares to the lightweight RoughSet Indexes, these indexes require lot
// of resources to build, so they will be built in separated background pool.
IndexInfosPtr local_index_infos;
LocalIndexInfosPtr local_index_infos;
mutable std::shared_mutex mtx_local_index_infos;

struct DMFileIDToSegmentIDs
{
Expand Down
14 changes: 8 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ SegmentPtr DeltaMergeStore::segmentMerge(

void DeltaMergeStore::checkAllSegmentsLocalIndex()
{
if (!local_index_infos || local_index_infos->empty())
if (!getLocalIndexInfosSnapshot())
return;

LOG_INFO(log, "CheckAllSegmentsLocalIndex - Begin");
Expand Down Expand Up @@ -529,12 +529,13 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment)
RUNTIME_CHECK(segment != nullptr);

// TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL
if (!local_index_infos || local_index_infos->empty())
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
if (!local_index_infos_snap)
return false;

// No lock is needed, stable meta is immutable.
auto dm_files = segment->getStable()->getDMFiles();
auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos, dm_files);
auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files);
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
return false;

Expand Down Expand Up @@ -569,13 +570,14 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co
RUNTIME_CHECK(segment != nullptr);

// TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL
if (!local_index_infos || local_index_infos->empty())
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
if (!local_index_infos_snap)
return true;

// No lock is needed, stable meta is immutable.
auto segment_id = segment->segmentId();
auto dm_files = segment->getStable()->getDMFiles();
auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos, dm_files);
auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files);
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
return true;

Expand Down Expand Up @@ -653,7 +655,7 @@ SegmentPtr DeltaMergeStore::segmentUpdateMeta(

void DeltaMergeStore::segmentEnsureStableIndex(
DMContext & dm_context,
const IndexInfosPtr & index_info,
const LocalIndexInfosPtr & index_info,
const DMFiles & dm_files,
const String & source_segment_info)
{
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,18 @@ SegmentsStats DeltaMergeStore::getSegmentsStats()

LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
{
std::shared_lock lock(read_write_mutex);

if (!local_index_infos || local_index_infos->empty())
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
if (!local_index_infos_snap)
return {};

std::shared_lock lock(read_write_mutex);

LocalIndexesStats stats;
for (const auto & index_info : *local_index_infos)
for (const auto & index_info : *local_index_infos_snap)
{
LocalIndexStats index_stats;
index_stats.column_id = index_info.column_id;
index_stats.index_id = index_info.index_id;
index_stats.column_name = index_info.column_name;
index_stats.index_kind = "HNSW"; // TODO: Support more.

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileIndexWriter.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter.h>
#include <Storages/DeltaMerge/Index/IndexInfo.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/PathPool.h>
Expand All @@ -32,14 +33,14 @@ namespace DB::DM
{

DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo(
const IndexInfosPtr & index_infos,
const LocalIndexInfosPtr & index_infos,
const DMFiles & dm_files)
{
assert(index_infos != nullptr);
static constexpr double VECTOR_INDEX_SIZE_FACTOR = 1.2;

LocalIndexBuildInfo build;
build.indexes_to_build = std::make_shared<IndexInfos>();
build.indexes_to_build = std::make_shared<LocalIndexInfos>();
build.file_ids.reserve(dm_files.size());
for (const auto & dmfile : dm_files)
{
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ class DMFileIndexWriter
{
std::vector<LocalIndexerScheduler::FileID> file_ids;
size_t estimated_memory_bytes = 0;
IndexInfosPtr indexes_to_build;
LocalIndexInfosPtr indexes_to_build;
};

static LocalIndexBuildInfo getLocalIndexBuildInfo(const IndexInfosPtr & index_infos, const DMFiles & dm_files);
static LocalIndexBuildInfo getLocalIndexBuildInfo(const LocalIndexInfosPtr & index_infos, const DMFiles & dm_files);

struct Options
{
const StoragePathPoolPtr path_pool;
const IndexInfosPtr index_infos;
const LocalIndexInfosPtr index_infos;
const DMFiles dm_files;
const DMContext & dm_context;
};
Expand Down
Loading

0 comments on commit 1a781c7

Please sign in to comment.