diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index 76f47d0c0df0..2ce5cac7ae22 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -1539,12 +1539,10 @@ void ApplyChanges( } TRowVersion CommitWrites( - Tests::TServer::TPtr server, + TTestActorRuntime& runtime, const TVector& tables, ui64 writeTxId) { - auto& runtime = *server->GetRuntime(); - TActorId sender = runtime.AllocateEdgeActor(); { @@ -1571,6 +1569,14 @@ TRowVersion CommitWrites( return { step, txId }; } +TRowVersion CommitWrites( + Tests::TServer::TPtr server, + const TVector& tables, + ui64 writeTxId) +{ + return CommitWrites(*server->GetRuntime(), tables, writeTxId); +} + ui64 AsyncDropTable( Tests::TServer::TPtr server, TActorId sender, @@ -2615,6 +2621,7 @@ namespace { PRINT_PRIMITIVE(Datetime64); PRINT_PRIMITIVE(Timestamp64); PRINT_PRIMITIVE(String); + PRINT_PRIMITIVE(Utf8); PRINT_PRIMITIVE(DyNumber); default: @@ -2654,13 +2661,12 @@ namespace { } // namespace TReadShardedTableState StartReadShardedTable( - Tests::TServer::TPtr server, + TTestActorRuntime& runtime, const TString& path, TRowVersion snapshot, bool pause, bool ordered) { - auto& runtime = *server->GetRuntime(); auto sender = runtime.AllocateEdgeActor(); auto worker = runtime.Register(new TReadTableImpl(sender, path, snapshot, pause, ordered)); auto ev = runtime.GrabEdgeEventRethrow(sender); @@ -2671,6 +2677,16 @@ TReadShardedTableState StartReadShardedTable( return { sender, worker, result }; } +TReadShardedTableState StartReadShardedTable( + Tests::TServer::TPtr server, + const TString& path, + TRowVersion snapshot, + bool pause, + bool ordered) +{ + return StartReadShardedTable(*server->GetRuntime(), path, snapshot, pause, ordered); +} + void ResumeReadShardedTable( Tests::TServer::TPtr server, TReadShardedTableState& state) @@ -2681,12 +2697,20 @@ void ResumeReadShardedTable( state.Result = ev->Get()->Result; } +TString ReadShardedTable( + TTestActorRuntime& runtime, + const TString& path, + TRowVersion snapshot) +{ + return StartReadShardedTable(runtime, path, snapshot, /* pause = */ false).Result; +} + TString ReadShardedTable( Tests::TServer::TPtr server, const TString& path, TRowVersion snapshot) { - return StartReadShardedTable(server, path, snapshot, /* pause = */ false).Result; + return ReadShardedTable(*server->GetRuntime(), path, snapshot); } void SendViaPipeCache( diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index 6d6b973a81b2..b94c32cfca20 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -597,6 +597,10 @@ void ApplyChanges( NKikimrTxDataShard::TEvApplyReplicationChangesResult::EStatus expected = NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK); +TRowVersion CommitWrites( + TTestActorRuntime& runtime, + const TVector& tables, + ui64 writeTxId); TRowVersion CommitWrites( Tests::TServer::TPtr server, const TVector& tables, @@ -724,6 +728,12 @@ struct TReadShardedTableState { TString Result; }; +TReadShardedTableState StartReadShardedTable( + TTestActorRuntime& runtime, + const TString& path, + TRowVersion snapshot = TRowVersion::Max(), + bool pause = true, + bool ordered = true); TReadShardedTableState StartReadShardedTable( Tests::TServer::TPtr server, const TString& path, @@ -735,6 +745,10 @@ void ResumeReadShardedTable( Tests::TServer::TPtr server, TReadShardedTableState& state); +TString ReadShardedTable( + TTestActorRuntime& runtime, + const TString& path, + TRowVersion snapshot = TRowVersion::Max()); TString ReadShardedTable( Tests::TServer::TPtr server, const TString& path, diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp index 3840181b13e6..7c6c57dfc6e4 100644 --- a/ydb/core/tx/replication/service/base_table_writer.cpp +++ b/ydb/core/tx/replication/service/base_table_writer.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -447,14 +448,19 @@ class TLocalTableWriter if (auto it = TxIds.upper_bound(version); it != TxIds.end()) { record->RewriteTxId(it->second); + if (PendingTxId.empty()) { + records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size()); + } else { + BlockedRecords.insert(record->GetOrder()); + } } else { versionsWithoutTxId.insert(version); - PendingTxId[version].push_back(std::move(record)); - continue; + PendingTxId.insert(record->GetOrder()); } + } else { + records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size()); } - records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size()); Y_ABORT_UNLESS(PendingRecords.emplace(record->GetOrder(), std::move(record)).second); } @@ -485,18 +491,55 @@ class TLocalTableWriter const auto version = TRowVersion::FromProto(kv.GetVersion()); TxIds.emplace(version, kv.GetTxId()); - for (auto it = PendingTxId.begin(); it != PendingTxId.end();) { - if (it->first >= version) { - break; + auto pendingIt = PendingTxId.begin(); + auto blockedIt = BlockedRecords.begin(); + + auto processPending = [this, &records, &version, txId = kv.GetTxId()](auto& it) -> bool { + Y_ABORT_UNLESS(PendingRecords.contains(*it)); + auto& record = PendingRecords.at(*it); + + if (TRowVersion(record->GetStep(), record->GetTxId()) >= version) { + return false; } - for (auto& record : it->second) { - record->RewriteTxId(kv.GetTxId()); - records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size()); - Y_ABORT_UNLESS(PendingRecords.emplace(record->GetOrder(), std::move(record)).second); + record->RewriteTxId(txId); + records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size()); + + it = PendingTxId.erase(it); + return true; + }; + + auto processBlocked = [this, &records](auto& it) { + Y_ABORT_UNLESS(PendingRecords.contains(*it)); + auto& record = PendingRecords.at(*it); + + records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size()); + + it = BlockedRecords.erase(it); + }; + + while (pendingIt != PendingTxId.end() && blockedIt != BlockedRecords.end()) { + if (*pendingIt < *blockedIt) { + if (!processPending(pendingIt)) { + break; + } + } else { + processBlocked(blockedIt); } + } - PendingTxId.erase(it++); + if (blockedIt == BlockedRecords.end()) { + for (; pendingIt != PendingTxId.end();) { + if (!processPending(pendingIt)) { + break; + } + } + } + + if (pendingIt == PendingTxId.end()) { + for (; blockedIt != BlockedRecords.end();) { + processBlocked(blockedIt); + } } } @@ -624,9 +667,10 @@ class TLocalTableWriter bool Resolving = false; bool Initialized = false; - TMap PendingRecords; + THashMap PendingRecords; TMap TxIds; // key is non-inclusive right hand edge - TMap> PendingTxId; + TSet PendingTxId; + TSet BlockedRecords; TRowVersion PendingHeartbeat = TRowVersion::Min(); }; // TLocalTableWriter diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index 11b45e9c042d..28ac777cb59f 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -2,6 +2,7 @@ #include "table_writer.h" #include "worker.h" +#include #include #include @@ -9,6 +10,7 @@ #include #include +#include namespace NKikimr::NReplication::NService { @@ -187,9 +189,6 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvData("TestSource", { TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"), TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"), - })); - - env.Send(writer, new TEvWorker::TEvData("TestSource", { TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"), TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[22,0]})"), })); @@ -211,7 +210,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { } Y_UNIT_TEST(WaitTxIds) { - class TMockWorker: public TActorBootstrapped { + class TMockWorker: public TActor { void Handle(TEvWorker::TEvHandshake::TPtr& ev) { if (ev->Sender == Edge) { ev->Sender = SelfId(); @@ -232,14 +231,11 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { public: explicit TMockWorker(const TActorId& writer, const TActorId& edge) - : Writer(writer) + : TActor(&TThis::StateWork) + , Writer(writer) , Edge(edge) {} - void Bootstrap() { - Become(&TThis::StateWork); - } - STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { hFunc(TEvWorker::TEvHandshake, Handle); @@ -288,7 +284,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { } Y_UNIT_TEST(DataAlongWithHeartbeat) { - class TMockWorker: public TActorBootstrapped { + class TMockWorker: public TActor { void Handle(TEvWorker::TEvHandshake::TPtr& ev) { if (ev->Sender == Edge) { ev->Sender = SelfId(); @@ -315,14 +311,11 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { public: explicit TMockWorker(const TActorId& writer, const TActorId& edge) - : Writer(writer) + : TActor(&TThis::StateWork) + , Writer(writer) , Edge(edge) {} - void Bootstrap() { - Become(&TThis::StateWork); - } - STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { hFunc(TEvWorker::TEvHandshake, Handle); @@ -369,6 +362,50 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { })); env.GetRuntime().GrabEdgeEvent(env.GetSender()); } + + Y_UNIT_TEST(ApplyInCorrectOrder) { + TEnv env; + env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG); + + env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{ + .Name = "Table", + .KeyColumns = {"key"}, + .Columns = { + {.Name = "key", .Type = "Uint32"}, + {.Name = "value", .Type = "Utf8"}, + }, + .ReplicationConfig = TTestTableDescription::TReplicationConfig{ + .Mode = TTestTableDescription::TReplicationConfig::MODE_READ_ONLY, + .ConsistencyLevel = TTestTableDescription::TReplicationConfig::CONSISTENCY_LEVEL_GLOBAL, + }, + })); + + auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"), EWriteMode::Consistent)); + env.Send(writer, new TEvWorker::TEvHandshake()); + + env.Send(writer, new TEvWorker::TEvData("TestSource", { + TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), + })); + env.Send(writer, MakeTxIdResult({ + {TRowVersion(10, 0), 1}, + })); + + env.Send(writer, new TEvWorker::TEvData("TestSource", { + TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"), + TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), + TRecord(4, R"({"resolved":[20,0]})"), + })); + env.Send(writer, MakeTxIdResult({ + {TRowVersion(20, 0), 2}, + })); + env.GetRuntime().GrabEdgeEvent(env.GetSender()); + + CommitWrites(env.GetRuntime(), {"/Root/Table"}, 1); + CommitWrites(env.GetRuntime(), {"/Root/Table"}, 2); + + auto content = ReadShardedTable(env.GetRuntime(), "/Root/Table"); + UNIT_ASSERT_STRINGS_EQUAL(StripInPlace(content), "key = 1, value = 10\nkey = 2, value = 20\nkey = 3, value = 30"); + } } } diff --git a/ydb/core/tx/replication/service/ut_table_writer/ya.make b/ydb/core/tx/replication/service/ut_table_writer/ya.make index acea000ba75b..a32cf0dad640 100644 --- a/ydb/core/tx/replication/service/ut_table_writer/ya.make +++ b/ydb/core/tx/replication/service/ut_table_writer/ya.make @@ -5,6 +5,7 @@ FORK_SUBTESTS() SIZE(MEDIUM) PEERDIR( + ydb/core/tx/datashard/ut_common ydb/core/tx/replication/ut_helpers library/cpp/string_utils/base64 library/cpp/testing/unittest diff --git a/ydb/core/tx/replication/ut_helpers/write_topic.h b/ydb/core/tx/replication/ut_helpers/write_topic.h index 8d384a8d653a..60bf89eac355 100644 --- a/ydb/core/tx/replication/ut_helpers/write_topic.h +++ b/ydb/core/tx/replication/ut_helpers/write_topic.h @@ -1,3 +1,5 @@ +#pragma once + #include namespace NKikimr::NReplication::NTestHelpers {