Skip to content

Commit

Permalink
distsqlrun: fix data race on evalCtx.iVarContainerStack
Browse files Browse the repository at this point in the history
Previously, it was possible to run into a data race on the slice
evalCtx.iVarContainerStack because we were simply using a copy by
dereference to create a new eval context. However, this is not
sufficient since the slices still point to the same underlying
memory, so now we make a deeper copy of the slice.

Release note: None
  • Loading branch information
yuzefovich committed Apr 8, 2019
1 parent 2eb3f60 commit d5a068d
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 17 deletions.
11 changes: 4 additions & 7 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,7 @@ func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planTop) error
if !params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRunSubqueries(
params.ctx,
params.p,
func() *extendedEvalContext {
ret := *params.extendedEvalCtx
return &ret
},
params.extendedEvalCtx.copy,
plan.subqueryPlans,
recv,
true,
Expand All @@ -321,8 +318,8 @@ func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planTop) error
}

// Make a copy of the EvalContext so it can be safely modified.
evalCtx := *params.p.ExtendedEvalContext()
planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.newLocalPlanningCtx(params.ctx, &evalCtx)
evalCtx := params.p.ExtendedEvalContextCopy()
planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.newLocalPlanningCtx(params.ctx, evalCtx)
// Always plan local.
planCtx.isLocal = true
plannerCopy := *params.p
Expand All @@ -332,7 +329,7 @@ func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planTop) error
planCtx.stmtType = recv.stmtType

params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun(
params.ctx, &evalCtx, planCtx, params.p.Txn(), plan.plan, recv)
params.ctx, evalCtx, planCtx, params.p.Txn(), plan.plan, recv)
if recv.commErr != nil {
return recv.commErr
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ type FlowCtx struct {
// them at runtime to ensure expressions are evaluated with the correct indexed
// var context.
func (ctx *FlowCtx) NewEvalCtx() *tree.EvalContext {
evalCtx := *ctx.EvalCtx
return &evalCtx
return ctx.EvalCtx.Copy()
}

// TestingKnobs returns the distsql testing knobs for this flow context.
Expand Down
9 changes: 3 additions & 6 deletions pkg/sql/explain_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error {
if !distSQLPlanner.PlanAndRunSubqueries(
planCtx.ctx,
params.p,
func() *extendedEvalContext {
ret := *params.extendedEvalCtx
return &ret
},
params.extendedEvalCtx.copy,
n.subqueryPlans,
recv,
true,
Expand Down Expand Up @@ -181,10 +178,10 @@ func (n *explainDistSQLNode) startExec(params runParams) error {
planCtx.ctx = ctx
// Make a copy of the evalContext with the recording span in it; we can't
// change the original.
newEvalCtx := *params.extendedEvalCtx
newEvalCtx := params.extendedEvalCtx.copy()
newEvalCtx.Context = ctx
newParams := params
newParams.extendedEvalCtx = &newEvalCtx
newParams.extendedEvalCtx = newEvalCtx

// Discard rows that are returned.
rw := newCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func (n *insertNode) processSourceRow(params runParams, sourceVals tree.Datums)
n.run.computeExprs,
n.run.insertCols,
n.run.computedCols,
*params.EvalContext(),
*params.EvalContext().Copy(),
n.run.ti.tableDesc(),
sourceVals,
&n.run.iVarContainerForComputedCols,
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ type extendedEvalContext struct {
schemaAccessors *schemaInterface
}

// copy returns a deep copy of ctx.
func (ctx *extendedEvalContext) copy() *extendedEvalContext {
cpy := *ctx
cpy.EvalContext = *ctx.EvalContext.Copy()
return &cpy
}

// schemaInterface provides access to the database and table descriptors.
// See schema_accessors.go.
type schemaInterface struct {
Expand Down Expand Up @@ -332,10 +339,15 @@ func (p *planner) LogicalSchemaAccessor() SchemaAccessor {
return p.extendedEvalCtx.schemaAccessors.logical
}

// Note: if the context will be modified, use ExtendedEvalContextCopy instead.
func (p *planner) ExtendedEvalContext() *extendedEvalContext {
return &p.extendedEvalCtx
}

func (p *planner) ExtendedEvalContextCopy() *extendedEvalContext {
return p.extendedEvalCtx.copy()
}

func (p *planner) CurrentDatabase() string {
return p.SessionData().Database
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/sem/tree/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -2618,6 +2618,14 @@ func MakeTestingEvalContextWithMon(st *cluster.Settings, monitor *mon.BytesMonit
return ctx
}

// Copy returns a deep copy of ctx.
func (ctx *EvalContext) Copy() *EvalContext {
ctxCopy := *ctx
ctxCopy.iVarContainerStack = make([]IndexedVarContainer, len(ctx.iVarContainerStack), cap(ctx.iVarContainerStack))
copy(ctxCopy.iVarContainerStack, ctx.iVarContainerStack)
return &ctxCopy
}

// PushIVarContainer replaces the current IVarContainer with a different one -
// pushing the current one onto a stack to be replaced later once
// PopIVarContainer is called.
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/sem/tree/eval_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package tree
import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

func TestUnescapePattern(t *testing.T) {
Expand Down Expand Up @@ -119,3 +121,17 @@ func TestReplaceUnescaped(t *testing.T) {
})
}
}

// TestEvalContextCopy verifies that EvalContext.Copy() produces a deep copy of
// EvalContext.
func TestEvalContextCopy(t *testing.T) {
defer leaktest.AfterTest(t)()
// Note: the test relies on "parent" EvalContext having non-nil and non-empty
// iVarContainerStack.
ctx := EvalContext{iVarContainerStack: make([]IndexedVarContainer, 1)}

cpy := ctx.Copy()
if &ctx.iVarContainerStack[0] == &cpy.iVarContainerStack[0] {
t.Fatal("iVarContainerStacks are the same")
}
}
2 changes: 1 addition & 1 deletion pkg/sql/upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (n *upsertNode) processSourceRow(params runParams, sourceVals tree.Datums)
n.run.computeExprs,
n.run.insertCols,
n.run.computedCols,
*params.EvalContext(),
*params.EvalContext().Copy(),
n.run.tw.tableDesc(),
sourceVals,
&n.run.iVarContainerForComputedCols,
Expand Down

0 comments on commit d5a068d

Please sign in to comment.