Skip to content

Commit

Permalink
Merge 8388cfe into 21b0c4d
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Jan 9, 2025
2 parents 21b0c4d + 8388cfe commit 854a456
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 26 deletions.
57 changes: 38 additions & 19 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2575,7 +2575,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

for (auto& [shardId, shardTx] : datashardTxs) {
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
if (columnShardArbiter) {
if (!columnShardArbiter) {
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
}
} else if (!sendingShardsSet.empty() && !receivingShards.empty()) {
shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter);
shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter);
if (sendingShardsSet.contains(shardId)) {
Expand All @@ -2584,23 +2590,30 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (receivingShardsSet.contains(shardId)) {
shardTx->MutableLocks()->AddReceivingShards(shardId);
}
std::sort(
std::begin(*shardTx->MutableLocks()->MutableSendingShards()),
std::end(*shardTx->MutableLocks()->MutableSendingShards()));
std::sort(
std::begin(*shardTx->MutableLocks()->MutableReceivingShards()),
std::end(*shardTx->MutableLocks()->MutableReceivingShards()));
AFL_ENSURE(!arbiter);
} else {
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
}
}
}

for (auto& [shardId, tx] : evWriteTxs) {
tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
if (columnShardArbiter && *columnShardArbiter == shardId) {
if (!columnShardArbiter) {
*tx->MutableLocks()->MutableSendingShards() = sendingShards;
*tx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
tx->MutableLocks()->SetArbiterShard(arbiter);
}
} else if (*columnShardArbiter == shardId
&& !sendingShardsSet.empty() && !receivingShardsSet.empty()) {
tx->MutableLocks()->SetArbiterColumnShard(*columnShardArbiter);
*tx->MutableLocks()->MutableSendingShards() = sendingShards;
*tx->MutableLocks()->MutableReceivingShards() = receivingShards;
} else if (columnShardArbiter) {
} else if (!sendingShardsSet.empty() && !receivingShardsSet.empty()) {
tx->MutableLocks()->SetArbiterColumnShard(*columnShardArbiter);
tx->MutableLocks()->AddSendingShards(*columnShardArbiter);
tx->MutableLocks()->AddReceivingShards(*columnShardArbiter);
Expand All @@ -2610,18 +2623,21 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (receivingShardsSet.contains(shardId)) {
tx->MutableLocks()->AddReceivingShards(shardId);
}
} else {
*tx->MutableLocks()->MutableSendingShards() = sendingShards;
*tx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
tx->MutableLocks()->SetArbiterShard(arbiter);
}
std::sort(
std::begin(*tx->MutableLocks()->MutableSendingShards()),
std::end(*tx->MutableLocks()->MutableSendingShards()));
std::sort(
std::begin(*tx->MutableLocks()->MutableReceivingShards()),
std::end(*tx->MutableLocks()->MutableReceivingShards()));
}
}

for (auto& [shardId, t] : topicTxs) {
t.tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
if (columnShardArbiter) {
if (!columnShardArbiter) {
*t.tx.MutableSendingShards() = sendingShards;
*t.tx.MutableReceivingShards() = receivingShards;
} else if (!sendingShardsSet.empty() && !receivingShardsSet.empty()) {
t.tx.AddSendingShards(*columnShardArbiter);
t.tx.AddReceivingShards(*columnShardArbiter);
if (sendingShardsSet.contains(shardId)) {
Expand All @@ -2630,9 +2646,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (receivingShardsSet.contains(shardId)) {
t.tx.AddReceivingShards(shardId);
}
} else {
*t.tx.MutableSendingShards() = sendingShards;
*t.tx.MutableReceivingShards() = receivingShards;
std::sort(
std::begin(*t.tx.MutableSendingShards()),
std::end(*t.tx.MutableSendingShards()));
std::sort(
std::begin(*t.tx.MutableReceivingShards()),
std::end(*t.tx.MutableReceivingShards()));
}
YQL_ENSURE(!arbiter);
}
Expand Down
13 changes: 11 additions & 2 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,18 @@ namespace {
if (prepareSettings.Arbiter) {
protoLocks->SetArbiterShard(*prepareSettings.Arbiter);
}
} else if (prepareSettings.ArbiterColumnShard == shardId) {
} else if (prepareSettings.ArbiterColumnShard == shardId
&& !prepareSettings.SendingShards.empty()
&& !prepareSettings.ReceivingShards.empty()) {
protoLocks->SetArbiterColumnShard(*prepareSettings.ArbiterColumnShard);
for (const ui64 sendingShardId : prepareSettings.SendingShards) {
protoLocks->AddSendingShards(sendingShardId);
}
for (const ui64 receivingShardId : prepareSettings.ReceivingShards) {
protoLocks->AddReceivingShards(receivingShardId);
}
} else {
} else if (!prepareSettings.SendingShards.empty()
&& !prepareSettings.ReceivingShards.empty()) {
protoLocks->SetArbiterColumnShard(*prepareSettings.ArbiterColumnShard);
protoLocks->AddSendingShards(*prepareSettings.ArbiterColumnShard);
protoLocks->AddReceivingShards(*prepareSettings.ArbiterColumnShard);
Expand All @@ -92,6 +95,12 @@ namespace {
if (prepareSettings.ReceivingShards.contains(shardId)) {
protoLocks->AddReceivingShards(shardId);
}
std::sort(
std::begin(*protoLocks->MutableSendingShards()),
std::end(*protoLocks->MutableSendingShards()));
std::sort(
std::begin(*protoLocks->MutableReceivingShards()),
std::end(*protoLocks->MutableReceivingShards()));
}

const auto locks = txManager->GetLocks(shardId);
Expand Down
13 changes: 8 additions & 5 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1291,18 +1291,21 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
request.LocksOp = ELocksOp::Commit;
}
} else {
AFL_ENSURE(!tx || !txCtx.HasOlapTable);
if (hasLocks || txCtx.TopicOperations.HasOperations()) {
if (!txCtx.GetSnapshot().IsValid() || txCtx.TxHasEffects() || txCtx.TopicOperations.HasOperations()) {
bool hasUncommittedEffects = false;
for (auto& [lockId, lock] : txCtx.Locks.LocksMap) {
auto dsLock = ExtractLock(lock.GetValueRef(txCtx.Locks.LockType));
request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock);
hasUncommittedEffects |= dsLock.GetHasWrites();
}
if (!txCtx.GetSnapshot().IsValid() || (tx && txCtx.TxHasEffects()) || hasUncommittedEffects || txCtx.TopicOperations.HasOperations()) {
LOG_D("TExecPhysicalRequest, tx has commit locks");
request.LocksOp = ELocksOp::Commit;
} else {
LOG_D("TExecPhysicalRequest, tx has rollback locks");
request.LocksOp = ELocksOp::Rollback;
}
for (auto& [lockId, lock] : txCtx.Locks.LocksMap) {
auto dsLock = ExtractLock(lock.GetValueRef(txCtx.Locks.LockType));
request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock);
}
}
}

Expand Down

0 comments on commit 854a456

Please sign in to comment.