Skip to content

Commit

Permalink
Added full graph explore
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Feb 4, 2025
1 parent fee5500 commit 9de8d63
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 135 deletions.
156 changes: 21 additions & 135 deletions ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ struct TKiExploreTxResults {

bool ConcurrentResults = true;

THashSet<const TExprNode*> Ops;
TVector<TExprBase> Sync;
TVector<TKiQueryBlock> QueryBlocks;
bool HasExecute;
Expand Down Expand Up @@ -331,16 +330,6 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
return false;
}

bool IsPgRead(const TExprBase& node, TTypeAnnotationContext& types) {
if (auto maybePgRead = node.Maybe<TPgTableContent>()) {
auto dataSourceProviderIt = types.DataSourceMap.find(NYql::PgProviderName);
if (dataSourceProviderIt != types.DataSourceMap.end()) {
return true;
}
}
return false;
}

bool IsDqWrite(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types) {
if (node.Ref().ChildrenSize() <= 1) {
return false;
Expand All @@ -362,23 +351,9 @@ bool IsDqWrite(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
return false;
}

bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes,
bool ExploreNode(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes,
TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types) {

if (txRes.Ops.cend() != txRes.Ops.find(node.Raw())) {
return true;
}

if (node.Maybe<TCoWorld>()) {
txRes.Ops.insert(node.Raw());
return true;
}

if (auto maybeLeft = node.Maybe<TCoLeft>()) {
txRes.Ops.insert(node.Raw());
return ExploreTx(maybeLeft.Cast().Input(), ctx, dataSink, txRes, tablesData, types);
}

auto checkDataSource = [dataSink] (const TKiDataSource& ds) {
return dataSink.Cluster().Raw() == ds.Cluster().Raw();
};
Expand All @@ -399,28 +374,14 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
YQL_ENSURE(key.Extract(read.TableKey().Ref()));
YQL_ENSURE(key.GetKeyType() == TKikimrKey::Type::Table);
auto table = key.GetTablePath();
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(maybeRead.Cast().World(), ctx, dataSink, txRes, tablesData, types);

YQL_ENSURE(tablesData);
const auto& tableData = tablesData->ExistingTable(cluster, table);
YQL_ENSURE(tableData.Metadata);
auto readColumns = read.GetSelectColumns(ctx, tableData);
txRes.AddReadOpToQueryBlock(key, readColumns, tableData.Metadata);
txRes.AddTableOperation(BuildTableOpNode(cluster, table, TYdbOperation::Select, read.Pos(), ctx));
return result;
}

if (IsDqRead(node, ctx, types, true, &txRes.HasErrors)) {
txRes.Ops.insert(node.Raw());
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
}

if (IsPgRead(node, types)) {
txRes.Ops.insert(node.Raw());
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
return true;
}

if (auto maybeWrite = node.Maybe<TKiWriteTable>()) {
Expand All @@ -430,8 +391,6 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}

auto table = write.Table().Value();
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(write.World(), ctx, dataSink, txRes, tablesData, types);
auto tableOp = GetTableOp(write);

YQL_ENSURE(tablesData);
Expand Down Expand Up @@ -472,14 +431,12 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}

txRes.AddTableOperation(BuildTableOpNode(cluster, table, tableOp, write.Pos(), ctx));
return result;
return true;
}

if (IsDqWrite(node, ctx, types)) {
txRes.Ops.insert(node.Raw());
txRes.AddEffect(node, THashMap<TString, TPrimitiveYdbOperations>{});
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
return true;
}

if (auto maybeUpdate = node.Maybe<TKiUpdateTable>()) {
Expand All @@ -489,8 +446,6 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}

auto table = update.Table().Value();
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(update.World(), ctx, dataSink, txRes, tablesData, types);
const auto tableOp = TYdbOperation::Update;

YQL_ENSURE(tablesData);
Expand Down Expand Up @@ -525,7 +480,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}

txRes.AddTableOperation(BuildTableOpNode(cluster, table, tableOp, update.Pos(), ctx));
return result;
return true;
}

if (auto maybeDelete = node.Maybe<TKiDeleteTable>()) {
Expand All @@ -535,8 +490,6 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}

auto table = del.Table().Value();
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(del.World(), ctx, dataSink, txRes, tablesData, types);
const auto tableOp = TYdbOperation::Delete;

YQL_ENSURE(tablesData);
Expand Down Expand Up @@ -564,7 +517,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}

txRes.AddTableOperation(BuildTableOpNode(cluster, table, tableOp, del.Pos(), ctx));
return result;
return true;
}

if (auto maybeCreate = node.Maybe<TKiCreateTable>()) {
Expand All @@ -574,10 +527,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}

auto table = create.Table().Value();
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(create.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildTableOpNode(cluster, table, TYdbOperation::CreateTable, create.Pos(), ctx));
return result;
return true;
}

if (auto maybeDrop = node.Maybe<TKiDropTable>()) {
Expand All @@ -587,10 +538,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}

auto table = drop.Table().Value();
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(drop.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildTableOpNode(cluster, table, TYdbOperation::DropTable, drop.Pos(), ctx));
return result;
return true;
}

if (auto maybeAlter = node.Maybe<TKiAlterTable>()) {
Expand All @@ -600,10 +549,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}

auto table = alter.Table().Value();
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(alter.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildTableOpNode(cluster, table, TYdbOperation::AlterTable, alter.Pos(), ctx));
return result;
return true;
}

if (auto maybeCreateUser = node.Maybe<TKiCreateUser>()) {
Expand All @@ -612,10 +559,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return false;
}

txRes.Ops.insert(node.Raw());
auto result = ExploreTx(createUser.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildYdbOpNode(cluster, TYdbOperation::CreateUser, createUser.Pos(), ctx));
return result;
return true;
}

if (auto maybeAlterUser = node.Maybe<TKiAlterUser>()) {
Expand All @@ -624,10 +569,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return false;
}

txRes.Ops.insert(node.Raw());
auto result = ExploreTx(alterUser.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildYdbOpNode(cluster, TYdbOperation::AlterUser, alterUser.Pos(), ctx));
return result;
return true;
}

if (auto maybeDropUser = node.Maybe<TKiDropUser>()) {
Expand All @@ -636,10 +579,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return false;
}

txRes.Ops.insert(node.Raw());
auto result = ExploreTx(dropUser.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildYdbOpNode(cluster, TYdbOperation::DropUser, dropUser.Pos(), ctx));
return result;
return true;
}

if (auto maybeCreateGroup = node.Maybe<TKiCreateGroup>()) {
Expand All @@ -648,10 +589,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return false;
}

txRes.Ops.insert(node.Raw());
auto result = ExploreTx(createGroup.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildYdbOpNode(cluster, TYdbOperation::CreateGroup, createGroup.Pos(), ctx));
return result;
return true;
}

if (auto maybeAlterGroup = node.Maybe<TKiAlterGroup>()) {
Expand All @@ -660,10 +599,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return false;
}

txRes.Ops.insert(node.Raw());
auto result = ExploreTx(alterGroup.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildYdbOpNode(cluster, TYdbOperation::AlterGroup, alterGroup.Pos(), ctx));
return result;
return true;
}

if (auto maybeRenameGroup = node.Maybe<TKiRenameGroup>()) {
Expand All @@ -672,10 +609,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return false;
}

txRes.Ops.insert(node.Raw());
auto result = ExploreTx(renameGroup.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildYdbOpNode(cluster, TYdbOperation::RenameGroup, renameGroup.Pos(), ctx));
return result;
return true;
}

if (auto maybeDropGroup = node.Maybe<TKiDropGroup>()) {
Expand All @@ -684,10 +619,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return false;
}

txRes.Ops.insert(node.Raw());
auto result = ExploreTx(dropGroup.World(), ctx, dataSink, txRes, tablesData, types);
txRes.AddTableOperation(BuildYdbOpNode(cluster, TYdbOperation::DropGroup, dropGroup.Pos(), ctx));
return result;
return true;
}

if (auto maybeExecQuery = node.Maybe<TKiExecDataQuery>()) {
Expand All @@ -705,78 +638,36 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T

if (commit.DataSink().Maybe<TKiDataSink>() && checkDataSink(commit.DataSink().Cast<TKiDataSink>())) {
txRes.Sync.push_back(commit);
return true;
}

return ExploreTx(commit.World(), ctx, dataSink, txRes, tablesData, types);
}

if (auto maybeSync = node.Maybe<TCoSync>()) {
txRes.Ops.insert(node.Raw());
for (auto child : maybeSync.Cast()) {
if (!ExploreTx(child, ctx, dataSink, txRes, tablesData, types)) {
return false;
}
}
return true;
}

if (node.Maybe<TResWrite>() ||
node.Maybe<TResPull>())
{
txRes.Ops.insert(node.Raw());
bool result = ExploreTx(TExprBase(node.Ref().ChildPtr(0)), ctx, dataSink, txRes, tablesData, types);
// Cerr << KqpExprToPrettyString(*node.Raw(), ctx) << Endl;
txRes.AddResult(node);
return result;
return true;
}

if (node.Ref().IsCallable(ConfigureName)) {
txRes.Sync.push_back(node);
return true;
}

return false;
}

bool IsKikimrPureNode(const TExprNode::TPtr& node) {
if (node->IsCallable("TypeOf")) {
return true;
}

if (TMaybeNode<TCoDataSource>(node) ||
TMaybeNode<TCoDataSink>(node))
{
return true;
}

if (!node->GetTypeAnn()->IsComposable()) {
return false;
}

return true;
}

bool CheckTx(TExprBase txStart, const TKiDataSink& dataSink, const THashSet<const TExprNode*>& txOps,
const THashSet<const TExprNode*>& txSync)
bool ExploreTx(TExprBase root, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes,
TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types)
{
bool hasErrors = false;
VisitExpr(txStart.Ptr(), [&txOps, &txSync, &hasErrors, dataSink] (const TExprNode::TPtr& node) {
VisitExpr(root.Ptr(), {}, [&hasErrors, &ctx, &dataSink, &txRes, tablesData, &types](const TExprNode::TPtr& node) {
if (hasErrors) {
return false;
}

if (txSync.find(node.Get()) != txSync.cend()) {
return false;
}

if (auto maybeCommit = TMaybeNode<TCoCommit>(node)) {
if (maybeCommit.Cast().DataSink().Raw() != dataSink.Raw()) {
return true;
}
}

if (!IsKikimrPureNode(node) && txOps.find(node.Get()) == txOps.cend()) {
if (!ExploreNode(TExprBase(node), ctx, dataSink, txRes, tablesData, types)) {
hasErrors = true;
return false;
}
Expand Down Expand Up @@ -964,11 +855,6 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TStringBuf datab
return node.Ptr();
}

auto txSyncSet = txExplore.GetSyncSet();
if (!CheckTx(commit.World(), kiDataSink, txExplore.Ops, txSyncSet)) {
return node.Ptr();
}

bool hasScheme;
bool hasData;
txExplore.GetTableOperations(hasScheme, hasData);
Expand Down
Loading

0 comments on commit 9de8d63

Please sign in to comment.