Skip to content

Commit

Permalink
kvstreamer: ask for a separate LeafTxn
Browse files Browse the repository at this point in the history
This commit fixes the streamer so that it uses a LeafTxn that is not
shared with any other components. Such a setup is needed in order to
avoid spurious "context canceled" errors returned when multiple
streamers are part of the same flow.

Consider the following setup. We have a remote flow consisting of two
trees of operators rooted in the outboxes:
```
  └ Node 2
    ├ *colrpc.Outbox
    │ └ *rowexec.joinReader
...
    └ *colrpc.Outbox
      └ *rowexec.joinReader
```
Both of the join readers use the streamer API and run in separate
goroutines (because outboxes run in separate goroutines). Then let's
imagine that one of the outboxes is moved to draining while the
corresponding streamer is still evaluating requests. The join reader
will call `Streamer.Close` which cancels the context of the requests'
evaluation. This cancellation "poisons" the txn used by that streamer
with `ERROR: txn already encountered an error`.

Now let's imagine that the second tree of operators is not done yet and
needs to produce more data. Whenever the streamer in that tree tries to
evaluate some requests, it would run into the "poisoned" txn error.

This commit goes around this problem by giving a separate LeafTxn to
each streamer so that a streamer from one tree could not poison the txn
of the streamer from another tree. The non-streamer code path doesn't
run into a similar problem because it doesn't eagerly cancel the
outstanding requests.

I think a similar change (to give the streamer a separate leaf txn) will
also be needed to support the transparent refresh mechanism in some
cases, so this commit is beneficial from that point of view too.

I spent some time attempting to write a regression test, but it is quite
difficult to come up with something reliable here, and I decided to not
spend more time on it.

Release note: None
  • Loading branch information
yuzefovich committed Jun 8, 2022
1 parent d9fd5bf commit 6c88496
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func max(a, b int64) int64 {

// NewStreamer creates a new Streamer.
//
// txn must be a LeafTxn.
// txn must be a LeafTxn that is not used by anything other than this Streamer.
//
// limitBytes determines the maximum amount of memory this Streamer is allowed
// to use (i.e. it'll be used lazily, as needed). The more memory it has, the
Expand Down
16 changes: 12 additions & 4 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type ColIndexJoin struct {

flowCtx *execinfra.FlowCtx
cf *cFetcher
// txn is the transaction used by the index joiner.
txn *kv.Txn

// tracingSpan is created when the stats should be collected for the query
// execution, and it will be finished when closing the operator.
Expand Down Expand Up @@ -150,7 +152,7 @@ func (s *ColIndexJoin) Init(ctx context.Context) {
s.streamerInfo.Streamer = kvstreamer.NewStreamer(
s.flowCtx.Cfg.DistSender,
s.flowCtx.Stopper(),
s.flowCtx.Txn,
s.txn,
s.flowCtx.EvalCtx.Settings,
row.GetWaitPolicy(s.cf.lockWaitPolicy),
s.streamerInfo.budgetLimit,
Expand Down Expand Up @@ -252,7 +254,7 @@ func (s *ColIndexJoin) Next() coldata.Batch {
} else {
err = s.cf.StartScan(
s.Ctx,
s.flowCtx.Txn,
s.txn,
spans,
nil, /* bsHeader */
false, /* limitBatches */
Expand Down Expand Up @@ -407,7 +409,7 @@ func (s *ColIndexJoin) next() bool {
// DrainMeta is part of the colexecop.MetadataSource interface.
func (s *ColIndexJoin) DrainMeta() []execinfrapb.ProducerMetadata {
var trailingMeta []execinfrapb.ProducerMetadata
if tfs := execinfra.GetLeafTxnFinalState(s.Ctx, s.flowCtx.Txn); tfs != nil {
if tfs := execinfra.GetLeafTxnFinalState(s.Ctx, s.txn); tfs != nil {
trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs})
}
meta := execinfrapb.GetProducerMeta()
Expand Down Expand Up @@ -523,7 +525,8 @@ func NewColIndexJoin(
var streamerBudgetLimit int64

useStreamer := flowCtx.Txn != nil && flowCtx.Txn.Type() == kv.LeafTxn &&
row.CanUseStreamer(ctx, flowCtx.EvalCtx.Settings)
flowCtx.MakeLeafTxn != nil && row.CanUseStreamer(ctx, flowCtx.EvalCtx.Settings)
txn := flowCtx.Txn
if useStreamer {
if streamerBudgetAcc == nil {
return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired")
Expand All @@ -532,6 +535,10 @@ func NewColIndexJoin(
// and we'll give the remaining memory to the streamer budget below.
cFetcherMemoryLimit = int64(math.Ceil(float64(totalMemoryLimit) / 8.0))
streamerBudgetLimit = 7 * cFetcherMemoryLimit
txn, err = flowCtx.MakeLeafTxn()
if err != nil {
return nil, err
}
}

fetcher := cFetcherPool.Get().(*cFetcher)
Expand Down Expand Up @@ -564,6 +571,7 @@ func NewColIndexJoin(
spanAssembler: spanAssembler,
ResultTypes: tableArgs.typs,
maintainOrdering: spec.MaintainOrdering,
txn: txn,
usesStreamer: useStreamer,
limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post),
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (ds *ServerImpl) setupFlow(
)
monitor.Start(ctx, parentMonitor, mon.BoundAccount{})

makeLeaf := func(req *execinfrapb.SetupFlowRequest) (*kv.Txn, error) {
makeLeaf := func() (*kv.Txn, error) {
tis := req.LeafTxnInputState
if tis == nil {
// This must be a flow running for some bulk-io operation that doesn't use
Expand Down Expand Up @@ -311,7 +311,7 @@ func (ds *ServerImpl) setupFlow(
evalCtx.Mon = monitor
if localState.HasConcurrency {
var err error
leafTxn, err = makeLeaf(req)
leafTxn, err = makeLeaf()
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -333,7 +333,7 @@ func (ds *ServerImpl) setupFlow(
// It's important to populate evalCtx.Txn early. We'll write it again in the
// f.SetTxn() call below, but by then it will already have been captured by
// processors.
leafTxn, err = makeLeaf(req)
leafTxn, err = makeLeaf()
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -369,7 +369,7 @@ func (ds *ServerImpl) setupFlow(

// Create the FlowCtx for the flow.
flowCtx := ds.newFlowContext(
ctx, req.Flow.FlowID, evalCtx, req.TraceKV, req.CollectStats, localState, req.Flow.Gateway == ds.NodeID.SQLInstanceID(),
ctx, req.Flow.FlowID, evalCtx, makeLeaf, req.TraceKV, req.CollectStats, localState, req.Flow.Gateway == ds.NodeID.SQLInstanceID(),
)

// req always contains the desired vectorize mode, regardless of whether we
Expand Down Expand Up @@ -417,7 +417,7 @@ func (ds *ServerImpl) setupFlow(
} else {
// If I haven't created the leaf already, do it now.
if leafTxn == nil {
leafTxn, err = makeLeaf(req)
leafTxn, err = makeLeaf()
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -442,6 +442,7 @@ func (ds *ServerImpl) newFlowContext(
ctx context.Context,
id execinfrapb.FlowID,
evalCtx *eval.Context,
makeLeafTxn func() (*kv.Txn, error),
traceKV bool,
collectStats bool,
localState LocalState,
Expand All @@ -454,6 +455,7 @@ func (ds *ServerImpl) newFlowContext(
ID: id,
EvalCtx: evalCtx,
Txn: evalCtx.Txn,
MakeLeafTxn: makeLeafTxn,
NodeID: ds.ServerConfig.NodeID,
TraceKV: traceKV,
CollectStats: collectStats,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfra/flow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type FlowCtx struct {
// higher-level txn (like backfills).
Txn *kv.Txn

// MakeLeafTxn returns a new LeafTxn, different from Txn.
MakeLeafTxn func() (*kv.Txn, error)

// Descriptors is used to look up leased table descriptors and to construct
// transaction bound TypeResolvers to resolve type references during flow
// setup. It is not safe for concurrent use and is intended to be used only
Expand Down
29 changes: 21 additions & 8 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ type joinReader struct {
keyLocking descpb.ScanLockingStrength
lockWaitPolicy lock.WaitPolicy

// txn is the transaction used by the join reader.
txn *kv.Txn

// usesStreamer indicates whether the joinReader performs the lookups using
// the kvcoord.Streamer API.
// the kvstreamer.Streamer API.
usesStreamer bool
streamerInfo struct {
*kvstreamer.Streamer
Expand Down Expand Up @@ -308,7 +311,7 @@ func newJoinReader(
shouldLimitBatches = false
}
useStreamer := flowCtx.Txn != nil && flowCtx.Txn.Type() == kv.LeafTxn &&
row.CanUseStreamer(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Settings)
flowCtx.MakeLeafTxn != nil && row.CanUseStreamer(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Settings)

jr := &joinReader{
fetchSpec: spec.FetchSpec,
Expand All @@ -329,9 +332,19 @@ func newJoinReader(
jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner}
}

if useStreamer {
var err error
jr.txn, err = flowCtx.MakeLeafTxn()
if err != nil {
return nil, err
}
} else {
jr.txn = flowCtx.Txn
}

// Make sure the key column types are hydrated. The fetched column types will
// be hydrated in ProcessorBase.Init (via joinerBase.init).
resolver := flowCtx.NewTypeResolver(flowCtx.Txn)
resolver := flowCtx.NewTypeResolver(jr.txn)
for i := range spec.FetchSpec.KeyAndSuffixColumns {
if err := typedesc.EnsureTypeIsHydrated(
flowCtx.EvalCtx.Ctx(), spec.FetchSpec.KeyAndSuffixColumns[i].Type, &resolver,
Expand Down Expand Up @@ -411,7 +424,7 @@ func newJoinReader(
lookupExprTypes = append(lookupExprTypes, leftTypes...)
lookupExprTypes = append(lookupExprTypes, rightTypes...)

semaCtx := flowCtx.NewSemaContext(flowCtx.Txn)
semaCtx := flowCtx.NewSemaContext(jr.txn)
if err := jr.lookupExpr.Init(spec.LookupExpr, lookupExprTypes, semaCtx, jr.EvalCtx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -939,7 +952,7 @@ func (jr *joinReader) readInput() (
}
}
err = jr.fetcher.StartScan(
jr.Ctx, jr.FlowCtx.Txn, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
jr.Ctx, jr.txn, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionValues,
)
}
Expand Down Expand Up @@ -1006,7 +1019,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
bytesLimit = rowinfra.NoBytesLimit
}
if err := jr.fetcher.StartScan(
jr.Ctx, jr.FlowCtx.Txn, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
jr.Ctx, jr.txn, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionValues,
); err != nil {
jr.MoveToDraining(err)
Expand Down Expand Up @@ -1053,7 +1066,7 @@ func (jr *joinReader) Start(ctx context.Context) {
jr.streamerInfo.Streamer = kvstreamer.NewStreamer(
jr.FlowCtx.Cfg.DistSender,
jr.FlowCtx.Stopper(),
jr.FlowCtx.Txn,
jr.txn,
jr.FlowCtx.EvalCtx.Settings,
jr.lockWaitPolicy,
jr.streamerInfo.budgetLimit,
Expand Down Expand Up @@ -1164,7 +1177,7 @@ func (jr *joinReader) generateMeta() []execinfrapb.ProducerMetadata {
meta.Metrics = execinfrapb.GetMetricsMeta()
meta.Metrics.RowsRead = jr.rowsRead
meta.Metrics.BytesRead = jr.fetcher.GetBytesRead()
if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx, jr.FlowCtx.Txn); tfs != nil {
if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx, jr.txn); tfs != nil {
trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs})
}
return trailingMeta
Expand Down

0 comments on commit 6c88496

Please sign in to comment.