Skip to content

Commit

Permalink
kmeans tree vector index search (#12639)
Browse files Browse the repository at this point in the history
  • Loading branch information
MBkkt authored Dec 23, 2024
1 parent ac47d76 commit 3c2e65a
Show file tree
Hide file tree
Showing 21 changed files with 774 additions and 158 deletions.
20 changes: 16 additions & 4 deletions ydb/core/base/table_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,19 @@ bool Contains(const auto& names, std::string_view str) {
return std::find(std::begin(names), std::end(names), str) != std::end(names);
}

constexpr std::string_view ImplTables[] = {ImplTable, NTableVectorKmeansTreeIndex::LevelTable, NTableVectorKmeansTreeIndex::PostingTable};
constexpr std::string_view ImplTables[] = {
ImplTable, NTableVectorKmeansTreeIndex::LevelTable, NTableVectorKmeansTreeIndex::PostingTable,
};

constexpr std::string_view GlobalSecondaryImplTables[] = {
ImplTable,
};
static_assert(std::is_sorted(std::begin(GlobalSecondaryImplTables), std::end(GlobalSecondaryImplTables)));

constexpr std::string_view GlobalKMeansTreeImplTables[] = {
NTableVectorKmeansTreeIndex::LevelTable, NTableVectorKmeansTreeIndex::PostingTable,
};
static_assert(std::is_sorted(std::begin(GlobalKMeansTreeImplTables), std::end(GlobalKMeansTreeImplTables)));

}

Expand Down Expand Up @@ -142,11 +154,11 @@ bool IsCompatibleIndex(NKikimrSchemeOp::EIndexType indexType, const TTableColumn
return true;
}

TVector<TString> GetImplTables(NKikimrSchemeOp::EIndexType indexType) {
std::span<const std::string_view> GetImplTables(NKikimrSchemeOp::EIndexType indexType) {
if (indexType == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree) {
return { NTableVectorKmeansTreeIndex::LevelTable, NTableVectorKmeansTreeIndex::PostingTable };
return GlobalKMeansTreeImplTables;
} else {
return { ImplTable };
return GlobalSecondaryImplTables;
}
}

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/base/table_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include <util/generic/string.h>
#include <util/string/builder.h>

#include <span>
#include <string_view>

namespace NKikimr::NTableIndex {

struct TTableColumns {
Expand All @@ -24,7 +27,7 @@ inline constexpr const char* ImplTable = "indexImplTable";
bool IsCompatibleIndex(NKikimrSchemeOp::EIndexType type, const TTableColumns& table, const TIndexColumns& index, TString& explain);
TTableColumns CalcTableImplDescription(NKikimrSchemeOp::EIndexType type, const TTableColumns& table, const TIndexColumns& index);

TVector<TString> GetImplTables(NKikimrSchemeOp::EIndexType indexType);
std::span<const std::string_view> GetImplTables(NKikimrSchemeOp::EIndexType indexType);
bool IsImplTable(std::string_view tableName);
bool IsBuildImplTable(std::string_view tableName);

Expand Down
65 changes: 37 additions & 28 deletions ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "actors/kqp_ic_gateway_actors.h"

#include <ydb/core/base/path.h>
#include <ydb/core/base/table_index.h>
#include <ydb/core/external_sources/external_source_factory.h>
#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
Expand Down Expand Up @@ -175,7 +176,7 @@ TTableMetadataResult GetTableMetadataResult(const NSchemeCache::TSchemeCacheNavi
THashMap<TString, NYql::TKikimrPathId> sequences;

for (const auto& sequenceDesc : entry.Sequences) {
sequences[sequenceDesc.GetName()] =
sequences[sequenceDesc.GetName()] =
NYql::TKikimrPathId(sequenceDesc.GetPathId().GetOwnerId(), sequenceDesc.GetPathId().GetLocalId());
}

Expand All @@ -187,7 +188,7 @@ TTableMetadataResult GetTableMetadataResult(const NSchemeCache::TSchemeCacheNavi
const TString typeName = GetTypeName(NScheme::TTypeInfoMod{columnDesc.PType, columnDesc.PTypeMod});
auto defaultKind = NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_UNSPECIFIED;
NYql::TKikimrPathId defaultFromSequencePathId = {};

if (columnDesc.IsDefaultFromSequence()) {
defaultKind = NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE;
auto sequenceIt = sequences.find(columnDesc.DefaultFromSequence);
Expand All @@ -196,7 +197,7 @@ TTableMetadataResult GetTableMetadataResult(const NSchemeCache::TSchemeCacheNavi
} else if (columnDesc.IsDefaultFromLiteral()) {
defaultKind = NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_LITERAL;
}

tableMeta->Columns.emplace(
columnDesc.Name,
NYql::TKikimrColumnMetadata(
Expand Down Expand Up @@ -400,11 +401,15 @@ TString GetDebugString(const std::pair<NKikimr::TIndexId, TString>& id) {
return TStringBuilder() << " Path: " << id.second << " TableId: " << id.first;
}

void UpdateMetadataIfSuccess(NYql::TKikimrTableMetadataPtr ptr, size_t idx, const TTableMetadataResult& value) {
if (value.Success()) {
ptr->SecondaryGlobalIndexMetadata[idx] = value.Metadata;
void UpdateMetadataIfSuccess(NYql::TKikimrTableMetadataPtr& implTable, TTableMetadataResult& value) {
YQL_ENSURE(value.Success());
if (!implTable) {
implTable = std::move(value.Metadata);
return;
}

YQL_ENSURE(!implTable->Next);
YQL_ENSURE(implTable->Name < value.Metadata->Name);
implTable->Next = std::move(value.Metadata);
}

void SetError(TTableMetadataResult& externalDataSourceMetadata, const TString& error) {
Expand Down Expand Up @@ -618,28 +623,21 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadIndexMeta
const auto& tableName = tableMetadata->Name;
const size_t indexesCount = tableMetadata->Indexes.size();

TVector<NThreading::TFuture<TGenericResult>> children;
TVector<NThreading::TFuture<TTableMetadataResult>> children;
children.reserve(indexesCount);

tableMetadata->SecondaryGlobalIndexMetadata.resize(indexesCount);
const ui64 tableOwnerId = tableMetadata->PathId.OwnerId();

for (size_t i = 0; i < indexesCount; i++) {
const auto& index = tableMetadata->Indexes[i];
const auto indexTablePaths = NSchemeHelpers::CreateIndexTablePath(tableName, index.Type, index.Name);
for (const auto& indexTablePath : indexTablePaths) {
const auto implTablePaths = NSchemeHelpers::CreateIndexTablePath(tableName, index.Type, index.Name);
for (const auto& implTablePath : implTablePaths) {
if (!index.SchemaVersion) {
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load index metadata without schema version check index: " << index.Name);
children.push_back(
LoadTableMetadata(cluster, indexTablePath,
LoadTableMetadata(cluster, implTablePath,
TLoadTableMetadataSettings().WithPrivateTables(true), database, userToken)
.Apply([i, tableMetadata](const TFuture<TTableMetadataResult>& result) {
auto value = result.GetValue();
UpdateMetadataIfSuccess(tableMetadata, i, value);
return static_cast<TGenericResult>(value);
})
);

} else {
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load index metadata with schema version check"
<< "index: " << index.Name
Expand All @@ -650,12 +648,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadIndexMeta
auto ownerId = index.PathOwnerId ? index.PathOwnerId : tableOwnerId; //for compat with 20-2
children.push_back(
LoadIndexMetadataByPathId(cluster,
NKikimr::TIndexId(ownerId, index.LocalPathId, index.SchemaVersion), indexTablePath, database, userToken)
.Apply([i, tableMetadata](const TFuture<TTableMetadataResult>& result) {
auto value = result.GetValue();
UpdateMetadataIfSuccess(tableMetadata, i, value);
return static_cast<TGenericResult>(value);
})
NKikimr::TIndexId(ownerId, index.LocalPathId, index.SchemaVersion), implTablePath, database, userToken)
);

}
Expand All @@ -666,14 +659,26 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadIndexMeta
auto loadIndexMetadataChecker =
[ptr, result{std::move(loadTableMetadataResult)}, children](const NThreading::TFuture<void>) mutable {
bool loadOk = true;
for (const auto& child : children) {
result.AddIssues(child.GetValue().Issues());
if (!child.GetValue().Success()) {
loadOk = false;

const auto indexesCount = result.Metadata->Indexes.size();
result.Metadata->ImplTables.resize(indexesCount);
auto it = children.begin();
for (size_t i = 0; i < indexesCount; i++) {
for (const auto& _ : NTableIndex::GetImplTables(NYql::TIndexDescription::ConvertIndexType(
result.Metadata->Indexes[i].Type))) {
YQL_ENSURE(it != children.end());
auto value = it++->ExtractValue();
result.AddIssues(value.Issues());
if (loadOk && (loadOk = value.Success())) {
UpdateMetadataIfSuccess(result.Metadata->ImplTables[i], value);
}
}
}
YQL_ENSURE(it == children.end());

auto locked = ptr.lock();
if (!loadOk || !locked) {
result.Metadata->ImplTables.clear();
result.SetStatus(TIssuesIds::KIKIMR_INDEX_METADATA_LOAD_FAILED);
} else {
locked->OnLoadedTableMetadata(result);
Expand Down Expand Up @@ -909,13 +914,17 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
case EKind::KindIndex: {
Y_ENSURE(entry.ListNodeEntry, "expected children list");
for (const auto& child : entry.ListNodeEntry->Children) {
if (!table.EndsWith(child.Name)) {
continue;
}
TIndexId pathId = TIndexId(child.PathId, child.SchemaVersion);

LoadTableMetadataCache(cluster, std::make_pair(pathId, table), settings, database, userToken)
.Apply([promise](const TFuture<TTableMetadataResult>& result) mutable
{
promise.SetValue(result.GetValue());
});
break;
}
break;
}
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ TStatus AnnotateReadTable(const TExprNode::TPtr& node, TExprContext& ctx, const
TKikimrTableMetadataPtr meta;

if (readIndex) {
meta = table.second->Metadata->GetIndexMetadata(TString(node->Child(TKqlReadTableIndex::idx_Index)->Content())).first;
meta = table.second->Metadata->GetIndexMetadata(node->Child(TKqlReadTableIndex::idx_Index)->Content()).first;
if (!meta) {
return TStatus::Error;
}
Expand Down Expand Up @@ -455,7 +455,7 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
if (isStreamLookup && !EnsureArgsCount(*node, TKqlStreamLookupIndex::Match(node.Get()) ? 5 : 4, ctx)) {
return TStatus::Error;
}

if (!isStreamLookup && !EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) {
return TStatus::Error;
}
Expand Down Expand Up @@ -560,7 +560,7 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
if (!EnsureAtom(*index, ctx)) {
return TStatus::Error;
}
auto indexMeta = table.second->Metadata->GetIndexMetadata(TString(index->Content())).first;
auto indexMeta = table.second->Metadata->GetIndexMetadata(index->Content()).first;

if (!CalcKeyColumnsCount(ctx, node->Pos(), *structType, *table.second, *indexMeta, keyColumnsCount)) {
return TStatus::Error;
Expand Down Expand Up @@ -713,7 +713,7 @@ TStatus AnnotateUpsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const
}

if (TKqlUpsertRowsIndex::Match(node.Get())) {
Y_ENSURE(!table.second->Metadata->SecondaryGlobalIndexMetadata.empty());
Y_ENSURE(!table.second->Metadata->ImplTables.empty());
}

auto effectType = MakeKqpEffectType(ctx);
Expand Down Expand Up @@ -1683,7 +1683,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext

} else if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows
|| settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {

if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
Expand Down Expand Up @@ -1820,7 +1820,7 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
} else {
node->SetTypeAnn(ctx.MakeType<TListExprType>(outputRowType));
}

return TStatus::Ok;
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ std::pair<TExprBase, TCoAtomList> CreateRowsToReplace(const TExprBase& input,

bool HasIndexesToWrite(const TKikimrTableDescription& tableData) {
bool hasIndexesToWrite = false;
YQL_ENSURE(tableData.Metadata->Indexes.size() == tableData.Metadata->SecondaryGlobalIndexMetadata.size());
YQL_ENSURE(tableData.Metadata->Indexes.size() == tableData.Metadata->ImplTables.size());
for (const auto& index : tableData.Metadata->Indexes) {
if (index.ItUsedForWrite()) {
hasIndexesToWrite = true;
Expand Down Expand Up @@ -893,7 +893,7 @@ TIntrusivePtr<TKikimrTableMetadata> GetIndexMetadata(const TKqlReadTableIndex& r
const TKikimrTablesData& tables, TStringBuf cluster)
{
const auto& tableDesc = GetTableData(tables, cluster, read.Table().Path());
const auto& [indexMeta, _ ] = tableDesc.Metadata->GetIndexMetadata(read.Index().StringValue());
const auto& [indexMeta, _ ] = tableDesc.Metadata->GetIndexMetadata(read.Index().Value());
return indexMeta;
}

Expand Down
Loading

0 comments on commit 3c2e65a

Please sign in to comment.