diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 444376163c1b..bcb66aa49750 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2575,7 +2575,13 @@ class TKqpDataExecuter : public TKqpExecuterBaseMutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); - if (columnShardArbiter) { + if (!columnShardArbiter) { + *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; + *shardTx->MutableLocks()->MutableReceivingShards() = receivingShards; + if (arbiter) { + shardTx->MutableLocks()->SetArbiterShard(arbiter); + } + } else if (!sendingShardsSet.empty() && !receivingShards.empty()) { shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter); shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter); if (sendingShardsSet.contains(shardId)) { @@ -2584,23 +2590,30 @@ class TKqpDataExecuter : public TKqpExecuterBaseMutableLocks()->AddReceivingShards(shardId); } + std::sort( + std::begin(*shardTx->MutableLocks()->MutableSendingShards()), + std::end(*shardTx->MutableLocks()->MutableSendingShards())); + std::sort( + std::begin(*shardTx->MutableLocks()->MutableReceivingShards()), + std::end(*shardTx->MutableLocks()->MutableReceivingShards())); AFL_ENSURE(!arbiter); - } else { - *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; - *shardTx->MutableLocks()->MutableReceivingShards() = receivingShards; - if (arbiter) { - shardTx->MutableLocks()->SetArbiterShard(arbiter); - } } } for (auto& [shardId, tx] : evWriteTxs) { tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); - if (columnShardArbiter && *columnShardArbiter == shardId) { + if (!columnShardArbiter) { + *tx->MutableLocks()->MutableSendingShards() = sendingShards; + *tx->MutableLocks()->MutableReceivingShards() = receivingShards; + if (arbiter) { + tx->MutableLocks()->SetArbiterShard(arbiter); + } + } else if (*columnShardArbiter == shardId + && !sendingShardsSet.empty() && !receivingShardsSet.empty()) { tx->MutableLocks()->SetArbiterColumnShard(*columnShardArbiter); *tx->MutableLocks()->MutableSendingShards() = sendingShards; *tx->MutableLocks()->MutableReceivingShards() = receivingShards; - } else if (columnShardArbiter) { + } else if (!sendingShardsSet.empty() && !receivingShardsSet.empty()) { tx->MutableLocks()->SetArbiterColumnShard(*columnShardArbiter); tx->MutableLocks()->AddSendingShards(*columnShardArbiter); tx->MutableLocks()->AddReceivingShards(*columnShardArbiter); @@ -2610,18 +2623,21 @@ class TKqpDataExecuter : public TKqpExecuterBaseMutableLocks()->AddReceivingShards(shardId); } - } else { - *tx->MutableLocks()->MutableSendingShards() = sendingShards; - *tx->MutableLocks()->MutableReceivingShards() = receivingShards; - if (arbiter) { - tx->MutableLocks()->SetArbiterShard(arbiter); - } + std::sort( + std::begin(*tx->MutableLocks()->MutableSendingShards()), + std::end(*tx->MutableLocks()->MutableSendingShards())); + std::sort( + std::begin(*tx->MutableLocks()->MutableReceivingShards()), + std::end(*tx->MutableLocks()->MutableReceivingShards())); } } for (auto& [shardId, t] : topicTxs) { t.tx.SetOp(NKikimrPQ::TDataTransaction::Commit); - if (columnShardArbiter) { + if (!columnShardArbiter) { + *t.tx.MutableSendingShards() = sendingShards; + *t.tx.MutableReceivingShards() = receivingShards; + } else if (!sendingShardsSet.empty() && !receivingShardsSet.empty()) { t.tx.AddSendingShards(*columnShardArbiter); t.tx.AddReceivingShards(*columnShardArbiter); if (sendingShardsSet.contains(shardId)) { @@ -2630,9 +2646,12 @@ class TKqpDataExecuter : public TKqpExecuterBaseSetArbiterShard(*prepareSettings.Arbiter); } - } else if (prepareSettings.ArbiterColumnShard == shardId) { + } else if (prepareSettings.ArbiterColumnShard == shardId + && !prepareSettings.SendingShards.empty() + && !prepareSettings.ReceivingShards.empty()) { protoLocks->SetArbiterColumnShard(*prepareSettings.ArbiterColumnShard); for (const ui64 sendingShardId : prepareSettings.SendingShards) { protoLocks->AddSendingShards(sendingShardId); @@ -82,7 +84,8 @@ namespace { for (const ui64 receivingShardId : prepareSettings.ReceivingShards) { protoLocks->AddReceivingShards(receivingShardId); } - } else { + } else if (!prepareSettings.SendingShards.empty() + && !prepareSettings.ReceivingShards.empty()) { protoLocks->SetArbiterColumnShard(*prepareSettings.ArbiterColumnShard); protoLocks->AddSendingShards(*prepareSettings.ArbiterColumnShard); protoLocks->AddReceivingShards(*prepareSettings.ArbiterColumnShard); @@ -92,6 +95,12 @@ namespace { if (prepareSettings.ReceivingShards.contains(shardId)) { protoLocks->AddReceivingShards(shardId); } + std::sort( + std::begin(*protoLocks->MutableSendingShards()), + std::end(*protoLocks->MutableSendingShards())); + std::sort( + std::begin(*protoLocks->MutableReceivingShards()), + std::end(*protoLocks->MutableReceivingShards())); } const auto locks = txManager->GetLocks(shardId); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index f64d2e75075b..151f32d48da2 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1291,18 +1291,22 @@ class TKqpSessionActor : public TActorBootstrapped { request.LocksOp = ELocksOp::Commit; } } else { + AFL_ENSURE(!tx || !txCtx.HasOlapTable); + AFL_ENSURE(txCtx.DeferredEffects.Empty() || !txCtx.HasOlapTable); if (hasLocks || txCtx.TopicOperations.HasOperations()) { - if (!txCtx.GetSnapshot().IsValid() || txCtx.TxHasEffects() || txCtx.TopicOperations.HasOperations()) { + bool hasUncommittedEffects = false; + for (auto& [lockId, lock] : txCtx.Locks.LocksMap) { + auto dsLock = ExtractLock(lock.GetValueRef(txCtx.Locks.LockType)); + request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock); + hasUncommittedEffects |= dsLock.GetHasWrites(); + } + if (!txCtx.GetSnapshot().IsValid() || (tx && txCtx.TxHasEffects()) || !txCtx.DeferredEffects.Empty() || hasUncommittedEffects || txCtx.TopicOperations.HasOperations()) { LOG_D("TExecPhysicalRequest, tx has commit locks"); request.LocksOp = ELocksOp::Commit; } else { LOG_D("TExecPhysicalRequest, tx has rollback locks"); request.LocksOp = ELocksOp::Rollback; } - for (auto& [lockId, lock] : txCtx.Locks.LocksMap) { - auto dsLock = ExtractLock(lock.GetValueRef(txCtx.Locks.LockType)); - request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock); - } } }