Skip to content

Commit

Permalink
Make vector index creation more reliable and observable
Browse files Browse the repository at this point in the history
  • Loading branch information
MBkkt committed Jan 10, 2025
1 parent 3949059 commit c1031d9
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 15 deletions.
50 changes: 50 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4543,6 +4543,56 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
}
}

// read kmeans tree state
{
auto rowset = db.Table<Schema::KMeansTreeState>().Range().Select();
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
TIndexBuildId id = rowset.GetValue<Schema::KMeansTreeState::Id>();
const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id);
Y_VERIFY_S(buildInfoPtr, "BuildIndex not found: id# " << id);
auto& buildInfo = *buildInfoPtr->Get();
buildInfo.KMeans.Set(
rowset.GetValue<Schema::KMeansTreeState::Level>(),
rowset.GetValue<Schema::KMeansTreeState::Parent>(),
rowset.GetValue<Schema::KMeansTreeState::State>()
);
buildInfo.Sample.Rows.reserve(buildInfo.KMeans.K);

if (!rowset.Next()) {
return false;
}
}
}


// read kmeans tree sample
{
auto rowset = db.Table<Schema::KMeansTreeSample>().Range().Select();
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
TIndexBuildId id = rowset.GetValue<Schema::KMeansTreeSample::Id>();
const auto* buildInfoPtr = Self->IndexBuilds.FindPtr(id);
Y_VERIFY_S(buildInfoPtr, "BuildIndex not found: id# " << id);
auto& buildInfo = *buildInfoPtr->Get();
buildInfo.Sample.Set(
rowset.GetValue<Schema::KMeansTreeSample::Row>(),
rowset.GetValue<Schema::KMeansTreeSample::Probability>(),
rowset.GetValue<Schema::KMeansTreeSample::Data>()
);

if (!rowset.Next()) {
return false;
}
}
}
}

// Read snapshot tables
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ void TSchemeShard::PersistBuildIndexForget(NIceDb::TNiceDb& db, const TIndexBuil
for(ui32 idx = 0; idx < info.BuildColumns.size(); ++idx) {
db.Table<Schema::BuildColumnOperationSettings>().Key(info.Id, idx).Delete();
}

if (info.IsBuildVectorIndex()) {
db.Table<Schema::KMeansTreeState>().Key(info.Id).Delete();
for (ui32 row = 0; row < info.KMeans.K * 2; ++row) {
db.Table<Schema::KMeansTreeSample>().Key(info.Id, row).Delete();
}
}
}

void TSchemeShard::Resume(const TDeque<TIndexBuildId>& indexIds, const TActorContext& ctx) {
Expand Down
41 changes: 34 additions & 7 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
}

void SendUploadSampleKRequest(TIndexBuildInfo& buildInfo) {
buildInfo.Sample.MakeStrictTop(buildInfo.KMeans.K);
auto path = TPath::Init(buildInfo.TablePathId, Self)
.Dive(buildInfo.IndexName)
.Dive(NTableIndex::NTableVectorKmeansTreeIndex::LevelTable);
Expand Down Expand Up @@ -758,8 +759,11 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
}

bool InitSingleKMeans(TIndexBuildInfo& buildInfo) {
if (!buildInfo.DoneShards.empty() || !buildInfo.InProgressShards.empty() || !buildInfo.ToUploadShards.empty()
|| buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::Local) {
if (!buildInfo.DoneShards.empty() || !buildInfo.InProgressShards.empty() || !buildInfo.ToUploadShards.empty()) {
return false;
}
if (buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::MultiLocal) {
InitMultiKMeans(buildInfo);
return false;
}
std::array<NScheme::TTypeInfo, 1> typeInfos{NScheme::NTypeIds::Uint32};
Expand Down Expand Up @@ -798,7 +802,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
}
}
}
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::Local;
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::MultiLocal;
buildInfo.Cluster2Shards.clear();
Y_ASSERT(buildInfo.InProgressShards.empty());
Y_ASSERT(buildInfo.DoneShards.empty());
Expand Down Expand Up @@ -833,6 +837,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
case TIndexBuildInfo::TKMeans::Reshuffle:
return SendKMeansReshuffle(buildInfo);
case TIndexBuildInfo::TKMeans::Local:
case TIndexBuildInfo::TKMeans::MultiLocal:
return SendKMeansLocal(buildInfo);
}
return true;
Expand All @@ -851,14 +856,22 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
Self->PersistBuildIndexProcessed(db, buildInfo);
}

void PersistKMeansState(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
NIceDb::TNiceDb db{txc.DB};
db.Table<Schema::KMeansTreeState>().Key(buildInfo.Id).Update(
NIceDb::TUpdate<Schema::KMeansTreeState::Level>(buildInfo.KMeans.Level),
NIceDb::TUpdate<Schema::KMeansTreeState::Parent>(buildInfo.KMeans.Parent),
NIceDb::TUpdate<Schema::KMeansTreeState::State>(buildInfo.KMeans.State)
);
}

bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
if (InitSingleKMeans(buildInfo)) {
LOG_D("FillIndex::SingleKMeans::Start " << buildInfo.KMeansTreeToDebugStr());
}
if (!SendVectorIndex(buildInfo)) {
return false;
}
// TODO(mbkkt) persist buildInfo.KMeans changes

LOG_D("FillIndex::SendVectorIndex::Done " << buildInfo.KMeansTreeToDebugStr());
if (!buildInfo.Sample.Rows.empty()) {
Expand All @@ -876,6 +889,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
if (!buildInfo.Sample.Rows.empty()) {
if (buildInfo.KMeans.NextState()) {
LOG_D("FillIndex::NextState::Start " << buildInfo.KMeansTreeToDebugStr());
PersistKMeansState(txc, buildInfo);
Progress(BuildId);
return false;
}
Expand All @@ -885,18 +899,21 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil

if (buildInfo.KMeans.NextParent()) {
LOG_D("FillIndex::NextParent::Start " << buildInfo.KMeansTreeToDebugStr());
PersistKMeansState(txc, buildInfo);
Progress(BuildId);
return false;
}

if (InitMultiKMeans(buildInfo)) {
LOG_D("FillIndex::MultiKMeans::Start " << buildInfo.KMeansTreeToDebugStr());
PersistKMeansState(txc, buildInfo);
Progress(BuildId);
return false;
}

if (buildInfo.KMeans.NextLevel()) {
LOG_D("FillIndex::NextLevel::Start " << buildInfo.KMeansTreeToDebugStr());
PersistKMeansState(txc, buildInfo);
ChangeState(BuildId, TIndexBuildInfo::EState::DropBuild);
Progress(BuildId);
return false;
Expand Down Expand Up @@ -1391,18 +1408,29 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
return true;
}

NIceDb::TNiceDb db(txc.DB);
if (record.ProbabilitiesSize()) {
Y_ASSERT(record.RowsSize());
auto& probabilities = record.GetProbabilities();
auto& rows = *record.MutableRows();
Y_ASSERT(probabilities.size() == rows.size());
auto& sample = buildInfo.Sample.Rows;
auto from = sample.size();
for (int i = 0; i != probabilities.size(); ++i) {
if (probabilities[i] >= buildInfo.Sample.MaxProbability) {
break;
}
buildInfo.Sample.Rows.emplace_back(probabilities[i], std::move(rows[i]));
sample.emplace_back(probabilities[i], std::move(rows[i]));
}
if (buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K)) {
from = 0;
}
for (; from < sample.size(); ++from) {
db.Table<Schema::KMeansTreeSample>().Key(buildInfo.Id, from).Update(
NIceDb::TUpdate<Schema::KMeansTreeSample::Row>(sample[from].P),
NIceDb::TUpdate<Schema::KMeansTreeSample::Data>(sample[from].Row)
);
}
buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K);
}

TBillingStats stats{0, 0, record.GetReadRows(), record.GetReadBytes()};
Expand All @@ -1414,7 +1442,6 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex
shardStatus.DebugMessage = issues.ToString();
shardStatus.Status = record.GetStatus();

NIceDb::TNiceDb db(txc.DB);
switch (shardStatus.Status) {
case NKikimrIndexBuilder::EBuildStatus::DONE:
if (buildInfo.InProgressShards.erase(shardIdx)) {
Expand Down
35 changes: 28 additions & 7 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3034,15 +3034,16 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {

struct TKMeans {
// TODO(mbkkt) move to TVectorIndexKmeansTreeDescription
ui32 K = 4;
ui32 Levels = 5;
ui32 K = 0;
ui32 Levels = 0;

// progress
enum EState : ui32 {
Sample = 0,
// Recompute,
Reshuffle,
Local,
MultiLocal,
};
ui32 Level = 1;

Expand Down Expand Up @@ -3113,6 +3114,17 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
return true;
}

void Set(ui32 level, ui32 parent, ui32 state) {
// TODO(mbkkt) make it without cycles
while (Level < level) {
NextLevel();
}
while (Parent < parent) {
NextParent();
}
State = static_cast<EState>(state);
}

NKikimrTxDataShard::TEvLocalKMeansRequest::EState GetUpload() const {
if (Parent == 0) {
if (NeedsAnotherLevel()) {
Expand Down Expand Up @@ -3244,13 +3256,14 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
ui64 MaxProbability = std::numeric_limits<ui64>::max();
bool Sent = false;

void MakeWeakTop(ui64 k) {
bool MakeWeakTop(ui64 k) {
// 2 * k is needed to make it linear, 2 * N at all.
// x * k approximately is x / (x - 1) * N, but with larger x more memory used
if (Rows.size() < 2 * k) {
return;
return false;
}
MakeTop(k);
return true;
}

void MakeStrictTop(ui64 k) {
Expand All @@ -3276,6 +3289,11 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
Sent = false;
}

void Set(ui32 row, ui64 probability, TString data) {
Rows.emplace_back(probability, std::move(data));
MaxProbability = std::max(probability + 1, MaxProbability + 1) - 1;
}

private:
void MakeTop(ui64 k) {
Y_ASSERT(k > 0);
Expand Down Expand Up @@ -3381,9 +3399,12 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
}

switch (creationConfig.GetSpecializedIndexDescriptionCase()) {
case NKikimrSchemeOp::TIndexCreationConfig::kVectorIndexKmeansTreeDescription:
indexInfo->SpecializedIndexDescription = std::move(*creationConfig.MutableVectorIndexKmeansTreeDescription());
break;
case NKikimrSchemeOp::TIndexCreationConfig::kVectorIndexKmeansTreeDescription: {
auto& desc = *creationConfig.MutableVectorIndexKmeansTreeDescription();
indexInfo->KMeans.K = std::max<ui32>(2, desc.settings().clusters());
indexInfo->KMeans.Levels = std::max<ui32>(1, desc.settings().levels());
indexInfo->SpecializedIndexDescription =std::move(desc);
} break;
case NKikimrSchemeOp::TIndexCreationConfig::SPECIALIZEDINDEXDESCRIPTION_NOT_SET:
/* do nothing */
break;
Expand Down
34 changes: 33 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -1874,6 +1874,36 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, Description>;
};

struct KMeansTreeState : Table<112> {
struct Id : Column<1, NScheme::NTypeIds::Uint64> { using Type = TIndexBuildId; };
struct Level : Column<2, NScheme::NTypeIds::Uint32> {};
struct Parent : Column<3, NScheme::NTypeIds::Uint32> {};
struct State : Column<4, NScheme::NTypeIds::Uint32> {};

using TKey = TableKey<Id>;
using TColumns = TableColumns<
Id,
Level,
Parent,
State
>;
};

struct KMeansTreeSample : Table<113> {
struct Id : Column<1, NScheme::NTypeIds::Uint64> { using Type = TIndexBuildId; };
struct Row : Column<2, NScheme::NTypeIds::Uint32> {};
struct Probability : Column<3, NScheme::NTypeIds::Uint64> {};
struct Data : Column<4, NScheme::NTypeIds::String> {};

using TKey = TableKey<Id, Row>;
using TColumns = TableColumns<
Id,
Row,
Probability,
Data
>;
};

using TTables = SchemaTables<
Paths,
TxInFlight,
Expand Down Expand Up @@ -1984,7 +2014,9 @@ struct Schema : NIceDb::Schema {
View,
BackgroundSessions,
ResourcePool,
BackupCollection
BackupCollection,
KMeansTreeState,
KMeansTreeSample
>;

static constexpr ui64 SysParam_NextPathId = 1;
Expand Down

0 comments on commit c1031d9

Please sign in to comment.