Skip to content

Commit

Permalink
Fix mediator merge for out-of-order transactions (ydb-platform#14182)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Feb 4, 2025
1 parent 60a898f commit e6e5842
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 63 deletions.
104 changes: 97 additions & 7 deletions ydb/core/tx/mediator/mediator_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,20 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
}
};

/**
 * Strongly-typed wrapper for a coordinator index.
 *
 * The constructor is explicit, so a plain integer never converts to an
 * index implicitly; callers have to spell out TCoordinatorIndex(n).
 */
struct TCoordinatorIndex {
    size_t Value;

    explicit TCoordinatorIndex(size_t value) : Value{value} {}
};

class TMediatorTestWithWatcher : public NUnitTest::TBaseFixture {
public:
TMediatorTestWithWatcher(ui64 coordinatorCount = 1)
: CoordinatorCount(coordinatorCount)
{}

void SetUp(NUnitTest::TTestContext&) override {
auto& pm = PM.emplace();

Expand All @@ -434,7 +446,9 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TABLETQUEUE, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_PRIVATE, NLog::PRI_TRACE);

CoordinatorId = ChangeStateStorage(TTestTxConfig::Coordinator, Server->GetSettings().Domain);
for (ui64 i = 0; i < CoordinatorCount; ++i) {
CoordinatorIds.push_back(ChangeStateStorage(TTestTxConfig::Coordinator + i, Server->GetSettings().Domain));
}
MediatorId = ChangeStateStorage(TTestTxConfig::TxTablet0, Server->GetSettings().Domain);

MediatorBootstrapper = CreateTestBootstrapper(runtime,
Expand All @@ -451,7 +465,9 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
auto msg = std::make_unique<TEvSubDomain::TEvConfigure>();
msg->Record.SetVersion(1);
msg->Record.SetPlanResolution(500);
msg->Record.AddCoordinators(CoordinatorId);
for (ui64 coordinatorId : CoordinatorIds) {
msg->Record.AddCoordinators(coordinatorId);
}
msg->Record.AddMediators(MediatorId);
msg->Record.SetTimeCastBucketsPerMediator(1);
runtime.SendToPipe(MediatorId, Sender, msg.release());
Expand Down Expand Up @@ -482,25 +498,29 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
return client;
}

NKikimrTx::TEvCoordinatorSyncResult Sync(const TActorId& queue, ui64 genCookie) {
NKikimrTx::TEvCoordinatorSyncResult Sync(const TActorId& queue, TCoordinatorIndex coordinatorIndex, ui64 genCookie) {
auto& runtime = GetRuntime();

TActorId client = QueuePipeClient(queue);
runtime.SendToPipe(client, queue, new TEvTxCoordinator::TEvCoordinatorSync(genCookie, MediatorId, CoordinatorId));
runtime.SendToPipe(client, queue, new TEvTxCoordinator::TEvCoordinatorSync(genCookie, MediatorId, CoordinatorIds.at(coordinatorIndex.Value)));
auto ev = runtime.GrabEdgeEventRethrow<TEvTxCoordinator::TEvCoordinatorSyncResult>(queue);
auto* msg = ev->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetCookie(), genCookie);
UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus(), NKikimrProto::OK);
return std::move(msg->Record);
}

void SendStep(const TActorId& queue, ui32 gen, ui64 step, THashMap<ui64, std::vector<ui64>> txs = {}) {
// Convenience overload: performs the sync on behalf of the coordinator at index 0.
NKikimrTx::TEvCoordinatorSyncResult Sync(const TActorId& queue, ui64 genCookie) {
    const TCoordinatorIndex firstCoordinator(0);
    return Sync(queue, firstCoordinator, genCookie);
}

void SendStep(const TActorId& queue, TCoordinatorIndex coordinatorIndex, ui32 gen, ui64 step, THashMap<ui64, std::vector<ui64>> txs = {}) {
auto& runtime = GetRuntime();

TActorId client = QueuePipeClient(queue);
// Note: prevStep is not actually used by mediator
auto msg = std::make_unique<TEvTxCoordinator::TEvCoordinatorStep>(
step, /* prevStep */ 0, MediatorId, CoordinatorId, gen);
step, /* prevStep */ 0, MediatorId, CoordinatorIds.at(coordinatorIndex.Value), gen);
size_t totalAffected = 0;
for (auto& pr : txs) {
auto* protoTx = msg->Record.AddTransactions();
Expand All @@ -514,6 +534,10 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
runtime.SendToPipe(client, queue, msg.release());
}

void SendStep(const TActorId& queue, ui32 gen, ui64 step, THashMap<ui64, std::vector<ui64>> txs = {}) {
return SendStep(queue, TCoordinatorIndex(0), gen, step, std::move(txs));
}

ui64 AddTargetTablet() {
auto& runtime = GetRuntime();

Expand Down Expand Up @@ -575,7 +599,8 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
TServer::TPtr Server;
TActorId Sender;

ui64 CoordinatorId;
ui64 CoordinatorCount;
std::vector<ui64> CoordinatorIds;

ui64 MediatorId;
TActorId MediatorBootstrapper;
Expand All @@ -589,6 +614,13 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
THashMap<TActorId, TActorId> PerQueuePipes; // Queue -> PipeClient
};

/**
 * Fixture variant that configures the base mediator fixture with two
 * coordinators instead of the default single one.
 */
class TMediatorTestWithWatcherTwoCoordinators : public TMediatorTestWithWatcher {
public:
    TMediatorTestWithWatcherTwoCoordinators() : TMediatorTestWithWatcher(/* coordinatorCount */ 2) {}
};

Y_UNIT_TEST_F(BasicTimecastUpdates, TMediatorTestWithWatcher) {
auto& runtime = GetRuntime();

Expand Down Expand Up @@ -1201,6 +1233,64 @@ Y_UNIT_TEST_SUITE(MediatorTest) {
}
}

// Regression test for merging out-of-order steps: two coordinators plan the
// same step, then one of them reconnects with a new generation and resends
// its part. After the target tablets reboot, the plans captured by the test
// must still mention both transactions.
Y_UNIT_TEST_F(OneCoordinatorResendTxNotLost, TMediatorTestWithWatcherTwoCoordinators) {
auto& runtime = GetRuntime();

// Create two target tablets, pass them to AddWatchTablet, wait until
// nothing is pending and discard the updates recorded so far.
ui64 tablet1 = AddTargetTablet();
ui64 tablet2 = AddTargetTablet();
AddWatchTablet(tablet1);
AddWatchTablet(tablet2);
WaitNoPending();
WatcherState->Updates.clear();

// Each coordinator index gets its own edge actor and syncs with cookie 1
auto queue1 = runtime.AllocateEdgeActor();
Sync(queue1, TCoordinatorIndex(0), 1);
auto queue2 = runtime.AllocateEdgeActor();
Sync(queue2, TCoordinatorIndex(1), 1);

// NOTE(review): TBlockEvents presumably intercepts and holds TEvPlanStep
// events so the test can count and inspect them -- confirm against its definition
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime);

// Both coordinators send step 1010 targeting the same two tablets, each
// with a different map key (1 and 2; presumably the transaction ids)
SendStep(queue1, TCoordinatorIndex(0), /* gen */ 1, /* step */ 1010, {
{1, {tablet1, tablet2}},
});
SendStep(queue2, TCoordinatorIndex(1), /* gen */ 1, /* step */ 1010, {
{2, {tablet1, tablet2}},
});
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// Exactly one watcher update is expected, and two captured plan events
// (presumably one per tablet); both collections are reset afterwards
UNIT_ASSERT_VALUES_EQUAL(WatcherState->Updates, MakeUpdates(
TGranularUpdate(1010, {{tablet1, 1009}, {tablet2, 1009}})));
WatcherState->Updates.clear();
UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
blockedPlan.clear();

// Simulate one coordinator restarting and resending the step
// (same coordinator index 1, new edge actor, generation 2, only its own tx 2)
auto queue3 = runtime.AllocateEdgeActor();
Sync(queue3, TCoordinatorIndex(1), 2);
SendStep(queue3, TCoordinatorIndex(1), /* gen */ 2, /* step */ 1010, {
{2, {tablet1, tablet2}},
});
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// Reboot tablets, we expect plans to be resent
RebootTablet(runtime, tablet1, Sender);
RebootTablet(runtime, tablet2, Sender);
runtime.SimulateSleep(TDuration::MilliSeconds(1));
// No new watcher updates are expected since the last clear, and again
// two plan events must have been captured
UNIT_ASSERT_VALUES_EQUAL(WatcherState->Updates, MakeUpdates());
UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);

// Tablets must see both transactions
// Note: the bug was causing some transactions to be lost
// Collect the distinct TxIds across all captured plan events
THashSet<ui64> observedTxIds;
for (auto& ev : blockedPlan) {
auto* msg = ev->Get();
for (const auto& tx : msg->Record.GetTransactions()) {
observedTxIds.insert(tx.GetTxId());
}
}
UNIT_ASSERT_VALUES_EQUAL(observedTxIds.size(), 2u);
}

}

} // namespace NKikimr::NTxMediator
97 changes: 41 additions & 56 deletions ydb/core/tx/mediator/tablet_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
TStepEntry* const StepRef;
// The current list of transactions
TVector<TTx> Transactions;
// An updated list of transactions (after last sent to tablet)
// We may need to send acks to the updated AckTo actor
TVector<TTx> OutOfOrder;

TStep(TStepEntry* stepRef)
: StepRef(stepRef)
Expand All @@ -60,8 +57,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
return State == EState::Init && Queue.empty() && Watchers.empty();
}

void MergeOutOfOrder(TStep* sx);
void MergeToOutOfOrder(TStep* sx, TVector<TTx>&& update);
void MergeOutOfOrder(TStep* sx, TVector<TTx>&& update);
};

struct TStepEntry {
Expand Down Expand Up @@ -230,19 +226,19 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
TTabletEntry* tabletEntry = Tablets.FindPtr(tabletId);
if (tabletEntry == nullptr) {
// We don't have a tablet entry, which means no steps inflight
AckOoO(tabletId, step, msg->Transactions, ctx);
AckTransactions(tabletId, step, msg->Transactions, ctx);
return;
}

auto it = tabletEntry->Queue.find(step);
if (it == tabletEntry->Queue.end()) {
// This step is already confirmed, reply immediately
AckOoO(tabletId, step, msg->Transactions, ctx);
AckTransactions(tabletId, step, msg->Transactions, ctx);
return;
}

// Save possibly updated AckTo for later
tabletEntry->MergeToOutOfOrder(&it->second, std::move(msg->Transactions));
tabletEntry->MergeOutOfOrder(&it->second, std::move(msg->Transactions));
}

void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
Expand All @@ -264,8 +260,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
auto it = tabletEntry->Queue.begin();
while (it != tabletEntry->Queue.end()) {
TTabletEntry::TStep* sx = &it->second;
tabletEntry->MergeOutOfOrder(sx);
AckOoO(tabletId, sx->StepRef->Step, sx->Transactions, ctx);
AckTransactions(tabletId, sx->StepRef->Step, sx->Transactions, ctx);
// Note: will also auto-remove itself from sx->StepRef->TabletSteps
it = tabletEntry->Queue.erase(it);
}
Expand All @@ -288,7 +283,6 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
tabletEntry->State = TTabletEntry::EState::Connected;
for (auto& pr : tabletEntry->Queue) {
TTabletEntry::TStep* sx = &pr.second;
tabletEntry->MergeOutOfOrder(sx);
SendToTablet(sx, tabletId, ctx);
}
}
Expand Down Expand Up @@ -364,10 +358,14 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
}

TTabletEntry::TStep* sx = &it->second;
if (!sx->OutOfOrder.empty()) {
// Confirm out-of-order requests
AckOoO(tabletId, step, sx->OutOfOrder, ctx);
}
// Note: in the past we didn't ack all transactions (except out-of-order
// requests when coordinator reconnected), since they are acknowledged
// by tablets. However, events from tablets to coordinator may be lost,
// and those transactions will never be acknowledged until the network
// between coordinator and mediator also flaps. In the future we might
// want to avoid acking transactions by tablets directly, so it doesn't
// introduce unnecessary interconnect chatter to single nodes.
AckTransactions(tabletId, step, sx->Transactions, ctx);
// Note: will also auto-remove itself from sx->StepRef->TabletSteps
tabletEntry->Queue.erase(it);

Expand Down Expand Up @@ -402,7 +400,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
TimecastWatches.erase(ev->Sender);
}

void AckOoO(TTabletId tablet, TStepId step, const TVector<TTx>& transactions, const TActorContext& ctx) {
void AckTransactions(TTabletId tablet, TStepId step, const TVector<TTx>& transactions, const TActorContext& ctx) {
TMap<TActorId, std::unique_ptr<TEvTxProcessing::TEvPlanStepAck>> acks;
for (const TTx &tx : transactions) {
auto& ack = acks[tx.AckTo];
Expand Down Expand Up @@ -758,26 +756,6 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
THashMap<TActorId, TGranularServer> GranularServers;
};

/**
* Returns true when all transactions of x are present in superset
*/
static bool IsSubsetOf(const TVector<TTx>& x, const TVector<TTx>& superset) {
auto it = x.begin();
auto itSuperset = superset.begin();
while (it != x.end()) {
// Position superset to the lowerbound of the current TxId
while (itSuperset != superset.end() && itSuperset->TxId < it->TxId) {
++itSuperset;
}
if (itSuperset == superset.end() || it->TxId != itSuperset->TxId) {
return false;
}
++it;
++itSuperset;
}
return true;
}

static TString DumpTxIds(const TVector<TTx>& v) {
TStringBuilder stream;
stream << '{';
Expand All @@ -791,31 +769,38 @@ static TString DumpTxIds(const TVector<TTx>& v) {
return std::move(stream);
}

// Replaces the step's transaction list with the pending out-of-order update,
// if there is one; otherwise the step is left untouched.
void TTxMediatorTabletQueue::TTabletEntry::MergeOutOfOrder(TStep* sx) {
    if (sx->OutOfOrder.empty()) {
        // Nothing was received out of order for this step
        return;
    }
    // Since OutOfOrder is an update it might be missing some lost or
    // already acknowledged transactions that we don't have to resend.
    // We have previously validated that OutOfOrder ⊂ Transactions
    sx->Transactions = std::move(sx->OutOfOrder);
}

void TTxMediatorTabletQueue::TTabletEntry::MergeToOutOfOrder(TStep* sx, TVector<TTx>&& update) {
// Update might be missing some lost or already acknowledged transactions
// that we don't have to resend later. We validate that update is a subset
// of a previously received step.
const TVector<TTx>& prev = sx->OutOfOrder.empty() ? sx->Transactions : sx->OutOfOrder;
if (Y_UNLIKELY(!IsSubsetOf(update, prev))) {
// Coordinator shouldn't add new transaction to existing steps, so we
// complain. However, even if that happens, it's ok for us to send
// those transactions later, or never.
void TTxMediatorTabletQueue::TTabletEntry::MergeOutOfOrder(TStep* sx, TVector<TTx>&& update) {
// Step transactions are a union from multiple coordinators, and the update
// is currently unacknowledged transactions from a single coordinator.
// The update must be a subset of the full transaction list and cannot
// introduce new transactions out of thin air.
auto dst = sx->Transactions.begin();
auto src = update.begin();
bool subset = true;
while (dst != sx->Transactions.end() && src != update.end()) {
if (dst->TxId < src->TxId) {
++dst;
continue;
}
if (Y_UNLIKELY(dst->TxId != src->TxId)) {
subset = false;
++src;
continue;
}
dst->AckTo = src->AckTo;
++dst;
++src;
}
if (Y_UNLIKELY(!subset) || Y_UNLIKELY(src != update.end())) {
// Coordinators shouldn't add new transactions to existing steps, so we
// complain. Even if that happens, however, it's ok for us to send
// those transactions later, or never. Currently we don't.
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_MEDIATOR_TABLETQUEUE,
"Received out-of-order step " << sx->StepRef->Step
<< " for tablet " << TabletId
<< " with transactions " << DumpTxIds(update)
<< " which are not a subset of previously received " << DumpTxIds(prev));
<< " which are not a subset of previously received " << DumpTxIds(sx->Transactions));
}
sx->OutOfOrder = std::move(update);
}

}
Expand Down

0 comments on commit e6e5842

Please sign in to comment.