From eb51c4f405e06cc9fbbe09e39b577b9349a6bba3 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Thu, 3 Oct 2019 16:55:45 -0400 Subject: [PATCH] sql: use LeafTxn for some gateway flows Before this patch, a DistSQL flow running on its gateway node would use the RootTxn for all its processors for row-based flows / all of its operators for vectorized flows if there are no remote flows. Some of these processors/operators can execute concurrently with one another. RootTxns don't support concurrent requests (see #25329), resulting in some reads possibly failing to see the transaction's own writes. This patch fixes things by using a LeafTxn on the gateway in case there's concurrency on the gateway or if there are any remote flows. In other words, the Root is used only if there are no remote flows and no concurrency. This is sufficient for supporting mutations (which need the Root), because mutations force everything to be planned on the gateway and so, thanks to the previous commit, there's no concurrency if that's the case. Fixes #40487 Touches #24798 Release justification: Fixes bad bugs. Release note: Fix a bug possibly leading to transactions failing to see their own previous writes (#40487). 
--- pkg/ccl/importccl/sst_writer_proc.go | 2 +- pkg/sql/distsql/server.go | 66 +++++++++++++++++----------- pkg/sql/flowinfra/flow.go | 11 +++++ pkg/sql/rowexec/scrub_tablereader.go | 8 ++-- 4 files changed, 56 insertions(+), 31 deletions(-) diff --git a/pkg/ccl/importccl/sst_writer_proc.go b/pkg/ccl/importccl/sst_writer_proc.go index 5302a370182c..6d559ed6ce27 100644 --- a/pkg/ccl/importccl/sst_writer_proc.go +++ b/pkg/ccl/importccl/sst_writer_proc.go @@ -64,7 +64,6 @@ func newSSTWriterProcessor( settings: flowCtx.Cfg.Settings, registry: flowCtx.Cfg.JobRegistry, progress: spec.Progress, - db: flowCtx.EvalCtx.Txn.DB(), } if err := sp.out.Init(&execinfrapb.PostProcessSpec{}, sstOutputTypes, flowCtx.NewEvalCtx(), output); err != nil { return nil, err @@ -97,6 +96,7 @@ func (sp *sstWriter) OutputTypes() []types.T { } func (sp *sstWriter) Run(ctx context.Context) { + sp.db = sp.flowCtx.EvalCtx.Txn.DB() sp.input.Start(ctx) ctx, span := tracing.ChildSpan(ctx, "sstWriter") diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 78ce867f1c05..85f1898b8fa5 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -219,29 +219,10 @@ func (ds *ServerImpl) setupFlow( ) monitor.Start(ctx, parentMonitor, mon.BoundAccount{}) - // Figure out what txn the flow needs to run in, if any. - // For local flows, the txn comes from localState.Txn. For non-local ones, we - // create a txn based on the request's TxnCoordMeta. - var txn *client.Txn - if !localState.IsLocal { - if meta := req.TxnCoordMeta; meta != nil { - if meta.Txn.Status != roachpb.PENDING { - return nil, nil, errors.Errorf("cannot create flow in non-PENDING txn: %s", - meta.Txn) - } - // The flow will run in a LeafTxn because we do not want each distributed - // Txn to heartbeat the transaction. 
- txn = client.NewTxnWithCoordMeta(ctx, ds.FlowDB, req.Flow.Gateway, client.LeafTxn, *meta) - } - } else { - txn = localState.Txn - } - var evalCtx *tree.EvalContext if localState.EvalContext != nil { evalCtx = localState.EvalContext evalCtx.Mon = &monitor - evalCtx.Txn = txn } else { location, err := timeutil.TimeZoneStringToLocation(req.EvalContext.Location) if err != nil { @@ -296,7 +277,6 @@ func (ds *ServerImpl) setupFlow( // TODO(andrei): This is wrong. Each processor should override Ctx with its // own context. Context: ctx, - Txn: txn, Planner: &sqlbase.DummyEvalPlanner{}, SessionAccessor: &sqlbase.DummySessionAccessor{}, Sequence: &sqlbase.DummySequenceOperators{}, @@ -320,7 +300,6 @@ func (ds *ServerImpl) setupFlow( Cfg: &ds.ServerConfig, ID: req.Flow.FlowID, EvalCtx: evalCtx, - Txn: txn, NodeID: nodeID, TraceKV: req.TraceKV, Local: localState.IsLocal, @@ -353,6 +332,27 @@ func (ds *ServerImpl) setupFlow( if f.IsVectorized() { telemetry.Inc(sqltelemetry.VecExecCounter) } + + // Figure out what txn the flow needs to run in, if any. For gateway flows + // that have no remote flows and also no concurrency, the txn comes from + // localState.Txn. Otherwise, we create a txn based on the request's + // TxnCoordMeta. + var txn *client.Txn + if localState.IsLocal && !f.ConcurrentExecution() { + txn = localState.Txn + } else { + if meta := req.TxnCoordMeta; meta != nil { + if meta.Txn.Status != roachpb.PENDING { + return nil, nil, errors.Errorf("cannot create flow in non-PENDING txn: %s", + meta.Txn) + } + // The flow will run in a LeafTxn because we do not want each distributed + // Txn to heartbeat the transaction. 
+ txn = client.NewTxnWithCoordMeta(ctx, ds.FlowDB, req.Flow.Gateway, client.LeafTxn, *meta) + } + } + f.SetTxn(txn) + return ctx, f, nil } @@ -381,7 +381,12 @@ func (ds *ServerImpl) SetupSyncFlow( req *execinfrapb.SetupFlowRequest, output execinfra.RowReceiver, ) (context.Context, flowinfra.Flow, error) { - return ds.setupFlow(ds.AnnotateCtx(ctx), opentracing.SpanFromContext(ctx), parentMonitor, req, output, LocalState{}) + ctx, f, err := ds.setupFlow(ds.AnnotateCtx(ctx), opentracing.SpanFromContext(ctx), parentMonitor, + req, output, LocalState{}) + if err != nil { + return nil, nil, err + } + return ctx, f, err } // LocalState carries information that is required to set up a flow with wrapped @@ -393,6 +398,11 @@ type LocalState struct { // remote flows. IsLocal bool + // Txn is filled in on the gateway only. It is the RootTxn that the query is running in. + // This will be used directly by the flow if the flow has no concurrency and IsLocal is set. + // If there is concurrency, a LeafTxn will be created. + Txn *client.Txn + ///////////////////////////////////////////// // Fields below are empty if IsLocal == false ///////////////////////////////////////////// @@ -400,12 +410,11 @@ type LocalState struct { // LocalProcs is an array of planNodeToRowSource processors. It's in order and // will be indexed into by the RowSourceIdx field in LocalPlanNodeSpec. LocalProcs []execinfra.LocalProcessor - Txn *client.Txn } // SetupLocalSyncFlow sets up a synchronous flow on the current (planning) node. -// It's used by the gateway node to set up the flows local to it. Otherwise, -// the same as SetupSyncFlow. +// It's used by the gateway node to set up the flows local to it. +// It's the same as SetupSyncFlow except it takes the localState. 
func (ds *ServerImpl) SetupLocalSyncFlow( ctx context.Context, parentMonitor *mon.BytesMonitor, @@ -413,7 +422,12 @@ func (ds *ServerImpl) SetupLocalSyncFlow( output execinfra.RowReceiver, localState LocalState, ) (context.Context, flowinfra.Flow, error) { - return ds.setupFlow(ctx, opentracing.SpanFromContext(ctx), parentMonitor, req, output, localState) + ctx, f, err := ds.setupFlow(ctx, opentracing.SpanFromContext(ctx), parentMonitor, req, output, + localState) + if err != nil { + return nil, nil, err + } + return ctx, f, err } // RunSyncFlow is part of the DistSQLServer interface. diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 3c795cea5bad..193ff7ce646f 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -14,6 +14,7 @@ import ( "context" "sync" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -65,6 +66,10 @@ type Flow interface { // spec. The flow will then need to be started and run. Setup(ctx context.Context, spec *execinfrapb.FlowSpec, opt FuseOpt) error + // SetTxn is used to provide the transaction in which the flow will run. + // It needs to be called after Setup() and before Start/Run. + SetTxn(*client.Txn) + // Start starts the flow. Processors run asynchronously in their own goroutines. // Wait() needs to be called to wait for the flow to finish. // See Run() for a synchronous version. @@ -169,6 +174,12 @@ func (f *FlowBase) Setup(context.Context, *execinfrapb.FlowSpec, FuseOpt) error panic("Setup should not be called on FlowBase") } +// SetTxn is part of the Flow interface. +func (f *FlowBase) SetTxn(txn *client.Txn) { + f.FlowCtx.Txn = txn + f.EvalCtx.Txn = txn +} + // ConcurrentExecution is part of the Flow interface. 
func (f *FlowBase) ConcurrentExecution() bool { return len(f.processors) > 1 diff --git a/pkg/sql/rowexec/scrub_tablereader.go b/pkg/sql/rowexec/scrub_tablereader.go index b445a5b3601d..0efcb10b9bd8 100644 --- a/pkg/sql/rowexec/scrub_tablereader.go +++ b/pkg/sql/rowexec/scrub_tablereader.go @@ -71,10 +71,6 @@ func newScrubTableReader( if flowCtx.NodeID == 0 { return nil, errors.Errorf("attempting to create a tableReader with uninitialized NodeID") } - if flowCtx.Txn == nil { - return nil, errors.Errorf("scrubTableReader outside of txn") - } - tr := &scrubTableReader{ indexIdx: int(spec.IndexIdx), } @@ -214,6 +210,10 @@ func (tr *scrubTableReader) prettyPrimaryKeyValues( // Start is part of the RowSource interface. func (tr *scrubTableReader) Start(ctx context.Context) context.Context { + if tr.FlowCtx.Txn == nil { + tr.MoveToDraining(errors.Errorf("scrubTableReader outside of txn")) + } + ctx = tr.StartInternal(ctx, scrubTableReaderProcName) log.VEventf(ctx, 1, "starting")