diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 3bffb2eb19e7..78515c88fc38 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -49,6 +49,8 @@ ydb/core/kqp/ut/olap [*/*] chunk chunk ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore ydb/core/kqp/ut/query KqpLimits.QueryExecTimeoutCancel ydb/core/kqp/ut/query KqpStats.SysViewClientLost +ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictWrite* +ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictRead* ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication diff --git a/ydb/core/data_integrity_trails/data_integrity_trails.h b/ydb/core/data_integrity_trails/data_integrity_trails.h index 003505d5dc17..1ec566048a51 100644 --- a/ydb/core/data_integrity_trails/data_integrity_trails.h +++ b/ydb/core/data_integrity_trails/data_integrity_trails.h @@ -27,6 +27,9 @@ inline void LogTxSettings(const TransactionSettings& txSettings, TStringStream& case TransactionSettings::kSnapshotReadOnly: LogKeyValue("TxMode", "SnapshotReadOnly", ss); break; + case TransactionSettings::kSnapshotReadWrite: + LogKeyValue("TxMode", "SnapshotReadWrite", ss); + break; case TransactionSettings::TX_MODE_NOT_SET: LogKeyValue("TxMode", "Undefined", ss); break; diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 1730d30a78f0..6e2164b8b200 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -64,6 +64,9 @@ bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::Tra case Ydb::Query::TransactionSettings::kSnapshotReadOnly: to.mutable_snapshot_read_only(); break; + case Ydb::Query::TransactionSettings::kSnapshotReadWrite: + to.mutable_snapshot_read_write(); + break; default: issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Invalid tx_settings")); diff --git a/ydb/core/grpc_services/query/rpc_kqp_tx.cpp b/ydb/core/grpc_services/query/rpc_kqp_tx.cpp index b9d6e9385a85..888380c22aee 100644 --- a/ydb/core/grpc_services/query/rpc_kqp_tx.cpp +++ b/ydb/core/grpc_services/query/rpc_kqp_tx.cpp @@ -104,6 +104,9 @@ class TBeginTransactionRPC : public TActorBootstrapped { case Ydb::Query::TransactionSettings::kSnapshotReadOnly: ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_snapshot_read_only(); break; + case Ydb::Query::TransactionSettings::kSnapshotReadWrite: + ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_snapshot_read_write(); + break; } ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX); diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index 4d9f7ab90574..e9b02b093c37 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -158,7 +158,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig Y_UNUSED(config); if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE && - *txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO) + *txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO && + *txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) return false; if (txCtx.GetSnapshot().IsValid()) @@ -211,26 +212,42 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig YQL_ENSURE(!hasSinkWrite || hasEffects); - // We don't want snapshot when there are effects at the moment, - // because it hurts performance when there are multiple single-shard - // reads and a single distributed commit. Taking snapshot costs - // similar to an additional distributed transaction, and it's very - // hard to predict when that happens, causing performance - // degradation. - if (hasEffects) { - return false; - } - // We need snapshot for stream lookup, besause it's used for dependent reads if (hasStreamLookup) { return true; } + if (*txCtx.EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) { + if (hasEffects && !txCtx.HasTableRead) { + YQL_ENSURE(txCtx.HasTableWrite); + // Don't need snapshot for WriteOnly transaction. + return false; + } else if (hasEffects) { + YQL_ENSURE(txCtx.HasTableWrite); + // ReadWrite transaction => need snapshot + return true; + } + // ReadOnly transaction here + } else { + // We don't want snapshot when there are effects at the moment, + // because it hurts performance when there are multiple single-shard + // reads and a single distributed commit. Taking snapshot costs + // similar to an additional distributed transaction, and it's very + // hard to predict when that happens, causing performance + // degradation. + if (hasEffects) { + return false; + } + } + + YQL_ENSURE(!hasEffects && !hasStreamLookup); + // We need snapshot when there are multiple table read phases, most // likely it involves multiple tables and we would have to use a // distributed commit otherwise. Taking snapshot helps as avoid TLI // for read-only transactions, and costs less than a final distributed // commit. + // NOTE: In case of read from single shard, we won't take snapshot. return readPhases > 1; } diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index 8d324e0ae0ec..d7353de0540a 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -235,6 +235,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { HasOlapTable = false; HasOltpTable = false; HasTableWrite = false; + HasTableRead = false; NeedUncommittedChangesFlush = false; } @@ -264,16 +265,24 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { Readonly = true; break; + case Ydb::Table::TransactionSettings::kSnapshotReadWrite: + EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW; + Readonly = false; + break; + case Ydb::Table::TransactionSettings::TX_MODE_NOT_SET: YQL_ENSURE(false, "tx_mode not set, settings: " << settings); break; }; } - bool ShouldExecuteDeferredEffects() const { + bool ShouldExecuteDeferredEffects(const TKqpPhyTxHolder::TConstPtr& tx) const { if (NeedUncommittedChangesFlush || HasOlapTable) { return !DeferredEffects.Empty(); } + if (EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW && !tx && HasTableRead) { + return !DeferredEffects.Empty(); + } return false; } @@ -343,6 +352,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { bool HasOlapTable = false; bool HasOltpTable = false; bool HasTableWrite = false; + bool HasTableRead = false; bool NeedUncommittedChangesFlush = false; THashSet ModifiedTablesSinceLastFlush; diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index d072d9e642c0..2e680ae3741c 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -652,6 +652,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel(); kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding(); kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes()); + kqpConfig.EnableSnapshotIsolationRW = serviceConfig.GetEnableSnapshotIsolationRW(); if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) { kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U)); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 115222e46388..b777eb5133ac 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -315,6 +315,8 @@ class TKqpCompileService : public TActorBootstrapped { TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes(); + bool enableSnapshotIsolationRW = TableServiceConfig.GetEnableSnapshotIsolationRW(); + TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig()); LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config"); @@ -346,7 +348,8 @@ class TKqpCompileService : public TActorBootstrapped { TableServiceConfig.GetEnableAstCache() != enableAstCache || TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes || TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams || - TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution) { + TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution || + TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW) { QueryCache->Clear(); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 8e5295d15d08..0b3612aa7975 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -131,20 +131,20 @@ namespace NKikimr::NKqp { using namespace NYql::NDq; using namespace NYql::NDqProto; -IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe lockTxId, ui32 lockNodeId, +IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, TComputeActorSchedulingOptions schedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode) { - return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory), + return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, std::move(traceId), std::move(arena), mode); } IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector&& computeActors, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings, - const ui64 txId, TMaybe lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId) { - return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, meta, shardsScanningPolicy, counters, std::move(traceId)); + const ui64 txId, TMaybe lockTxId, ui32 lockNodeId, TMaybe lockMode, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId) { + return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode, meta, shardsScanningPolicy, counters, std::move(traceId)); } } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 265332afbb8e..670481a20c19 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -52,14 +52,15 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode); -IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe lockTxId, ui32 lockNodeId, - NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, +IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task, + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings, + const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode); IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector&& computeActors, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings, - const ui64 txId, TMaybe lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId); + const ui64 txId, TMaybe lockTxId, ui32 lockNodeId, TMaybe lockMode, + const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId); NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory( TIntrusivePtr counters, diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp index 0df7f95021a5..59b2ef6fe181 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp @@ -212,8 +212,9 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { YQL_ENSURE(args.ComputesByStages); auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()); - IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.LockTxId, args.LockNodeId, args.Task, - AsyncIoFactory, runtimeSettings, memoryLimits, + IActor* computeActor = CreateKqpScanComputeActor( + args.ExecuterId, args.TxId, + args.Task, AsyncIoFactory, runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), std::move(args.SchedulingOptions), args.BlockTrackingMode); TActorId result = TlsActivationContext->Register(computeActor); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h index 7dbf222bd5e4..2dcbf7cb1b71 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h @@ -110,6 +110,7 @@ struct IKqpNodeComputeActorFactory { const ui64 TxId; const TMaybe LockTxId; const ui32 LockNodeId; + const TMaybe LockMode; NYql::NDqProto::TDqTask* Task; TIntrusivePtr TxInfo; const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 7e6c8c136cb9..86a6dd676c4d 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -23,15 +23,13 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50); } // anonymous namespace -TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, TMaybe lockTxId, ui32 lockNodeId, +TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, EBlockTrackingMode mode) : TBase(std::move(cpuOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena)) , ComputeCtx(settings.StatsMode) - , LockTxId(lockTxId) - , LockNodeId(lockNodeId) , BlockTrackingMode(mode) { InitializeTask(); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h index ae13ce8a4058..fd9889f956d5 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h @@ -25,8 +25,6 @@ class TKqpScanComputeActor: public TSchedulableComputeActorBase Fetchers; NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr; - const TMaybe LockTxId; - const ui32 LockNodeId; struct TLockHash { bool operator()(const NKikimrDataEvents::TLock& lock) { @@ -70,7 +68,7 @@ class TKqpScanComputeActor: public TSchedulableComputeActorBase lockTxId, ui32 lockNodeId, + TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, EBlockTrackingMode mode); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index 087f4979b404..8faead8aa320 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -23,7 +23,8 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3; TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, - const TComputeRuntimeSettings& settings, std::vector&& computeActors, const ui64 txId, const TMaybe lockTxId, const ui32 lockNodeId, + const TComputeRuntimeSettings& settings, std::vector&& computeActors, + const ui64 txId, const TMaybe lockTxId, const ui32 lockNodeId, const TMaybe lockMode, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId) : Meta(meta) @@ -32,6 +33,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps , TxId(txId) , LockTxId(lockTxId) , LockNodeId(lockNodeId) + , LockMode(lockMode) , ComputeActorIds(std::move(computeActors)) , Snapshot(snapshot) , ShardsScanningPolicy(shardsScanningPolicy) @@ -449,6 +451,9 @@ std::unique_ptr TKqpScanFetcherActor::BuildEv ev->Record.SetTxId(std::get(TxId)); if (LockTxId) { ev->Record.SetLockTxId(*LockTxId); + if (LockMode) { + ev->Record.SetLockMode(*LockMode); + } } ev->Record.SetLockNodeId(LockNodeId); ev->Record.SetTablePath(ScanDataMeta.TablePath); @@ -499,8 +504,6 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData state->LastKey = std::move(msg.LastKey); state->LastCursorProto = std::move(msg.LastCursorProto); const ui64 rowsCount = msg.GetRowsCount(); - AFL_ENSURE(!LockTxId || !msg.LocksInfo.Locks.empty() || !msg.LocksInfo.BrokenLocks.empty()); - AFL_ENSURE(LockTxId || (msg.LocksInfo.Locks.empty() && msg.LocksInfo.BrokenLocks.empty())); AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action", "got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached) ("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount()) ("from", ev->Sender)("shards remain", PendingShards.size()) diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h index d513939b4e18..73ead0a5ef0e 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h @@ -52,13 +52,16 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped LockTxId; const ui32 LockNodeId; + const TMaybe LockMode; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_SCAN_FETCH_ACTOR; } TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const NYql::NDq::TComputeRuntimeSettings& settings, - std::vector&& computeActors, const ui64 txId, const TMaybe lockTxId, const ui32 lockNodeId, + std::vector&& computeActors, + const ui64 txId, const TMaybe lockTxId, const ui32 lockNodeId, const TMaybe lockMode, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId); diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 9172c245f8a6..30854284d07b 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -156,9 +156,13 @@ class TKqpDataExecuter : public TKqpExecuterBase evData; - if (GetSnapshot().IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) { + if (GetSnapshot().IsValid() + && (ReadOnlyTx + || Request.UseImmediateEffects + || (Request.LocksOp == ELocksOp::Unspecified + && TasksGraph.GetMeta().LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION))) { evData.reset(new TEvDataShard::TEvProposeTransaction( NKikimrTxDataShard::TX_KIND_DATA, SelfId(), @@ -2664,6 +2675,7 @@ class TKqpDataExecuter : public TKqpExecuterBase { TasksGraph.GetMeta().SetLockTxId(lockTxId); TasksGraph.GetMeta().SetLockNodeId(SelfId().NodeId()); + switch (Request.IsolationLevel) { + case NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW: + TasksGraph.GetMeta().SetLockMode(NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION); + break; + default: + TasksGraph.GetMeta().SetLockMode(NKikimrDataEvents::OPTIMISTIC); + break; + } + LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId); if (IsDebugLogEnabled()) { for (auto& tx : Request.Transactions) { @@ -953,6 +962,16 @@ class TKqpExecuterBase : public TActorBootstrapped { if (!settings.GetInconsistentTx() && !settings.GetIsOlap()) { ActorIdToProto(BufferActorId, settings.MutableBufferActorId()); } + if (!settings.GetInconsistentTx() + && TasksGraph.GetMeta().LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION + && GetSnapshot().IsValid()) { + settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step); + settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId); + } + if (!settings.GetInconsistentTx() && TasksGraph.GetMeta().LockMode) { + settings.SetLockMode(*TasksGraph.GetMeta().LockMode); + } + output.SinkSettings.ConstructInPlace(); output.SinkSettings->PackFrom(settings); } else { @@ -1213,6 +1232,10 @@ class TKqpExecuterBase : public TActorBootstrapped { settings->SetLockNodeId(self.NodeId()); } + if (TasksGraph.GetMeta().LockMode) { + settings->SetLockMode(*TasksGraph.GetMeta().LockMode); + } + createdTasksIds.push_back(task.Id); return task; }; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 09d507607834..a126f30eeae9 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -85,6 +85,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) : TxId(args.TxId) , LockTxId(args.LockTxId) , LockNodeId(args.LockNodeId) + , LockMode(args.LockMode) , ExecuterId(args.Executer) , Snapshot(args.Snapshot) , Database(args.Database) @@ -207,6 +208,9 @@ std::unique_ptr TKqpPlanner::SerializeReque request.SetLockTxId(*LockTxId); request.SetLockNodeId(LockNodeId); } + if (LockMode) { + request.SetLockMode(*LockMode); + } ActorIdToProto(ExecuterId, request.MutableExecuterActorId()); if (Deadline) { @@ -487,6 +491,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) .TxId = TxId, .LockTxId = LockTxId, .LockNodeId = LockNodeId, + .LockMode = LockMode, .Task = taskDesc, .TxInfo = TxInfo, .RuntimeSettings = settings, diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index a556b8198e6e..9297002b7456 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -45,6 +45,7 @@ class TKqpPlanner { const ui64 TxId; const TMaybe LockTxId; const ui32 LockNodeId; + const TMaybe LockMode; const TActorId& Executer; const IKqpGateway::TKqpSnapshot& Snapshot; const TString& Database; @@ -107,6 +108,7 @@ class TKqpPlanner { const ui64 TxId; const TMaybe LockTxId; const ui32 LockNodeId; + const TMaybe LockMode; const TActorId ExecuterId; TVector ComputeTasks; THashMap> TasksPerNode; diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 2e2ac0eda0b1..0b8b458a6e10 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -1138,6 +1138,9 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput& input.Meta.StreamLookupSettings->SetLockTxId(*lockTxId); input.Meta.StreamLookupSettings->SetLockNodeId(tasksGraph.GetMeta().LockNodeId); } + if (tasksGraph.GetMeta().LockMode) { + input.Meta.StreamLookupSettings->SetLockMode(*tasksGraph.GetMeta().LockMode); + } transformProto->MutableSettings()->PackFrom(*input.Meta.StreamLookupSettings); } else if (input.Meta.SequencerSettings) { transformProto->MutableSettings()->PackFrom(*input.Meta.SequencerSettings); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 6c0c1c729e73..5ec0dba1f047 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -92,6 +92,7 @@ struct TGraphMeta { IKqpGateway::TKqpSnapshot Snapshot; TMaybe LockTxId; ui32 LockNodeId; + TMaybe LockMode; std::unordered_map ResultChannelProxies; TActorId ExecuterId; bool UseFollowers = false; @@ -122,6 +123,10 @@ struct TGraphMeta { void SetLockNodeId(ui32 lockNodeId) { LockNodeId = lockNodeId; } + + void SetLockMode(NKikimrDataEvents::ELockMode lockMode) { + LockMode = lockMode; + } }; struct TTaskInputMeta { diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 0d6dc18e042a..3010c0a94fb5 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -153,6 +153,9 @@ class TKqpNodeService : public TActorBootstrapped { ? TMaybe(msg.GetLockTxId()) : Nothing(); ui32 lockNodeId = msg.GetLockNodeId(); + TMaybe lockMode = msg.HasLockMode() + ? TMaybe(msg.GetLockMode()) + : Nothing(); YQL_ENSURE(msg.GetStartAllOrFail()); // todo: support partial start @@ -259,6 +262,7 @@ class TKqpNodeService : public TActorBootstrapped { .TxId = txId, .LockTxId = lockTxId, .LockNodeId = lockNodeId, + .LockMode = lockMode, .Task = &dqTask, .TxInfo = txInfo, .RuntimeSettings = runtimeSettingsBase, @@ -305,7 +309,8 @@ class TKqpNodeService : public TActorBootstrapped { for (auto&& i : computesByStage) { for (auto&& m : i.second.MutableMetaInfo()) { Register(CreateKqpScanFetcher(msg.GetSnapshot(), std::move(m.MutableActorIds()), - m.GetMeta(), runtimeSettingsBase, txId, lockTxId, lockNodeId, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId))); + m.GetMeta(), runtimeSettingsBase, txId, lockTxId, lockNodeId, lockMode, + scanPolicy, Counters, NWilson::TTraceId(ev->TraceId))); } } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 11a6f8d2b1c3..56d7d0ad2070 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -178,6 +178,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableConstantFolding = true; ui64 DefaultEnableSpillingNodes = 0; bool EnableAntlr4Parser = false; + bool EnableSnapshotIsolationRW = false; void SetDefaultEnabledSpillingNodes(const TString& node); ui64 GetEnabledSpillingNodes() const; diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 42d6075e64e7..fe1669341609 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -836,6 +836,9 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq if (Settings->HasLockTxId() && BrokenLocks.empty()) { record.SetLockTxId(Settings->GetLockTxId()); + if (Settings->HasLockMode()) { + ev->Record.SetLockMode(Settings->GetLockMode()); + } } if (Settings->HasLockNodeId()) { diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index d43222283bae..d52e826fd378 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -39,6 +39,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped()) , NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe()) + , LockMode(settings.HasLockMode() ? settings.GetLockMode() : TMaybe()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) , LookupStrategy(settings.GetLookupStrategy()) , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc)) @@ -498,6 +499,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped LockTxId; const TMaybe NodeLockId; + const TMaybe LockMode; std::unordered_map Reads; std::unordered_map ReadsPerShard; std::shared_ptr> Partitioning; diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 8dd6dda3a2ba..a5244a8550e2 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -157,7 +157,6 @@ struct TKqpTableWriterStatistics { THashSet AffectedPartitions; }; - class TKqpTableWriteActor : public TActorBootstrapped { using TBase = TActorBootstrapped; @@ -202,12 +201,16 @@ class TKqpTableWriteActor : public TActorBootstrapped { TVector keyColumnTypes, const NMiniKQL::TTypeEnvironment& typeEnv, std::shared_ptr alloc, + const std::optional& mvccSnapshot, + const NKikimrDataEvents::ELockMode lockMode, const IKqpTransactionManagerPtr& txManager, const TActorId sessionActorId, TIntrusivePtr counters, NWilson::TTraceId traceId) : TypeEnv(typeEnv) , Alloc(alloc) + , MvccSnapshot(mvccSnapshot) + , LockMode(lockMode) , TableId(tableId) , TablePath(tablePath) , LockTxId(lockTxId) @@ -850,6 +853,12 @@ class TKqpTableWriteActor : public TActorBootstrapped { FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager); } else if (!InconsistentTx) { evWrite->SetLockId(LockTxId, LockNodeId); + evWrite->Record.SetLockMode(LockMode); + + if (LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION) { + YQL_ENSURE(MvccSnapshot); + *evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot; + } } const auto serializationResult = ShardedWriteController->SerializeMessageToPayload(shardId, *evWrite); @@ -1041,6 +1050,9 @@ class TKqpTableWriteActor : public TActorBootstrapped { const NMiniKQL::TTypeEnvironment& TypeEnv; std::shared_ptr Alloc; + const std::optional MvccSnapshot; + const NKikimrDataEvents::ELockMode LockMode; + const TTableId TableId; const TString TablePath; @@ -1133,6 +1145,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu std::move(keyColumnTypes), TypeEnv, Alloc, + Settings.GetMvccSnapshot(), + Settings.GetLockMode(), nullptr, TActorId{}, Counters, @@ -1337,6 +1351,8 @@ struct TTransactionSettings { ui64 LockTxId = 0; ui64 LockNodeId = 0; bool InconsistentTx = false; + std::optional MvccSnapshot; + NKikimrDataEvents::ELockMode LockMode; }; struct TWriteSettings { @@ -1470,6 +1486,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub std::move(keyColumnTypes), TypeEnv, Alloc, + settings.TransactionSettings.MvccSnapshot, + settings.TransactionSettings.LockMode, TxManager, SessionActorId, Counters, @@ -2475,6 +2493,8 @@ class TKqpForwardWriteActor : public TActorBootstrapped, .LockTxId = Settings.GetLockTxId(), .LockNodeId = Settings.GetLockNodeId(), .InconsistentTx = Settings.GetInconsistentTx(), + .MvccSnapshot = Settings.GetMvccSnapshot(), + .LockMode = Settings.GetLockMode(), }, .Priority = Settings.GetPriority(), .IsOlap = Settings.GetIsOlap(), diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 467605df5ccb..8c0dfa76ea73 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -354,6 +354,12 @@ class TKqpQueryState : public TNonCopyable { return false; } + if (TxCtx->EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) { + // ReadWrite snapshot isolation transaction with can only use uncommitted data. + // WriteOnly snapshot isolation transaction is executed like serializable transaction. + return !TxCtx->HasTableRead; + } + if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { if (tx && tx->GetHasEffects()) { YQL_ENSURE(tx->ResultsSize() == 0); @@ -370,7 +376,8 @@ class TKqpQueryState : public TNonCopyable { bool ShouldAcquireLocks(const TKqpPhyTxHolder::TConstPtr& tx) { Y_UNUSED(tx); - if (*TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { + if (*TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE && + *TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) { return false; } @@ -415,7 +422,7 @@ class TKqpQueryState : public TNonCopyable { auto tx = PreparedQuery->GetPhyTxOrEmpty(CurrentTx); if (TxCtx->CanDeferEffects()) { - // At current time sinks require separate tnx with commit. + // Olap sinks require separate tnx with commit. while (tx && tx->GetHasEffects() && !TxCtx->HasOlapTable) { QueryData->CreateKqpValueMap(tx); bool success = TxCtx->AddDeferredEffect(tx, QueryData); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 95c7cd096665..5a1b29a1ac2e 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -818,6 +818,12 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->TxCtx->SetIsolationLevel(settings); QueryState->TxCtx->OnBeginQuery(); + if (QueryState->TxCtx->EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW + && !Settings.TableService.GetEnableSnapshotIsolationRW()) { + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) + << "Writes aren't supported for Snapshot Isolation"; + } + if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) { std::vector issues{ YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)}; @@ -893,9 +899,12 @@ class TKqpSessionActor : public TActorBootstrapped { const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); const bool hasOlapWrite = ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); const bool hasOltpWrite = ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - QueryState->TxCtx->HasOlapTable |= hasOlapWrite || ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery); - QueryState->TxCtx->HasOltpTable |= hasOltpWrite || ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery); + const bool hasOlapRead = ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery); + const bool hasOltpRead = ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery); + QueryState->TxCtx->HasOlapTable |= hasOlapWrite || hasOlapRead; + QueryState->TxCtx->HasOltpTable |= hasOltpWrite || hasOltpRead; QueryState->TxCtx->HasTableWrite |= hasOlapWrite || hasOltpWrite; + QueryState->TxCtx->HasTableRead |= hasOlapRead || hasOltpRead; if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite && !Settings.TableService.GetEnableHtapTx() && !QueryState->IsSplitted()) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, @@ -1130,8 +1139,8 @@ class TKqpSessionActor : public TActorBootstrapped { return; } - if (QueryState->TxCtx->ShouldExecuteDeferredEffects()) { - ExecuteDeferredEffectsImmediately(); + if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) { + ExecuteDeferredEffectsImmediately(tx); } else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) { ExecutePhyTx(tx, commit); } else { @@ -1139,8 +1148,8 @@ class TKqpSessionActor : public TActorBootstrapped { } } - void ExecuteDeferredEffectsImmediately() { - YQL_ENSURE(QueryState->TxCtx->ShouldExecuteDeferredEffects()); + void ExecuteDeferredEffectsImmediately(const TKqpPhyTxHolder::TConstPtr& tx) { + YQL_ENSURE(QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)); auto& txCtx = *QueryState->TxCtx; auto request = PrepareRequest(/* tx */ nullptr, /* literal */ false, QueryState.get()); @@ -1160,6 +1169,9 @@ class TKqpSessionActor : public TActorBootstrapped { txCtx.ClearDeferredEffects(); SendToExecuter(QueryState->TxCtx.Get(), std::move(request)); + if (!tx) { + ++QueryState->CurrentTx; + } } bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) { diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index bd3080432739..38c49472efe0 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -4241,7 +4241,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } Y_UNIT_TEST_TWIN(TableSink_Oltp_Replace, UseSink) { - //UseSink = true; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(UseSink); appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); diff --git a/ydb/core/kqp/ut/tx/kqp_sink_common.h b/ydb/core/kqp/ut/tx/kqp_sink_common.h index 80dae769b18f..d7e9075a1e0b 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_common.h +++ b/ydb/core/kqp/ut/tx/kqp_sink_common.h @@ -25,6 +25,7 @@ class TTableDataModificationTester { void Execute() { AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(!DisableSinks); AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(!DisableSinks); + AppConfig.MutableTableServiceConfig()->SetEnableSnapshotIsolationRW(true); AppConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); auto settings = TKikimrSettings().SetAppConfig(AppConfig).SetWithSampleTables(false); if (FastSnapshotExpiration) { diff --git a/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp b/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp new file mode 100644 index 000000000000..47fc6216bba6 --- /dev/null +++ b/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp @@ -0,0 +1,232 @@ +#include "kqp_sink_common.h" + +#include +#include +#include +#include + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NQuery; + +Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) { + class TSimple : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + auto session1 = client.GetSession().GetValueSync().GetSession(); + + { + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[300u];["None"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0))); + } + + { + auto result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed"); + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(1000))).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[300u];["Changed"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0))); + } + } + }; + + Y_UNIT_TEST(TSimpleOltp) { + TSimple tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(TSimpleOltpNoSink) { + TSimple tester; + tester.SetIsOlap(false); + tester.SetDisableSinks(true); + tester.Execute(); + } + + Y_UNIT_TEST(TSimpleOlap) { + TSimple tester; + tester.SetIsOlap(true); + tester.Execute(); + } + + class TConflictWrite : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/KV`; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed Other"); + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed"); + )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); + // Keys changed since taking snapshot. + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + + result = session2.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[300u];["Changed Other"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(TConflictWriteOltp) { + TConflictWrite tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(TConflictWriteOltpNoSink) { + TConflictWrite tester; + tester.SetIsOlap(false); + tester.SetDisableSinks(true); + tester.Execute(); + } + + Y_UNIT_TEST(TConflictWriteOlap) { + TConflictWrite tester; + tester.SetIsOlap(true); + tester.Execute(); + } + + class TConflictReadWrite : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test`; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "NOT Paul", "Changed Other"); + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed"); + )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session2.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[300u];["Changed"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(TConflictReadWriteOltp) { + TConflictReadWrite tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(TConflictReadWriteOltpNoSink) { + TConflictReadWrite tester; + tester.SetIsOlap(false); + tester.SetDisableSinks(true); + tester.Execute(); + } + + Y_UNIT_TEST(TConflictReadWriteOlap) { + TConflictReadWrite tester; + tester.SetIsOlap(true); + tester.Execute(); + } + + class TReadOnly : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + auto session1 = client.GetSession().GetValueSync().GetSession(); + auto session2 = client.GetSession().GetValueSync().GetSession(); + + auto result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[300u];["None"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0))); + + auto tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + + result = session2.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed Other"); + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; + )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[300u];["None"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0))); + + result = session1.ExecuteQuery(Q_(R"( + SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; + )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[300u];["Changed Other"];1u;"Paul"]])", FormatResultSetYson(result.GetResultSet(0))); + } + }; + + Y_UNIT_TEST(TReadOnlyOltp) { + TReadOnly tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(TReadOnlyOltpNoSink) { + TReadOnly tester; + tester.SetIsOlap(false); + tester.SetDisableSinks(true); + tester.Execute(); + } + + Y_UNIT_TEST(TReadOnlyOlap) { + TReadOnly tester; + tester.SetIsOlap(true); + tester.Execute(); + } +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/tx/ya.make b/ydb/core/kqp/ut/tx/ya.make index c1495dd99911..0a6133025b66 100644 --- a/ydb/core/kqp/ut/tx/ya.make +++ b/ydb/core/kqp/ut/tx/ya.make @@ -17,6 +17,7 @@ SRCS( kqp_sink_locks_ut.cpp kqp_sink_mvcc_ut.cpp kqp_sink_tx_ut.cpp + kqp_snapshot_isolation_ut.cpp kqp_tx_ut.cpp ) diff --git a/ydb/core/protos/data_events.proto b/ydb/core/protos/data_events.proto index dacd854792da..e856858f49fe 100644 --- a/ydb/core/protos/data_events.proto +++ b/ydb/core/protos/data_events.proto @@ -66,6 +66,11 @@ message TMvccSnapshot { optional bool RepeatableRead = 12 [default = true]; } +enum ELockMode { + OPTIMISTIC = 0; + OPTIMISTIC_SNAPSHOT_ISOLATION = 1; +} + enum EDataFormat { FORMAT_UNSPECIFIED = 0; FORMAT_CELLVEC = 1; @@ -115,6 +120,8 @@ message TEvWrite { // This mostly affects the minimum MVCC version of the resulting write optional TMvccSnapshot MvccSnapshot = 8; optional uint32 GranuleShardingVersionId = 9; + + optional ELockMode LockMode = 10; } message TEvWriteResult { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 10d35daaf6be..301f7c5f9ce5 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -64,6 +64,7 @@ enum EIsolationLevel { ISOLATION_LEVEL_READ_UNCOMMITTED = 3; ISOLATION_LEVEL_READ_STALE = 4; ISOLATION_LEVEL_SNAPSHOT_RO = 5; + ISOLATION_LEVEL_SNAPSHOT_RW = 6; }; message TTopicOperationsRequest { @@ -562,6 +563,8 @@ message TEvStartKqpTasksRequest { optional double PoolMaxCpuShare = 12; optional double QueryCpuShare = 16; optional double ResourceWeight = 18; + + optional NKikimrDataEvents.ELockMode LockMode = 19; } message TEvStartKqpTasksResponse { @@ -735,6 +738,8 @@ message TKqpTableSinkSettings { optional int64 Priority = 11; optional bool IsOlap = 12; repeated uint32 WriteIndexes = 13; + optional NKikimrDataEvents.TMvccSnapshot MvccSnapshot = 14; + optional NKikimrDataEvents.ELockMode LockMode = 15; } message TKqpStreamLookupSettings { @@ -751,6 +756,7 @@ message TKqpStreamLookupSettings { optional bool KeepRowsOrder = 11; optional bool AllowNullKeys = 12; optional uint32 AllowNullKeysPrefixSize = 13; + optional NKikimrDataEvents.ELockMode LockMode = 14; } message TKqpSequencerSettings { diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index e8412b4eaea3..a7eb638068ab 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -356,4 +356,6 @@ message TTableServiceConfig { // This value used for `arrow::Array`s built inside TDqOutputHashPartitionConsumerBlock - // if the underlying array buffer is filled less than this value, then the buffer's capacity gets shrunk to actual size. // Otherwise, we potentially don't track the real buffer capacity and it may lead to OOM situations inside DqOutputChannel's. + + optional bool EnableSnapshotIsolationRW = 76 [default = false]; }; diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 6019cc753c09..62a76826fbc6 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -261,6 +261,8 @@ message TKqpReadRangesSourceSettings { optional bool AllowInconsistentReads = 18 [default = false]; repeated TKqpTransaction.TColumnMeta DuplicateCheckColumns = 19; + + optional NKikimrDataEvents.ELockMode LockMode = 20; } message TKqpTaskInfo { @@ -298,6 +300,7 @@ message TDataTransaction { // Datashard will subscribe to lock status when node id is non-zero optional uint32 LockNodeId = 17; + optional NKikimrDataEvents.ELockMode LockMode = 18; } message TCreateVolatileSnapshot { @@ -1702,6 +1705,7 @@ message TEvKqpScan { optional uint32 LockNodeId = 25; optional string CSScanPolicy = 26; optional NKikimrKqp.TEvKqpScanCursor ScanCursor = 27; + optional NKikimrDataEvents.ELockMode LockMode = 28; } message TEvCompactTable { @@ -1887,6 +1891,8 @@ message TEvRead { optional bytes Program = 902; optional NKikimrSchemeOp.EOlapProgramType ProgramType = 903; + + optional NKikimrDataEvents.ELockMode LockMode = 904; } message TReadContinuationToken { diff --git a/ydb/public/api/protos/ydb_query.proto b/ydb/public/api/protos/ydb_query.proto index b1fabe26d5d7..252f3dc43b32 100644 --- a/ydb/public/api/protos/ydb_query.proto +++ b/ydb/public/api/protos/ydb_query.proto @@ -62,6 +62,8 @@ message StaleModeSettings { message SnapshotModeSettings { } +message SnapshotRWModeSettings { +} message TransactionSettings { oneof tx_mode { @@ -69,6 +71,7 @@ message TransactionSettings { OnlineModeSettings online_read_only = 2; StaleModeSettings stale_read_only = 3; SnapshotModeSettings snapshot_read_only = 4; + SnapshotRWModeSettings snapshot_read_write = 5; } } diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index dbfeafcac66b..f21a3b2749d0 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -879,12 +879,16 @@ message StaleModeSettings { message SnapshotModeSettings { } +message SnapshotRWModeSettings { +} + message TransactionSettings { oneof tx_mode { SerializableModeSettings serializable_read_write = 1; OnlineModeSettings online_read_only = 2; StaleModeSettings stale_read_only = 3; SnapshotModeSettings snapshot_read_only = 4; + SnapshotRWModeSettings snapshot_read_write = 5; } } diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp index 331f17ec429c..2a5f47a69611 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp @@ -48,6 +48,9 @@ static void SetTxSettings(const TTxSettings& txSettings, Ydb::Query::Transaction case TTxSettings::TS_SNAPSHOT_RO: proto->mutable_snapshot_read_only(); break; + case TTxSettings::TS_SNAPSHOT_RW: + proto->mutable_snapshot_read_write(); + break; default: throw TContractViolation("Unexpected transaction mode."); } diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp index 93b91c5ac2e1..897a7ee8a3c8 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp @@ -32,6 +32,9 @@ static void SetTxSettings(const TTxSettings& txSettings, Ydb::Query::Transaction case TTxSettings::TS_SNAPSHOT_RO: proto->mutable_snapshot_read_only(); break; + case TTxSettings::TS_SNAPSHOT_RW: + proto->mutable_snapshot_read_write(); + break; default: throw TContractViolation("Unexpected transaction mode."); } diff --git a/ydb/public/sdk/cpp/client/ydb_query/tx.h b/ydb/public/sdk/cpp/client/ydb_query/tx.h index 749ddd00db43..d6e291f316c3 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/tx.h +++ b/ydb/public/sdk/cpp/client/ydb_query/tx.h @@ -34,6 +34,10 @@ struct TTxSettings { return TTxSettings(TS_SNAPSHOT_RO); } + static TTxSettings SnapshotRW() { + return TTxSettings(TS_SNAPSHOT_RW); + } + void Out(IOutputStream& out) const { switch (Mode_) { case TS_SERIALIZABLE_RW: @@ -48,6 +52,9 @@ struct TTxSettings { case TS_SNAPSHOT_RO: out << "SnapshotRO"; break; + case TS_SNAPSHOT_RW: + out << "SnapshotRW"; + break; default: out << "Unknown"; break; @@ -58,7 +65,8 @@ struct TTxSettings { TS_SERIALIZABLE_RW, TS_ONLINE_RO, TS_STALE_RO, - TS_SNAPSHOT_RO + TS_SNAPSHOT_RO, + TS_SNAPSHOT_RW, }; FLUENT_SETTING(TTxOnlineSettings, OnlineSettings); diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp index 38a84818661d..3e6dfa8d1f2f 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -1134,6 +1134,9 @@ void TTableClient::TImpl::SetTxSettings(const TTxSettings& txSettings, Ydb::Tabl case TTxSettings::TS_SNAPSHOT_RO: proto->mutable_snapshot_read_only(); break; + case TTxSettings::TS_SNAPSHOT_RW: + proto->mutable_snapshot_read_write(); + break; default: throw TContractViolation("Unexpected transaction mode."); } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index 5c2ce1b28e6d..a6d2f68dca9a 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -1317,6 +1317,10 @@ class TTxSettings { return TTxSettings(TS_SNAPSHOT_RO); } + static TTxSettings SnapshotRW() { + return TTxSettings(TS_SNAPSHOT_RW); + } + void Out(IOutputStream& out) const { switch (Mode_) { case TS_SERIALIZABLE_RW: @@ -1331,6 +1335,9 @@ class TTxSettings { case TS_SNAPSHOT_RO: out << "SnapshotRO"; break; + case TS_SNAPSHOT_RW: + out << "SnapshotRW"; + break; default: out << "Unknown"; break; @@ -1342,7 +1349,8 @@ class TTxSettings { TS_SERIALIZABLE_RW, TS_ONLINE_RO, TS_STALE_RO, - TS_SNAPSHOT_RO + TS_SNAPSHOT_RO, + TS_SNAPSHOT_RW, }; FLUENT_SETTING(TTxOnlineSettings, OnlineSettings);