Skip to content

Commit

Permalink
Apply changes in correct order (#13232)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jan 9, 2025
1 parent c456fce commit 14a7b7e
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 34 deletions.
36 changes: 30 additions & 6 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1539,12 +1539,10 @@ void ApplyChanges(
}

TRowVersion CommitWrites(
Tests::TServer::TPtr server,
TTestActorRuntime& runtime,
const TVector<TString>& tables,
ui64 writeTxId)
{
auto& runtime = *server->GetRuntime();

TActorId sender = runtime.AllocateEdgeActor();

{
Expand All @@ -1571,6 +1569,14 @@ TRowVersion CommitWrites(
return { step, txId };
}

TRowVersion CommitWrites(
Tests::TServer::TPtr server,
const TVector<TString>& tables,
ui64 writeTxId)
{
return CommitWrites(*server->GetRuntime(), tables, writeTxId);
}

ui64 AsyncDropTable(
Tests::TServer::TPtr server,
TActorId sender,
Expand Down Expand Up @@ -2615,6 +2621,7 @@ namespace {
PRINT_PRIMITIVE(Datetime64);
PRINT_PRIMITIVE(Timestamp64);
PRINT_PRIMITIVE(String);
PRINT_PRIMITIVE(Utf8);
PRINT_PRIMITIVE(DyNumber);

default:
Expand Down Expand Up @@ -2654,13 +2661,12 @@ namespace {
} // namespace

TReadShardedTableState StartReadShardedTable(
Tests::TServer::TPtr server,
TTestActorRuntime& runtime,
const TString& path,
TRowVersion snapshot,
bool pause,
bool ordered)
{
auto& runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();
auto worker = runtime.Register(new TReadTableImpl(sender, path, snapshot, pause, ordered));
auto ev = runtime.GrabEdgeEventRethrow<TReadTableImpl::TEvResult>(sender);
Expand All @@ -2671,6 +2677,16 @@ TReadShardedTableState StartReadShardedTable(
return { sender, worker, result };
}

TReadShardedTableState StartReadShardedTable(
Tests::TServer::TPtr server,
const TString& path,
TRowVersion snapshot,
bool pause,
bool ordered)
{
return StartReadShardedTable(*server->GetRuntime(), path, snapshot, pause, ordered);
}

void ResumeReadShardedTable(
Tests::TServer::TPtr server,
TReadShardedTableState& state)
Expand All @@ -2681,12 +2697,20 @@ void ResumeReadShardedTable(
state.Result = ev->Get()->Result;
}

TString ReadShardedTable(
TTestActorRuntime& runtime,
const TString& path,
TRowVersion snapshot)
{
return StartReadShardedTable(runtime, path, snapshot, /* pause = */ false).Result;
}

TString ReadShardedTable(
Tests::TServer::TPtr server,
const TString& path,
TRowVersion snapshot)
{
return StartReadShardedTable(server, path, snapshot, /* pause = */ false).Result;
return ReadShardedTable(*server->GetRuntime(), path, snapshot);
}

void SendViaPipeCache(
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ void ApplyChanges(
NKikimrTxDataShard::TEvApplyReplicationChangesResult::EStatus expected =
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK);

TRowVersion CommitWrites(
TTestActorRuntime& runtime,
const TVector<TString>& tables,
ui64 writeTxId);
TRowVersion CommitWrites(
Tests::TServer::TPtr server,
const TVector<TString>& tables,
Expand Down Expand Up @@ -724,6 +728,12 @@ struct TReadShardedTableState {
TString Result;
};

TReadShardedTableState StartReadShardedTable(
TTestActorRuntime& runtime,
const TString& path,
TRowVersion snapshot = TRowVersion::Max(),
bool pause = true,
bool ordered = true);
TReadShardedTableState StartReadShardedTable(
Tests::TServer::TPtr server,
const TString& path,
Expand All @@ -735,6 +745,10 @@ void ResumeReadShardedTable(
Tests::TServer::TPtr server,
TReadShardedTableState& state);

TString ReadShardedTable(
TTestActorRuntime& runtime,
const TString& path,
TRowVersion snapshot = TRowVersion::Max());
TString ReadShardedTable(
Tests::TServer::TPtr server,
const TString& path,
Expand Down
70 changes: 57 additions & 13 deletions ydb/core/tx/replication/service/base_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/services/services.pb.h>

#include <util/generic/hash.h>
#include <util/generic/map.h>
#include <util/generic/maybe.h>
#include <util/generic/set.h>
Expand Down Expand Up @@ -447,14 +448,19 @@ class TLocalTableWriter

if (auto it = TxIds.upper_bound(version); it != TxIds.end()) {
record->RewriteTxId(it->second);
if (PendingTxId.empty()) {
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
} else {
BlockedRecords.insert(record->GetOrder());
}
} else {
versionsWithoutTxId.insert(version);
PendingTxId[version].push_back(std::move(record));
continue;
PendingTxId.insert(record->GetOrder());
}
} else {
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
}

records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
Y_ABORT_UNLESS(PendingRecords.emplace(record->GetOrder(), std::move(record)).second);
}

Expand Down Expand Up @@ -485,18 +491,55 @@ class TLocalTableWriter
const auto version = TRowVersion::FromProto(kv.GetVersion());
TxIds.emplace(version, kv.GetTxId());

for (auto it = PendingTxId.begin(); it != PendingTxId.end();) {
if (it->first >= version) {
break;
auto pendingIt = PendingTxId.begin();
auto blockedIt = BlockedRecords.begin();

auto processPending = [this, &records, &version, txId = kv.GetTxId()](auto& it) -> bool {
Y_ABORT_UNLESS(PendingRecords.contains(*it));
auto& record = PendingRecords.at(*it);

if (TRowVersion(record->GetStep(), record->GetTxId()) >= version) {
return false;
}

for (auto& record : it->second) {
record->RewriteTxId(kv.GetTxId());
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());
Y_ABORT_UNLESS(PendingRecords.emplace(record->GetOrder(), std::move(record)).second);
record->RewriteTxId(txId);
records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());

it = PendingTxId.erase(it);
return true;
};

auto processBlocked = [this, &records](auto& it) {
Y_ABORT_UNLESS(PendingRecords.contains(*it));
auto& record = PendingRecords.at(*it);

records.emplace_back(record->GetOrder(), TablePathId, record->GetBody().size());

it = BlockedRecords.erase(it);
};

while (pendingIt != PendingTxId.end() && blockedIt != BlockedRecords.end()) {
if (*pendingIt < *blockedIt) {
if (!processPending(pendingIt)) {
break;
}
} else {
processBlocked(blockedIt);
}
}

PendingTxId.erase(it++);
if (blockedIt == BlockedRecords.end()) {
for (; pendingIt != PendingTxId.end();) {
if (!processPending(pendingIt)) {
break;
}
}
}

if (pendingIt == PendingTxId.end()) {
for (; blockedIt != BlockedRecords.end();) {
processBlocked(blockedIt);
}
}
}

Expand Down Expand Up @@ -624,9 +667,10 @@ class TLocalTableWriter
bool Resolving = false;
bool Initialized = false;

TMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingRecords;
THashMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingRecords;
TMap<TRowVersion, ui64> TxIds; // key is non-inclusive right hand edge
TMap<TRowVersion, TVector<NChangeExchange::IChangeRecord::TPtr>> PendingTxId;
TSet<ui64> PendingTxId;
TSet<ui64> BlockedRecords;
TRowVersion PendingHeartbeat = TRowVersion::Min();

}; // TLocalTableWriter
Expand Down
67 changes: 52 additions & 15 deletions ydb/core/tx/replication/service/table_writer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
#include "table_writer.h"
#include "worker.h"

#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include <ydb/core/tx/replication/ut_helpers/test_env.h>
#include <ydb/core/tx/replication/ut_helpers/test_table.h>

#include <library/cpp/string_utils/base64/base64.h>
#include <library/cpp/testing/unittest/registar.h>

#include <util/string/printf.h>
#include <util/string/strip.h>

namespace NKikimr::NReplication::NService {

Expand Down Expand Up @@ -187,9 +189,6 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"),
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"),
}));

env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"),
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[22,0]})"),
}));
Expand All @@ -211,7 +210,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
}

Y_UNIT_TEST(WaitTxIds) {
class TMockWorker: public TActorBootstrapped<TMockWorker> {
class TMockWorker: public TActor<TMockWorker> {
void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
if (ev->Sender == Edge) {
ev->Sender = SelfId();
Expand All @@ -232,14 +231,11 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {

public:
explicit TMockWorker(const TActorId& writer, const TActorId& edge)
: Writer(writer)
: TActor(&TThis::StateWork)
, Writer(writer)
, Edge(edge)
{}

void Bootstrap() {
Become(&TThis::StateWork);
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
Expand Down Expand Up @@ -288,7 +284,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
}

Y_UNIT_TEST(DataAlongWithHeartbeat) {
class TMockWorker: public TActorBootstrapped<TMockWorker> {
class TMockWorker: public TActor<TMockWorker> {
void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
if (ev->Sender == Edge) {
ev->Sender = SelfId();
Expand All @@ -315,14 +311,11 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {

public:
explicit TMockWorker(const TActorId& writer, const TActorId& edge)
: Writer(writer)
: TActor(&TThis::StateWork)
, Writer(writer)
, Edge(edge)
{}

void Bootstrap() {
Become(&TThis::StateWork);
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
Expand Down Expand Up @@ -369,6 +362,50 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
}));
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
}

Y_UNIT_TEST(ApplyInCorrectOrder) {
TEnv env;
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);

env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{
.Name = "Table",
.KeyColumns = {"key"},
.Columns = {
{.Name = "key", .Type = "Uint32"},
{.Name = "value", .Type = "Utf8"},
},
.ReplicationConfig = TTestTableDescription::TReplicationConfig{
.Mode = TTestTableDescription::TReplicationConfig::MODE_READ_ONLY,
.ConsistencyLevel = TTestTableDescription::TReplicationConfig::CONSISTENCY_LEVEL_GLOBAL,
},
}));

auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"), EWriteMode::Consistent));
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());

env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
}));
env.Send<TEvWorker::TEvPoll>(writer, MakeTxIdResult({
{TRowVersion(10, 0), 1},
}));

env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"),
TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
TRecord(4, R"({"resolved":[20,0]})"),
}));
env.Send<TEvService::TEvHeartbeat>(writer, MakeTxIdResult({
{TRowVersion(20, 0), 2},
}));
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());

CommitWrites(env.GetRuntime(), {"/Root/Table"}, 1);
CommitWrites(env.GetRuntime(), {"/Root/Table"}, 2);

auto content = ReadShardedTable(env.GetRuntime(), "/Root/Table");
UNIT_ASSERT_STRINGS_EQUAL(StripInPlace(content), "key = 1, value = 10\nkey = 2, value = 20\nkey = 3, value = 30");
}
}

}
1 change: 1 addition & 0 deletions ydb/core/tx/replication/service/ut_table_writer/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ FORK_SUBTESTS()
SIZE(MEDIUM)

PEERDIR(
ydb/core/tx/datashard/ut_common
ydb/core/tx/replication/ut_helpers
library/cpp/string_utils/base64
library/cpp/testing/unittest
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/replication/ut_helpers/write_topic.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

namespace NKikimr::NReplication::NTestHelpers {
Expand Down

0 comments on commit 14a7b7e

Please sign in to comment.