sql: harden the check for when leaf txn can be used
This commit refactors the code where we determine whether we have to
make a leaf txn for the local flow (which is the case when we either
parallelize scans or use the streamer API) to make it "stronger". We
recently saw a nil-pointer error when trying to create a streamer (the
txn was nil), and the only way I could see that happening is if the
`makeLeaf` function returned nil, which can occur when the original txn
is nil. I still don't see how that could happen (nor could I reproduce
it), but this commit makes the code more bullet-proof by ensuring that
`LeafTxnInputState` is non-nil whenever we expect to have concurrency.

Release note: None
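
To make the hardening concrete, here is a minimal, self-contained Go sketch of the idea (the types `txnHandle`, `leafInputState`, and `localState` below are simplified stand-ins, not the actual CockroachDB code): concurrency is only recorded when a root txn actually exists, and a nil leaf-txn input state is turned into an explicit assertion error at setup time instead of surfacing later as a nil-pointer panic.

```go
package main

import (
	"errors"
	"fmt"
)

// leafInputState is a stand-in for the leaf txn input state.
type leafInputState struct{ seq int }

// txnHandle is a stand-in for a root txn that can spawn leaf txns.
type txnHandle struct{ seq int }

// GetLeafTxnInputState mimics an API that may return nil in edge cases.
func (t *txnHandle) GetLeafTxnInputState() *leafInputState {
	if t == nil {
		return nil // the failure mode being guarded against
	}
	return &leafInputState{seq: t.seq}
}

// localState mirrors the idea of DistSQL's localState: it records whether
// the local flow will have internal concurrency (parallel scans, streamer).
type localState struct {
	hasConcurrency bool
}

// mustUseLeafTxn reports whether a leaf txn is required.
func (s localState) mustUseLeafTxn() bool { return s.hasConcurrency }

// setupFlow decides whether a leaf txn input state is needed and validates
// it up front, so a missing leaf txn fails loudly at setup rather than as a
// nil-pointer dereference deep inside an operator.
func setupFlow(txn *txnHandle, parallelizeScans bool) (*leafInputState, error) {
	var ls localState
	// Hardened check: only record concurrency if we actually have a txn
	// from which a leaf txn can be derived.
	if txn != nil && parallelizeScans {
		ls.hasConcurrency = true
	}
	if !ls.mustUseLeafTxn() {
		return nil, nil // the root txn (or no txn at all) is fine
	}
	tis := txn.GetLeafTxnInputState()
	if tis == nil {
		// Surface an assertion error instead of letting a later component
		// dereference a nil leaf txn.
		return nil, errors.New("leaf txn input state is nil although concurrency is expected")
	}
	return tis, nil
}

func main() {
	// With a txn: the leaf input state is created and validated.
	if tis, err := setupFlow(&txnHandle{seq: 1}, true); err == nil {
		fmt.Printf("leaf input state: %+v\n", tis)
	}
	// Without a txn (e.g. a BulkIO-style flow): concurrency is never
	// requested, so no leaf txn is required and nothing can be nil later.
	if tis, err := setupFlow(nil, true); err == nil {
		fmt.Printf("no leaf txn needed: %v\n", tis)
	}
}
```

The point of this shape is to fail fast at flow setup, where the error can still be attributed and returned cleanly, rather than deep inside an operator that merely assumed a leaf txn was available.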
yuzefovich committed Jun 23, 2022
1 parent 9e47dc9 commit 0d340db
Showing 1 changed file with 49 additions and 38 deletions.
87 changes: 49 additions & 38 deletions pkg/sql/distsql_running.go
@@ -423,6 +423,7 @@ func (dsp *DistSQLPlanner) Run(
 	// NB: putting part of evalCtx in localState means it might be mutated down
 	// the line.
 	localState.EvalContext = &evalCtx.Context
+	localState.IsLocal = planCtx.isLocal
 	localState.Txn = txn
 	localState.LocalProcs = plan.LocalProcessors
 	// If we need to perform some operation on the flow specs, we want to
@@ -445,55 +446,65 @@ func (dsp *DistSQLPlanner) Run(
 	// to be on the safe side and mark 'noMutations' as 'false'.
 	noMutations := planCtx.planner != nil && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation)
 
-	if planCtx.isLocal {
-		localState.IsLocal = true
-		if noMutations && planCtx.parallelizeScansIfLocal {
-			if txn == nil {
-				// Txn can be nil in some cases, like BulkIO flows. In such a case, we
-				// cannot create a LeafTxn, so we cannot parallelize scans.
-				planCtx.parallelizeScansIfLocal = false
-			} else {
-				// Even though we have a single flow on the gateway node, we might
-				// have decided to parallelize the scans. If that's the case, we
-				// will need to use the Leaf txn.
-				for _, flow := range flows {
-					localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow)
-				}
-			}
-		}
-	}
-	if noMutations {
-		// Even if planCtx.isLocal is false (which is the case when we think
-		// it's worth distributing the query), we need to go through the
-		// processors to figure out whether any of them have concurrency.
-		//
-		// However, the concurrency requires the usage of LeafTxns which is only
-		// acceptable if we don't have any mutations in the plan.
-		// TODO(yuzefovich): we could be smarter here and allow the usage of the
-		// RootTxn by the mutations while still using the Streamer (that gets a
-		// LeafTxn) iff the plan is such that there is no concurrency between
-		// the root and the leaf txns.
-		//
-		// At the moment of writing, this is only relevant whenever the Streamer
-		// API might be used by some of the processors. The Streamer internally
-		// can have concurrency, so it expects to be given a LeafTxn. In order
-		// for that LeafTxn to be created later, during the flow setup, we need
-		// to populate leafInputState below, so we tell the localState that
-		// there is concurrency.
-		if row.CanUseStreamer(ctx, dsp.st) {
-			for _, proc := range plan.Processors {
-				if jr := proc.Spec.Core.JoinReader; jr != nil {
-					// Both index and lookup joins, with and without ordering,
-					// are executed via the Streamer API that has concurrency.
-					localState.HasConcurrency = true
-					break
-				}
-			}
-		}
-	}
+	if planCtx.isLocal && noMutations && planCtx.parallelizeScansIfLocal {
+		// Even though we have a single flow on the gateway node, we might
+		// have decided to parallelize the scans. If that's the case, we
+		// will need to use the Leaf txn.
+		for _, flow := range flows {
+			localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow)
+		}
+	}
+	if noMutations {
+		// Even if planCtx.isLocal is false (which is the case when we think
+		// it's worth distributing the query), we need to go through the
+		// processors to figure out whether any of them have concurrency.
+		//
+		// However, the concurrency requires the usage of LeafTxns which is
+		// only acceptable if we don't have any mutations in the plan.
+		// TODO(yuzefovich): we could be smarter here and allow the usage of
+		// the RootTxn by the mutations while still using the Streamer (that
+		// gets a LeafTxn) iff the plan is such that there is no concurrency
+		// between the root and the leaf txns.
+		//
+		// At the moment of writing, this is only relevant whenever the
+		// Streamer API might be used by some of the processors. The
+		// Streamer internally can have concurrency, so it expects to be
+		// given a LeafTxn. In order for that LeafTxn to be created later,
+		// during the flow setup, we need to populate leafInputState below,
+		// so we tell the localState that there is concurrency.
+		if row.CanUseStreamer(ctx, dsp.st) {
+			for _, proc := range plan.Processors {
+				if jr := proc.Spec.Core.JoinReader; jr != nil {
+					// Both index and lookup joins, with and without
+					// ordering, are executed via the Streamer API that has
+					// concurrency.
+					localState.HasConcurrency = true
+					break
+				}
+			}
+		}
+	}
-	if localState.MustUseLeafTxn() && txn != nil {
+	if localState.MustUseLeafTxn() {
 		// Set up leaf txns using the txnCoordMeta if we need to.
 		tis, err := txn.GetLeafTxnInputStateOrRejectClient(ctx)
 		if err != nil {
 			log.Infof(ctx, "%s: %s", clientRejectedMsg, err)
 			recv.SetError(err)
 			return cleanup
 		}
+		if tis == nil {
+			recv.SetError(errors.AssertionFailedf(
+				"leafInputState is nil when txn is non-nil and we must use the leaf txn",
+			))
+			return cleanup
+		}
 		leafInputState = tis
 	}
 
 	if logPlanDiagram {
