Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot Isolation: kqp #12825

Merged
merged 24 commits into from
Dec 24, 2024
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ ydb/core/kqp/ut/olap [*/*] chunk chunk
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
ydb/core/kqp/ut/query KqpLimits.QueryExecTimeoutCancel
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictWrite*
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/data_integrity_trails/data_integrity_trails.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ inline void LogTxSettings(const TransactionSettings& txSettings, TStringStream&
case TransactionSettings::kSnapshotReadOnly:
LogKeyValue("TxMode", "SnapshotReadOnly", ss);
break;
case TransactionSettings::kSnapshotReadWrite:
LogKeyValue("TxMode", "SnapshotReadWrite", ss);
break;
case TransactionSettings::TX_MODE_NOT_SET:
LogKeyValue("TxMode", "Undefined", ss);
break;
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::Tra
case Ydb::Query::TransactionSettings::kSnapshotReadOnly:
to.mutable_snapshot_read_only();
break;
case Ydb::Query::TransactionSettings::kSnapshotReadWrite:
to.mutable_snapshot_read_write();
break;
default:
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
"Invalid tx_settings"));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/grpc_services/query/rpc_kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ class TBeginTransactionRPC : public TActorBootstrapped<TBeginTransactionRPC> {
case Ydb::Query::TransactionSettings::kSnapshotReadOnly:
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_snapshot_read_only();
break;
case Ydb::Query::TransactionSettings::kSnapshotReadWrite:
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_snapshot_read_write();
break;
}

ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX);
Expand Down
39 changes: 28 additions & 11 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
Y_UNUSED(config);

if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE &&
*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO)
*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO &&
*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW)
return false;

if (txCtx.GetSnapshot().IsValid())
Expand Down Expand Up @@ -211,26 +212,42 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig

YQL_ENSURE(!hasSinkWrite || hasEffects);

// We don't want snapshot when there are effects at the moment,
// because it hurts performance when there are multiple single-shard
// reads and a single distributed commit. Taking snapshot costs
// similar to an additional distributed transaction, and it's very
// hard to predict when that happens, causing performance
// degradation.
if (hasEffects) {
return false;
}

// We need snapshot for stream lookup, besause it's used for dependent reads
if (hasStreamLookup) {
return true;
}

if (*txCtx.EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW) {
if (hasEffects && !txCtx.HasTableRead) {
YQL_ENSURE(txCtx.HasTableWrite);
// Don't need snapshot for WriteOnly transaction.
return false;
} else if (hasEffects) {
YQL_ENSURE(txCtx.HasTableWrite);
// ReadWrite transaction => need snapshot
return true;
}
// ReadOnly transaction here
} else {
// We don't want snapshot when there are effects at the moment,
// because it hurts performance when there are multiple single-shard
// reads and a single distributed commit. Taking snapshot costs
// similar to an additional distributed transaction, and it's very
// hard to predict when that happens, causing performance
// degradation.
if (hasEffects) {
return false;
}
}

YQL_ENSURE(!hasEffects && !hasStreamLookup);

// We need snapshot when there are multiple table read phases, most
// likely it involves multiple tables and we would have to use a
// distributed commit otherwise. Taking snapshot helps as avoid TLI
// for read-only transactions, and costs less than a final distributed
// commit.
// NOTE: In case of read from single shard, we won't take snapshot.
return readPhases > 1;
}

Expand Down
12 changes: 11 additions & 1 deletion ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
HasOlapTable = false;
HasOltpTable = false;
HasTableWrite = false;
HasTableRead = false;
NeedUncommittedChangesFlush = false;
}

Expand Down Expand Up @@ -264,16 +265,24 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
Readonly = true;
break;

case Ydb::Table::TransactionSettings::kSnapshotReadWrite:
EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW;
Readonly = false;
break;

case Ydb::Table::TransactionSettings::TX_MODE_NOT_SET:
YQL_ENSURE(false, "tx_mode not set, settings: " << settings);
break;
};
}

bool ShouldExecuteDeferredEffects() const {
bool ShouldExecuteDeferredEffects(const TKqpPhyTxHolder::TConstPtr& tx) const {
if (NeedUncommittedChangesFlush || HasOlapTable) {
return !DeferredEffects.Empty();
}
if (EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW && !tx && HasTableRead) {
return !DeferredEffects.Empty();
}

return false;
}
Expand Down Expand Up @@ -343,6 +352,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;
bool HasTableRead = false;

bool NeedUncommittedChangesFlush = false;
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
kqpConfig.EnableSnapshotIsolationRW = serviceConfig.GetEnableSnapshotIsolationRW();

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();

bool enableSnapshotIsolationRW = TableServiceConfig.GetEnableSnapshotIsolationRW();

TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");

Expand Down Expand Up @@ -346,7 +348,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableAstCache() != enableAstCache ||
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams ||
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution) {
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution ||
TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW) {

QueryCache->Clear();

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,20 @@ namespace NKikimr::NKqp {
using namespace NYql::NDq;
using namespace NYql::NDqProto;

IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions,
NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode)
{
return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory),
settings, memoryLimits, std::move(traceId), std::move(arena), mode);
}

IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, meta, shardsScanningPolicy, counters, std::move(traceId));
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode, meta, shardsScanningPolicy, counters, std::move(traceId));
}

}
9 changes: 5 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode);

IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings,
const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode);

IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);

NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
TIntrusivePtr<TKqpCounters> counters,
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,9 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
YQL_ENSURE(args.ComputesByStages);
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.LockTxId, args.LockNodeId, args.Task,
AsyncIoFactory, runtimeSettings, memoryLimits,
IActor* computeActor = CreateKqpScanComputeActor(
args.ExecuterId, args.TxId,
args.Task, AsyncIoFactory, runtimeSettings, memoryLimits,
std::move(args.TraceId), std::move(args.Arena),
std::move(args.SchedulingOptions), args.BlockTrackingMode);
TActorId result = TlsActivationContext->Register(computeActor);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ struct IKqpNodeComputeActorFactory {
const ui64 TxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
NYql::NDqProto::TDqTask* Task;
TIntrusivePtr<NRm::TTxState> TxInfo;
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);

} // anonymous namespace

TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, EBlockTrackingMode mode)
: TBase(std::move(cpuOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
, ComputeCtx(settings.StatsMode)
, LockTxId(lockTxId)
, LockNodeId(lockNodeId)
, BlockTrackingMode(mode)
{
InitializeTask();
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeA

std::set<NActors::TActorId> Fetchers;
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;

struct TLockHash {
bool operator()(const NKikimrDataEvents::TLock& lock) {
Expand Down Expand Up @@ -70,7 +68,7 @@ class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeA
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
}

TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId,
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena, EBlockTrackingMode mode);
Expand Down
13 changes: 9 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3;


TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot,
const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId,
const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors,
const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId, const TMaybe<NKikimrDataEvents::ELockMode> lockMode,
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy,
TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
: Meta(meta)
Expand All @@ -32,6 +33,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
, TxId(txId)
, LockTxId(lockTxId)
, LockNodeId(lockNodeId)
, LockMode(lockMode)
, ComputeActorIds(std::move(computeActors))
, Snapshot(snapshot)
, ShardsScanningPolicy(shardsScanningPolicy)
Expand Down Expand Up @@ -447,7 +449,7 @@ std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEv
ev->Record.SetStatsMode(RuntimeSettings.StatsMode);
ev->Record.SetScanId(scanId);
ev->Record.SetTxId(std::get<ui64>(TxId));
if (LockTxId) {
if (LockTxId && LockMode != NKikimrDataEvents::OPTIMISTIC_EXCLUSIVE_SNAPSHOT_ISOLATION) {
ev->Record.SetLockTxId(*LockTxId);
}
ev->Record.SetLockNodeId(LockNodeId);
Expand Down Expand Up @@ -499,8 +501,11 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
state->LastKey = std::move(msg.LastKey);
state->LastCursorProto = std::move(msg.LastCursorProto);
const ui64 rowsCount = msg.GetRowsCount();
AFL_ENSURE(!LockTxId || !msg.LocksInfo.Locks.empty() || !msg.LocksInfo.BrokenLocks.empty());
AFL_ENSURE(LockTxId || (msg.LocksInfo.Locks.empty() && msg.LocksInfo.BrokenLocks.empty()));
AFL_ENSURE(!(LockTxId && LockMode != NKikimrDataEvents::OPTIMISTIC_EXCLUSIVE_SNAPSHOT_ISOLATION)
|| !msg.LocksInfo.Locks.empty()
|| !msg.LocksInfo.BrokenLocks.empty());
AFL_ENSURE((LockTxId && LockMode != NKikimrDataEvents::OPTIMISTIC_EXCLUSIVE_SNAPSHOT_ISOLATION)
|| (msg.LocksInfo.Locks.empty() && msg.LocksInfo.BrokenLocks.empty()));
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action", "got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount())
("from", ev->Sender)("shards remain", PendingShards.size())
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
const NYql::NDq::TTxId TxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
const TMaybe<NKikimrDataEvents::ELockMode> LockMode;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_SCAN_FETCH_ACTOR;
}

TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const NYql::NDq::TComputeRuntimeSettings& settings,
std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId,
std::vector<NActors::TActorId>&& computeActors,
const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId, const TMaybe<NKikimrDataEvents::ELockMode> lockMode,
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);

Expand Down
Loading
Loading