Skip to content

Commit

Permalink
Merge #38281
Browse files Browse the repository at this point in the history
38281: sql: add support for executing postqueries r=yuzefovich a=yuzefovich

This commit introduces infrastructure for running "deferred
subqueries" that are to be executed after the execution of the main
query tree which is needed for (among other things) for foreign key
checks.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jun 24, 2019
2 parents adf264a + c0b1cc3 commit 225a320
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 5 deletions.
18 changes: 16 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,15 +877,19 @@ func (ex *connExecutor) execWithDistSQLEngine(
planCtx.planner = planner
planCtx.stmtType = recv.stmtType

if len(planner.curPlan.subqueryPlans) != 0 {
var evalCtxFactory func() *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 || len(planner.curPlan.postqueryPlans) != 0 {
var evalCtx extendedEvalContext
ex.initEvalCtx(ctx, &evalCtx, planner)
evalCtxFactory := func() *extendedEvalContext {
evalCtxFactory = func() *extendedEvalContext {
ex.resetEvalCtx(&evalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
evalCtx.Placeholders = &planner.semaCtx.Placeholders
evalCtx.Annotations = &planner.semaCtx.Annotations
return &evalCtx
}
}

if len(planner.curPlan.subqueryPlans) != 0 {
if !ex.server.cfg.DistSQLPlanner.PlanAndRunSubqueries(
ctx, planner, evalCtxFactory, planner.curPlan.subqueryPlans, recv, distribute,
) {
Expand All @@ -897,6 +901,16 @@ func (ex *connExecutor) execWithDistSQLEngine(
// the planner whether or not to plan remote table readers.
ex.server.cfg.DistSQLPlanner.PlanAndRun(
ctx, evalCtx, planCtx, planner.txn, planner.curPlan.plan, recv)
if recv.commErr != nil {
return recv.bytesRead, recv.rowsRead, recv.commErr
}

if len(planner.curPlan.postqueryPlans) != 0 {
ex.server.cfg.DistSQLPlanner.PlanAndRunPostqueries(
ctx, planner, evalCtxFactory, planner.curPlan.postqueryPlans, recv, distribute,
)
}

return recv.bytesRead, recv.rowsRead, recv.commErr
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,10 @@ type PlanningCtx struct {
isLocal bool
planner *planner
// ignoreClose, when set to true, will prevent the closing of the planner's
// current plan. The top-level query needs to close it, but everything else
// (like subqueries or EXPLAIN ANALYZE) should set this to true to avoid
// double closes of the planNode tree.
// current plan. Only the top-level query needs to close it, but everything
// else (like subqueries or EXPLAIN ANALYZE) should set this to true to avoid
// double closes of the planNode tree. Postqueries also need to set it to
// true, and they are responsible for closing their own plan.
ignoreClose bool
stmtType tree.StatementType
// planDepth is set to the current depth of the planNode tree. It's used to
Expand Down
113 changes: 113 additions & 0 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,3 +848,116 @@ func (dsp *DistSQLPlanner) PlanAndRun(
dsp.FinalizePlan(planCtx, &physPlan)
dsp.Run(planCtx, txn, &physPlan, recv, evalCtx, nil /* finishedSetupFn */)
}

// PlanAndRunPostqueries returns false if an error was encountered and sets
// that error in the provided receiver.
func (dsp *DistSQLPlanner) PlanAndRunPostqueries(
ctx context.Context,
planner *planner,
evalCtxFactory func() *extendedEvalContext,
postqueryPlans []postquery,
recv *DistSQLReceiver,
maybeDistribute bool,
) bool {
for _, postqueryPlan := range postqueryPlans {
if err := dsp.planAndRunPostquery(
ctx,
postqueryPlan,
planner,
evalCtxFactory(),
recv,
maybeDistribute,
); err != nil {
recv.SetError(err)
return false
}
}

return true
}

func (dsp *DistSQLPlanner) planAndRunPostquery(
ctx context.Context,
postqueryPlan postquery,
planner *planner,
evalCtx *extendedEvalContext,
recv *DistSQLReceiver,
maybeDistribute bool,
) error {
postqueryMonitor := mon.MakeMonitor(
"postquery",
mon.MemoryResource,
dsp.distSQLSrv.Metrics.CurBytesCount,
dsp.distSQLSrv.Metrics.MaxBytesHist,
-1, /* use default block size */
noteworthyMemoryUsageBytes,
dsp.distSQLSrv.Settings,
)
postqueryMonitor.Start(ctx, evalCtx.Mon, mon.BoundAccount{})
defer postqueryMonitor.Stop(ctx)

postqueryMemAccount := postqueryMonitor.MakeBoundAccount()
defer postqueryMemAccount.Close(ctx)

var postqueryPlanCtx *PlanningCtx
var distributePostquery bool
if maybeDistribute {
distributePostquery = shouldDistributePlan(
ctx, planner.SessionData().DistSQLMode, dsp, postqueryPlan.plan)
}
if distributePostquery {
postqueryPlanCtx = dsp.NewPlanningCtx(ctx, evalCtx, planner.txn)
} else {
postqueryPlanCtx = dsp.newLocalPlanningCtx(ctx, evalCtx)
}

postqueryPlanCtx.isLocal = !distributePostquery
postqueryPlanCtx.planner = planner
postqueryPlanCtx.stmtType = tree.Rows
// We cannot close the postqueries' plans through the plan top since
// postqueries haven't been executed yet, so we manually close each postquery
// plan right after the execution.
postqueryPlanCtx.ignoreClose = true
defer postqueryPlan.plan.Close(ctx)

postqueryPhysPlan, err := dsp.createPlanForNode(postqueryPlanCtx, postqueryPlan.plan)
if err != nil {
return err
}
dsp.FinalizePlan(postqueryPlanCtx, &postqueryPhysPlan)

postqueryRecv := recv.clone()
var postqueryRowReceiver postqueryRowResultWriter
postqueryRecv.resultWriter = &postqueryRowReceiver
dsp.Run(postqueryPlanCtx, planner.txn, &postqueryPhysPlan, postqueryRecv, evalCtx, nil /* finishedSetupFn */)
if postqueryRecv.commErr != nil {
return postqueryRecv.commErr
}
return postqueryRowReceiver.Err()
}

// postqueryRowResultWriter is a lightweight version of RowResultWriter that
// can only write errors. It is used only for executing postqueries and is
// sufficient for that case since those can only return errors.
type postqueryRowResultWriter struct {
err error
}

var _ rowResultWriter = &postqueryRowResultWriter{}

func (r *postqueryRowResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
return errors.Errorf("unexpectedly AddRow is called on postqueryRowResultWriter")
}

func (r *postqueryRowResultWriter) IncrementRowsAffected(n int) {
// TODO(yuzefovich): this probably will need to change when we support
// cascades.
}

func (r *postqueryRowResultWriter) SetError(err error) {
r.err = err
}

func (r *postqueryRowResultWriter) Err() error {
return r.err
}
10 changes: 10 additions & 0 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ type planTop struct {
// subqueryPlans contains all the sub-query plans.
subqueryPlans []subquery

// postqueryPlans contains all the plans for subqueries that are to be
// executed after the main query (for example, foreign key checks).
postqueryPlans []postquery

// auditEvents becomes non-nil if any of the descriptors used by
// current statement is causing an auditing event. See exec_log.go.
auditEvents []auditEvent
Expand All @@ -319,6 +323,12 @@ type planTop struct {
avoidBuffering bool
}

// postquery is a query tree that is executed after the main one. It can only
// return an error (for example, foreign key violation).
type postquery struct {
plan planNode
}

// makePlan implements the planMaker interface. It populates the
// planner's curPlan field.
//
Expand Down

0 comments on commit 225a320

Please sign in to comment.