Merge #95516 #95522
95516: ui: add proper checks and message for insights r=maryliag a=maryliag

Previously, no checks were done on the insights page when the user was not an admin, causing the page to show empty results even when there was an error retrieving the data.
This commit passes along the proper error message when there is one. It also checks for the admin privilege before showing the option to apply an index recommendation.

https://www.loom.com/share/33d002cc9b704a4fad2f9851109b04ee

Epic: CRDB-20388

Release note (ui change): Hide apply option for index recommendation when user is not admin.

95522: sql: fix a possible race between Flow.Cleanup and Flow.Cancel r=yuzefovich a=yuzefovich

This commit fixes a possible race that could occur when `Flow.Cleanup`
is called by the main goroutine concurrently with `Flow.Cancel` by the
listener goroutine (which is not allowed). We already had
synchronization in place, but it was insufficient. In particular, the
following scenario could lead to a nil pointer crash:
- the listener checks whether `Cleanup` has been called; it hasn't, and
the mutex is unlocked;
- the listener is preempted;
- the main goroutine proceeds to perform the `Cleanup`. At the very end
the flow object is unset (including `ctxCancel` being overwritten to `nil`);
- the listener resumes its execution and proceeds to call `Cancel` on the
already-unset `Flow` object, leading to a nil pointer dereference on the
`ctxCancel` call.

This is now fixed by holding the mutex through the call to `Cancel` in
the listener goroutine, ensuring that the `Flow` object is not unset
from under the listener. Additionally, this commit clarifies the
callbacks that are performed at the very beginning and very end of
the `Cleanup` method.
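
As an illustration, here is a minimal, self-contained Go sketch of the pattern (not the actual CockroachDB code; `flow`, `cleanedUp`, and `mu` are hypothetical stand-ins): the listener checks the cleanup flag and invokes the cancel callback within a single critical section, so the cleanup goroutine cannot unset the object in between.

```go
package main

import (
	"fmt"
	"sync"
)

// flow stands in for the real Flow object; Cleanup "unsets" it by nil-ing out
// ctxCancel, which is what the racing listener could otherwise dereference.
type flow struct {
	ctxCancel func()
}

func main() {
	var (
		mu        sync.Mutex
		cleanedUp bool
		wg        sync.WaitGroup
	)
	f := &flow{ctxCancel: func() { fmt.Println("flow canceled") }}

	wg.Add(2)

	// Listener goroutine: the check of cleanedUp and the call to ctxCancel
	// happen under the same mutex hold, mirroring the fix of holding the
	// mutex through Cancel.
	go func() {
		defer wg.Done()
		mu.Lock()
		defer mu.Unlock()
		if cleanedUp {
			// Cleanup has already run; the flow is unset, nothing to cancel.
			return
		}
		f.ctxCancel() // safe: Cleanup cannot unset f while we hold mu
	}()

	// Main goroutine's Cleanup: marks the flag and unsets the flow only
	// while holding the same mutex.
	go func() {
		defer wg.Done()
		mu.Lock()
		defer mu.Unlock()
		cleanedUp = true
		f.ctxCancel = nil // "unset" the flow object
	}()

	wg.Wait()
}
```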

Fixes: #95527.

Release note: None

Co-authored-by: maryliag <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Jan 19, 2023
3 parents ba572fc + 0bdc210 + 23da047 commit 0775fcc
Showing 32 changed files with 347 additions and 140 deletions.
4 changes: 4 additions & 0 deletions pkg/sql/colflow/vectorized_flow.go
@@ -369,6 +369,10 @@ func (f *vectorizedFlow) MemUsage() int64 {

// Cleanup is part of the flowinfra.Flow interface.
func (f *vectorizedFlow) Cleanup(ctx context.Context) {
startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns()
startCleanup()
defer endCleanup()

// This cleans up all the memory and disk monitoring of the vectorized flow
// as well as closes all the closers.
f.creator.cleanup(ctx)
16 changes: 8 additions & 8 deletions pkg/sql/distsql/server.go
@@ -216,16 +216,16 @@ func (ds *ServerImpl) setupFlow(
) (retCtx context.Context, _ flowinfra.Flow, _ execopnode.OpChains, retErr error) {
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
var monitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
var onFlowCleanup func()
var onFlowCleanupEnd func() // will be called at the very end of Flow.Cleanup()
// Make sure that we clean up all resources (which in the happy case are
// cleaned up in Flow.Cleanup()) if an error is encountered.
defer func() {
if retErr != nil {
if monitor != nil {
monitor.Stop(ctx)
}
if onFlowCleanup != nil {
onFlowCleanup()
if onFlowCleanupEnd != nil {
onFlowCleanupEnd()
} else {
reserved.Close(ctx)
}
@@ -307,7 +307,7 @@ func (ds *ServerImpl) setupFlow(
// the whole evalContext, but that isn't free, so we choose to restore
// the original state in order to avoid performance regressions.
origTxn := evalCtx.Txn
onFlowCleanup = func() {
onFlowCleanupEnd = func() {
evalCtx.Txn = origTxn
reserved.Close(ctx)
}
@@ -322,7 +322,7 @@
evalCtx.Txn = leafTxn
}
} else {
onFlowCleanup = func() {
onFlowCleanupEnd = func() {
reserved.Close(ctx)
}
if localState.IsLocal {
@@ -388,7 +388,7 @@ func (ds *ServerImpl) setupFlow(
isVectorized := req.EvalContext.SessionData.VectorizeMode != sessiondatapb.VectorizeOff
f := newFlow(
flowCtx, sp, ds.flowRegistry, rowSyncFlowConsumer, batchSyncFlowConsumer,
localState.LocalProcs, isVectorized, onFlowCleanup, req.StatementSQL,
localState.LocalProcs, isVectorized, onFlowCleanupEnd, req.StatementSQL,
)
opt := flowinfra.FuseNormally
if !localState.MustUseLeafTxn() {
@@ -521,10 +521,10 @@ func newFlow(
batchSyncFlowConsumer execinfra.BatchReceiver,
localProcessors []execinfra.LocalProcessor,
isVectorized bool,
onFlowCleanup func(),
onFlowCleanupEnd func(),
statementSQL string,
) flowinfra.Flow {
base := flowinfra.NewFlowBase(flowCtx, sp, flowReg, rowSyncFlowConsumer, batchSyncFlowConsumer, localProcessors, onFlowCleanup, statementSQL)
base := flowinfra.NewFlowBase(flowCtx, sp, flowReg, rowSyncFlowConsumer, batchSyncFlowConsumer, localProcessors, onFlowCleanupEnd, statementSQL)
if isVectorized {
return colflow.NewVectorizedFlow(base)
}
93 changes: 46 additions & 47 deletions pkg/sql/distsql_running.go
@@ -43,7 +43,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -100,23 +99,25 @@ type runnerResult struct {
// run executes the request. An error, if encountered, is both sent on the
// result channel and returned.
func (req runnerRequest) run() error {
defer physicalplan.ReleaseFlowSpec(&req.flowReq.Flow)
res := runnerResult{nodeID: req.sqlInstanceID}
defer func() {
req.resultChan <- res
physicalplan.ReleaseFlowSpec(&req.flowReq.Flow)
}()

conn, err := req.podNodeDialer.Dial(req.ctx, roachpb.NodeID(req.sqlInstanceID), rpc.DefaultClass)
if err != nil {
res.err = err
return err
}
client := execinfrapb.NewDistSQLClient(conn)
// TODO(radu): do we want a timeout here?
resp, err := client.SetupFlow(req.ctx, req.flowReq)
if err != nil {
res.err = err
} else {
client := execinfrapb.NewDistSQLClient(conn)
// TODO(radu): do we want a timeout here?
resp, err := client.SetupFlow(req.ctx, req.flowReq)
if err != nil {
res.err = err
} else {
res.err = resp.Error.ErrorDetail(req.ctx)
}
res.err = resp.Error.ErrorDetail(req.ctx)
}
req.resultChan <- res
return res.err
}

@@ -464,12 +465,13 @@ func (dsp *DistSQLPlanner) setupFlows(

// Start all the remote flows.
//
// numAsyncRequests tracks the number of the SetupFlow RPCs that were
// delegated to the DistSQL runner goroutines.
var numAsyncRequests int
// numSerialRequests tracks the number of the SetupFlow RPCs that were
// issued by the current goroutine on its own.
var numSerialRequests int
// usedWorker indicates whether we used at least one DistSQL worker
// goroutine to issue the SetupFlow RPC.
var usedWorker bool
// numIssuedRequests tracks the number of the SetupFlow RPCs that were
// issued (either by the current goroutine directly or delegated to the
// DistSQL workers).
var numIssuedRequests int
if sp := tracing.SpanFromContext(origCtx); sp != nil && !sp.IsNoop() {
setupReq.TraceInfo = sp.Meta().ToProto()
}
@@ -515,7 +517,7 @@ func (dsp *DistSQLPlanner) setupFlows(
//
// Note that even in case of an error in runnerRequest.run we still
// send on the result channel.
for i := 0; i < numAsyncRequests+numSerialRequests; i++ {
for i := 0; i < numIssuedRequests; i++ {
<-resultChan
}
// At this point, we know that all concurrent requests (if there
@@ -541,11 +543,11 @@

// Send out a request to the workers; if no worker is available, run
// directly.
numIssuedRequests++
select {
case dsp.runnerCoordinator.runnerChan <- runReq:
numAsyncRequests++
usedWorker = true
default:
numSerialRequests++
// Use the context of the local flow since we're executing this
// SetupFlow RPC synchronously.
runReq.ctx = ctx
@@ -554,16 +556,8 @@
}
}
}
if buildutil.CrdbTestBuild {
if numAsyncRequests+numSerialRequests != len(flows)-1 {
return ctx, flow, errors.AssertionFailedf(
"expected %d requests, found only %d async and %d serial",
len(flows)-1, numAsyncRequests, numSerialRequests,
)
}
}

if numAsyncRequests == 0 {
if !usedWorker {
// We executed all SetupFlow RPCs in the current goroutine, and all RPCs
// succeeded.
return ctx, flow, nil
@@ -586,7 +580,7 @@ func (dsp *DistSQLPlanner) setupFlows(
syncutil.Mutex
called bool
}{}
flow.AddOnCleanup(func() {
flow.AddOnCleanupStart(func() {
cleanupCalledMu.Lock()
defer cleanupCalledMu.Unlock()
cleanupCalledMu.called = true
@@ -605,24 +599,29 @@ func (dsp *DistSQLPlanner) setupFlows(
for i := 0; i < len(flows)-1; i++ {
res := <-resultChan
if res.err != nil && !seenError {
seenError = true
// The setup of at least one remote flow failed.
cleanupCalledMu.Lock()
skipCancel := cleanupCalledMu.called
cleanupCalledMu.Unlock()
if skipCancel {
continue
}
// First, we update the DistSQL receiver with the error to be
// returned to the client eventually.
//
// In order to not protect DistSQLReceiver.status with a mutex,
// we do not update the status here and, instead, rely on the
// DistSQLReceiver detecting the error the next time an object
// is pushed into it.
recv.setErrorWithoutStatusUpdate(res.err, true /* willDeferStatusUpdate */)
// Now explicitly cancel the local flow.
flow.Cancel()
seenError = true
func() {
cleanupCalledMu.Lock()
// Flow.Cancel cannot be called after or concurrently with
// Flow.Cleanup.
defer cleanupCalledMu.Unlock()
if cleanupCalledMu.called {
// Cleanup of the local flow has already been performed,
// so there is nothing to do.
return
}
// First, we update the DistSQL receiver with the error to
// be returned to the client eventually.
//
// In order to not protect DistSQLReceiver.status with a
// mutex, we do not update the status here and, instead,
// rely on the DistSQLReceiver detecting the error the next
// time an object is pushed into it.
recv.setErrorWithoutStatusUpdate(res.err, true /* willDeferStatusUpdate */)
// Now explicitly cancel the local flow.
flow.Cancel()
}()
}
}
})
52 changes: 35 additions & 17 deletions pkg/sql/flowinfra/flow.go
@@ -140,10 +140,14 @@ type Flow interface {
// Cleanup.
Cancel()

// AddOnCleanup adds a callback to be executed at the very end of Cleanup.
// Callbacks are put on the stack meaning that AddOnCleanup is called
// multiple times, then the "later" callbacks are executed first.
AddOnCleanup(fn func())
// AddOnCleanupStart adds a callback to be executed at the very beginning of
// Cleanup.
AddOnCleanupStart(fn func())

// GetOnCleanupFns returns a couple of functions that should be called at
// the very beginning and the very end of Cleanup, respectively. Both will
// be non-nil.
GetOnCleanupFns() (startCleanup, endCleanup func())

// Cleanup must be called whenever the flow is done (meaning it either
// completes gracefully after all processors and mailboxes exited or an
@@ -200,7 +204,10 @@ type FlowBase struct {
// - outboxes
waitGroup sync.WaitGroup

onFlowCleanup func()
// onCleanupStart and onCleanupEnd will be called in the very beginning and
// the very end of Cleanup(), respectively.
onCleanupStart func()
onCleanupEnd func()

statementSQL string

@@ -276,7 +283,7 @@ func NewFlowBase(
rowSyncFlowConsumer execinfra.RowReceiver,
batchSyncFlowConsumer execinfra.BatchReceiver,
localProcessors []execinfra.LocalProcessor,
onFlowCleanup func(),
onFlowCleanupEnd func(),
statementSQL string,
) *FlowBase {
// We are either in a single tenant cluster, or a SQL node in a multi-tenant
Expand All @@ -300,7 +307,7 @@ func NewFlowBase(
batchSyncFlowConsumer: batchSyncFlowConsumer,
localProcessors: localProcessors,
admissionInfo: admissionInfo,
onFlowCleanup: onFlowCleanup,
onCleanupEnd: onFlowCleanupEnd,
status: flowNotStarted,
statementSQL: statementSQL,
}
@@ -527,17 +534,31 @@ func (f *FlowBase) Cancel() {
f.ctxCancel()
}

// AddOnCleanup is part of the Flow interface.
func (f *FlowBase) AddOnCleanup(fn func()) {
if f.onFlowCleanup != nil {
oldOnFlowCleanup := f.onFlowCleanup
f.onFlowCleanup = func() {
// AddOnCleanupStart is part of the Flow interface.
func (f *FlowBase) AddOnCleanupStart(fn func()) {
if f.onCleanupStart != nil {
oldOnCleanupStart := f.onCleanupStart
f.onCleanupStart = func() {
fn()
oldOnFlowCleanup()
oldOnCleanupStart()
}
} else {
f.onFlowCleanup = fn
f.onCleanupStart = fn
}
}

var noopFn = func() {}

// GetOnCleanupFns is part of the Flow interface.
func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) {
onCleanupStart, onCleanupEnd := f.onCleanupStart, f.onCleanupEnd
if onCleanupStart == nil {
onCleanupStart = noopFn
}
if onCleanupEnd == nil {
onCleanupEnd = noopFn
}
return onCleanupStart, onCleanupEnd
}

// Cleanup is part of the Flow interface.
Expand Down Expand Up @@ -594,9 +615,6 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
}
f.status = flowFinished
f.ctxCancel()
if f.onFlowCleanup != nil {
f.onFlowCleanup()
}
}

// cancel cancels all unconnected streams of this flow. This function is called
3 changes: 3 additions & 0 deletions pkg/sql/rowflow/row_based_flow.go
@@ -439,6 +439,9 @@ func (f *rowBasedFlow) Release() {

// Cleanup is part of the flowinfra.Flow interface.
func (f *rowBasedFlow) Cleanup(ctx context.Context) {
startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns()
startCleanup()
defer endCleanup()
f.FlowBase.Cleanup(ctx)
f.Release()
}