Skip to content

Commit

Permalink
Data integrity for OLAP (#13277)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Jan 13, 2025
1 parent e9ac0af commit 0249c59
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 7 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteHuge
ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall
ydb/core/kqp/ut/cost KqpCost.OlapWriteRow
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertEvWrite
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
ydb/core/kqp/ut/olap KqpOlap.ManyColumnShardsWithRestarts
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/data_integrity_trails/data_integrity_trails.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace NKikimr {
namespace NDataIntegrity {

inline void LogKeyValue(const TString& key, const TString& value, TStringStream& ss, bool last = false) {
inline void LogKeyValue(const TStringBuf key, const TStringBuf value, TStringStream& ss, bool last = false) {
ss << key << ": " << (value.empty() ? "Empty" : value) << (last ? "" : ",");
}

Expand Down
20 changes: 20 additions & 0 deletions ydb/core/kqp/common/kqp_data_integrity_trails.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <openssl/sha.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/kqp/common/events/events.h>
#include <library/cpp/string_utils/base64/base64.h>

#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
Expand Down Expand Up @@ -115,5 +116,24 @@ inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId));
}

// WriteActor,BufferActor
inline void LogIntegrityTrails(const TString& txType, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx, const TStringBuf component) {
auto log = [](const auto& type, const auto& txId, const auto& shardId, const auto component) {
TStringStream ss;
LogKeyValue("Component", component, ss);
LogKeyValue("PhyTxId", ToString(txId), ss);

if (shardId) {
LogKeyValue("ShardId", ToString(*shardId), ss);
}

LogKeyValue("Type", type, ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txId, shardId, component));
}

}
}
11 changes: 9 additions & 2 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
#include <ydb/core/kqp/common/buffer/buffer.h>
#include <ydb/core/kqp/common/kqp_data_integrity_trails.h>
#include <ydb/core/kqp/common/kqp_tx_manager.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
Expand Down Expand Up @@ -888,6 +889,8 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
Counters->WriteActorImmediateWritesRetries->Inc();
}

NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWrite->Record.GetTxId(), shardId, TlsActivationContext->AsActorContext(), "WriteActor");

CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", isPrepare=" << isPrepare << ", isImmediateCommit=" << isImmediateCommit << ", TxId=" << evWrite->Record.GetTxId()
<< ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId()
<< ", Locks= " << [&]() {
Expand Down Expand Up @@ -1714,6 +1717,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager);
}

NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWrite->Record.GetTxId(), shardId, TlsActivationContext->AsActorContext(), "BufferActor");

SendTime[shardId] = TInstant::Now();
CA_LOG_D("Send EvWrite (external) to ShardID=" << shardId << ", isPrepare=" << !isRollback << ", isImmediateCommit=" << isRollback << ", TxId=" << evWrite->Record.GetTxId()
<< ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId()
Expand All @@ -1733,7 +1738,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
NKikimr::MakePipePerNodeCacheID(false),
new TEvPipeCache::TEvForward(evWrite.release(), shardId, /* subscribe */ true),
IEventHandle::FlagTrackDelivery,
0);
0,
BufferWriteActor.GetTraceId());
}
}

Expand Down Expand Up @@ -1816,7 +1822,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
item.SetFlags(shardInfo.AffectedFlags);
}

//TODO: NDataIntegrity
NDataIntegrity::LogIntegrityTrails("PlannedTx", *TxId, {}, TlsActivationContext->AsActorContext(), "BufferActor");

CA_LOG_D("Execute planned transaction, coordinator: " << commitInfo.Coordinator
<< ", volitale: " << ((transaction.GetFlags() & TEvTxProxy::TEvProposeTransaction::FlagVolatile) != 0)
<< ", shards: " << affectedSet.size());
Expand Down
71 changes: 68 additions & 3 deletions ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,79 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

// check executer logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 2);
// check write actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
// check datashard logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 4);
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
}

Y_UNIT_TEST_TWIN(UpsertEvWriteQueryService, isOlap) {
NKikimrConfig::TAppConfig AppConfig;
if (!isOlap) {
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
} else {
AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
}
TKikimrSettings serverSettings = TKikimrSettings().SetAppConfig(AppConfig);
TStringStream ss;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);

auto db = kikimr.GetQueryClient();
auto session = db.GetSession().GetValueSync().GetSession();

{
const TString query = Sprintf(R"(
CREATE TABLE `/Root/test_evwrite` (
Key Int64 NOT NULL,
Value String,
primary key (Key)
) WITH (STORE=%s);
)", isOlap ? "COLUMN" : "ROW");

auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

ss.Clear();

auto result = session.ExecuteQuery(R"(
--!syntax_v1
UPSERT INTO `/Root/test_evwrite` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

if (!isOlap) {
// check write actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 1);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
// check datashard logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
} else {
// check write actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 3);
// check executer logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 4);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
// check columnshard logs
// ColumnShard doesn't have integrity logs.
// UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: ColumnShard"), 6);
}
}

Y_UNIT_TEST(Ddl) {
Expand Down

0 comments on commit 0249c59

Please sign in to comment.