Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 committed Dec 22, 2024
1 parent 95b4cfe commit 2a982f1
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 10 deletions.
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,10 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
// Don't need snapshot for WriteOnly transaction.
return false;
} else if (hasEffects) {
// Has reads & writes => need snapshot
// ReadWrite transaction => need snapshot
return true;
}
// ReadOnly transaction here
} else {
YQL_ENSURE(txCtx.EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE);

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
if (NeedUncommittedChangesFlush || HasOlapTable) {
return !DeferredEffects.Empty();
}
if (EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW && !tx) {
if (EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW && !tx && HasTableRead) {
// RW transaction
YQL_ENSURE(HasTableWrite);
return !DeferredEffects.Empty();
}

Expand Down Expand Up @@ -351,6 +353,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;
bool HasTableRead = false;

bool NeedUncommittedChangesFlush = false;
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,9 @@ class TKqpQueryState : public TNonCopyable {
}

if (TxCtx->EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) {
// Snapshot isolation can use only uncommitted data.
return false;
// ReadWrite snapshot isolation transaction with can only use uncommitted data.
// WriteOnly snapshot isolation transaction is executed like serializable transaction.
return !TxCtx->HasTableRead;
}

if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
Expand Down
11 changes: 5 additions & 6 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,9 +899,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
const bool hasOlapWrite = ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
const bool hasOltpWrite = ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
QueryState->TxCtx->HasOlapTable |= hasOlapWrite || ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery);
QueryState->TxCtx->HasOltpTable |= hasOltpWrite || ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery);
const bool hasOlapRead = ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery);
const bool hasOltpRead = ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery);
QueryState->TxCtx->HasOlapTable |= hasOlapWrite || hasOlapRead;
QueryState->TxCtx->HasOltpTable |= hasOltpWrite || hasOltpRead;
QueryState->TxCtx->HasTableWrite |= hasOlapWrite || hasOltpWrite;
QueryState->TxCtx->HasTableRead |= hasOlapRead || hasOltpRead;
if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite
&& !Settings.TableService.GetEnableHtapTx() && !QueryState->IsSplitted()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
Expand Down Expand Up @@ -1136,15 +1139,11 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
return;
}

Cerr << "TEST:: ExecuteOrDefer" << Endl;
if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) {
Cerr << "TEST:: EXEC DEF" << Endl;
ExecuteDeferredEffectsImmediately(tx);
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) {
Cerr << "TEST:: COMMIT SHOULD = " << commit << Endl;
ExecutePhyTx(tx, commit);
} else {
Cerr << "TEST:: SKIP" << Endl;
ReplySuccess();
}
}
Expand Down

0 comments on commit 2a982f1

Please sign in to comment.