diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 8a7e66d61cec..cf9f1e1bfbc1 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -423,6 +423,7 @@ func (dsp *DistSQLPlanner) Run( // NB: putting part of evalCtx in localState means it might be mutated down // the line. localState.EvalContext = &evalCtx.Context + localState.IsLocal = planCtx.isLocal localState.Txn = txn localState.LocalProcs = plan.LocalProcessors // If we need to perform some operation on the flow specs, we want to @@ -445,9 +446,12 @@ func (dsp *DistSQLPlanner) Run( // to be on the safe side and mark 'noMutations' as 'false'. noMutations := planCtx.planner != nil && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation) - if planCtx.isLocal { - localState.IsLocal = true - if noMutations && planCtx.parallelizeScansIfLocal { + if txn == nil { + // Txn can be nil in some cases, like BulkIO flows. In such a case, we + // cannot create a LeafTxn, so we cannot parallelize scans. + planCtx.parallelizeScansIfLocal = false + } else { + if planCtx.isLocal && noMutations && planCtx.parallelizeScansIfLocal { // Even though we have a single flow on the gateway node, we might // have decided to parallelize the scans. If that's the case, we // will need to use the Leaf txn. @@ -455,45 +459,52 @@ func (dsp *DistSQLPlanner) Run( localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow) } } - } - if noMutations { - // Even if planCtx.isLocal is false (which is the case when we think - // it's worth distributing the query), we need to go through the - // processors to figure out whether any of them have concurrency. - // - // However, the concurrency requires the usage of LeafTxns which is only - // acceptable if we don't have any mutations in the plan. - // TODO(yuzefovich): we could be smarter here and allow the usage of the - // RootTxn by the mutations while still using the Streamer (that gets a - // LeafTxn) iff the plan is such that there is no concurrency between - // the root and the leaf txns. - // - // At the moment of writing, this is only relevant whenever the Streamer - // API might be used by some of the processors. The Streamer internally - // can have concurrency, so it expects to be given a LeafTxn. In order - // for that LeafTxn to be created later, during the flow setup, we need - // to populate leafInputState below, so we tell the localState that - // there is concurrency. - if row.CanUseStreamer(ctx, dsp.st) { - for _, proc := range plan.Processors { - if jr := proc.Spec.Core.JoinReader; jr != nil { - // Both index and lookup joins, with and without ordering, - // are executed via the Streamer API that has concurrency. - localState.HasConcurrency = true - break + if noMutations { + // Even if planCtx.isLocal is false (which is the case when we think + // it's worth distributing the query), we need to go through the + // processors to figure out whether any of them have concurrency. + // + // However, the concurrency requires the usage of LeafTxns which is + // only acceptable if we don't have any mutations in the plan. + // TODO(yuzefovich): we could be smarter here and allow the usage of + // the RootTxn by the mutations while still using the Streamer (that + // gets a LeafTxn) iff the plan is such that there is no concurrency + // between the root and the leaf txns. + // + // At the moment of writing, this is only relevant whenever the + // Streamer API might be used by some of the processors. The + // Streamer internally can have concurrency, so it expects to be + // given a LeafTxn. In order for that LeafTxn to be created later, + // during the flow setup, we need to populate leafInputState below, + // so we tell the localState that there is concurrency. + if row.CanUseStreamer(ctx, dsp.st) { + for _, proc := range plan.Processors { + if jr := proc.Spec.Core.JoinReader; jr != nil { + // Both index and lookup joins, with and without + // ordering, are executed via the Streamer API that has + // concurrency. + localState.HasConcurrency = true + break + } } } } - } - if localState.MustUseLeafTxn() && txn != nil { - // Set up leaf txns using the txnCoordMeta if we need to. - tis, err := txn.GetLeafTxnInputStateOrRejectClient(ctx) - if err != nil { - log.Infof(ctx, "%s: %s", clientRejectedMsg, err) - recv.SetError(err) - return cleanup + if localState.MustUseLeafTxn() { + // Set up leaf txns using the txnCoordMeta if we need to. + tis, err := txn.GetLeafTxnInputStateOrRejectClient(ctx) + if err != nil { + log.Infof(ctx, "%s: %s", clientRejectedMsg, err) + recv.SetError(err) + return cleanup + } + if tis == nil { + recv.SetError(errors.AssertionFailedf( + "leafInputState is nil when txn is non-nil and we must use the leaf txn", + )) + return cleanup + } + leafInputState = tis } - leafInputState = tis } if logPlanDiagram {