Merge #36687
36687: sql: fix EXPLAIN (DISTSQL) for CREATE STATISTICS r=rytaft a=rytaft

`EXPLAIN (DISTSQL)` has been broken for `CREATE STATISTICS` ever since
`CREATE STATISTICS` was converted to use the jobs framework. Instead
of showing the DistSQL plan, it has only shown the local node
corresponding to the function that started the job. This commit fixes
`EXPLAIN (DISTSQL)` for `CREATE STATISTICS` so it shows the actual
DistSQL plan used for the job.

Fixes #36677

Release note (sql change): EXPLAIN (DISTSQL) CREATE STATISTICS now
shows the DistSQL plan used by the CREATE STATISTICS job.
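
For illustration, a minimal sketch of the new behavior (the table `t`, column `x`, and
statistic name `s1` below are illustrative, not part of this change):

CREATE TABLE t (x INT);
SELECT url FROM [EXPLAIN (DISTSQL) CREATE STATISTICS s1 ON x FROM t];

The `url` column should now contain a link to the decoded DistSQL plan diagram for the
statistics job (as in the logic test output further down), rather than a plan showing
only the local node that would have started the job.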

Co-authored-by: Rebecca Taft <[email protected]>
craig[bot] and rytaft committed Apr 10, 2019
2 parents 40b96bf + b0d31e7 commit ec5c3aa
Showing 5 changed files with 123 additions and 47 deletions.
113 changes: 71 additions & 42 deletions pkg/sql/create_stats.go
@@ -103,8 +103,48 @@ func (*createStatsNode) Values() tree.Datums { return nil }

// startJob starts a CreateStats job to plan and execute statistics creation.
func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Datums) error {
record, err := n.makeJobRecord(ctx)
if err != nil {
return err
}

if n.Name == stats.AutoStatsName {
// Don't start the job if there is already a CREATE STATISTICS job running.
// (To handle race conditions we check this again after the job starts,
// but this check is used to prevent creating a large number of jobs that
// immediately fail).
if err := checkRunningJobs(ctx, nil /* job */, n.p); err != nil {
return err
}
} else {
telemetry.Inc(sqltelemetry.CreateStatisticsUseCounter)
}

job, errCh, err := n.p.ExecCfg().JobRegistry.StartJob(ctx, resultsCh, *record)
if err != nil {
return err
}

if err = <-errCh; err != nil {
pgerr, ok := errors.Cause(err).(*pgerror.Error)
if ok && pgerr.Code == pgerror.CodeLockNotAvailableError {
// Delete the job so users don't see it and get confused by the error.
const stmt = `DELETE FROM system.jobs WHERE id = $1`
if _ /* cols */, delErr := n.p.ExecCfg().InternalExecutor.Exec(
ctx, "delete-job", nil /* txn */, stmt, *job.ID(),
); delErr != nil {
log.Warningf(ctx, "failed to delete job: %v", delErr)
}
}
}
return err
}

// makeJobRecord creates a CreateStats job record which can be used to plan and
// execute statistics creation.
func (n *createStatsNode) makeJobRecord(ctx context.Context) (*jobs.Record, error) {
if !n.p.ExecCfg().Settings.Version.IsActive(cluster.VersionCreateStats) {
return pgerror.NewErrorf(pgerror.CodeObjectNotInPrerequisiteStateError,
return nil, pgerror.NewErrorf(pgerror.CodeObjectNotInPrerequisiteStateError,
`CREATE STATISTICS requires all nodes to be upgraded to %s`,
cluster.VersionByKey(cluster.VersionCreateStats),
)
@@ -120,7 +160,7 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da
// caching disabled, like other DDL statements.
tableDesc, err = ResolveExistingObject(ctx, n.p, t, true /*required*/, requireTableDesc)
if err != nil {
return err
return nil, err
}
fqTableName = t.FQString()

@@ -130,42 +170,46 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da
}}
tableDesc, err = n.p.Tables().getTableVersionByID(ctx, n.p.txn, sqlbase.ID(t.TableID), flags)
if err != nil {
return err
return nil, err
}
fqTableName, err = n.p.getQualifiedTableName(ctx, &tableDesc.TableDescriptor)
if err != nil {
return err
return nil, err
}
}

if tableDesc.IsVirtualTable() {
return pgerror.NewError(pgerror.CodeWrongObjectTypeError, "cannot create statistics on virtual tables")
return nil, pgerror.NewError(
pgerror.CodeWrongObjectTypeError, "cannot create statistics on virtual tables",
)
}

if tableDesc.IsView() {
return pgerror.NewError(pgerror.CodeWrongObjectTypeError, "cannot create statistics on views")
return nil, pgerror.NewError(
pgerror.CodeWrongObjectTypeError, "cannot create statistics on views",
)
}

if err := n.p.CheckPrivilege(ctx, tableDesc, privilege.SELECT); err != nil {
return err
return nil, err
}

// Identify which columns we should create statistics for.
var createStatsColLists []jobspb.CreateStatsDetails_ColList
if len(n.ColumnNames) == 0 {
if createStatsColLists, err = createStatsDefaultColumns(tableDesc); err != nil {
return err
return nil, err
}
} else {
columns, err := tableDesc.FindActiveColumnsByNames(n.ColumnNames)
if err != nil {
return err
return nil, err
}

columnIDs := make([]sqlbase.ColumnID, len(columns))
for i := range columns {
if columns[i].Type.SemanticType == sqlbase.ColumnType_JSONB {
return pgerror.UnimplementedWithIssueErrorf(35844,
return nil, pgerror.UnimplementedWithIssueErrorf(35844,
"CREATE STATISTICS is not supported for JSON columns")
}
columnIDs[i] = columns[i].ID
@@ -178,23 +222,11 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da
if n.Options.AsOf.Expr != nil {
asOfTs, err := n.p.EvalAsOfTimestamp(n.Options.AsOf)
if err != nil {
return err
return nil, err
}
asOf = &asOfTs
}

if n.Name == stats.AutoStatsName {
// Don't start the job if there is already a CREATE STATISTICS job running.
// (To handle race conditions we check this again after the job starts,
// but this check is used to prevent creating a large number of jobs that
// immediately fail).
if err := checkRunningJobs(ctx, nil /* job */, n.p); err != nil {
return err
}
} else {
telemetry.Inc(sqltelemetry.CreateStatisticsUseCounter)
}

// Create a job to run statistics creation.
statement := tree.AsStringWithFlags(n, tree.FmtAlwaysQualifyTableNames)
var description string
@@ -207,7 +239,7 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da
description = statement
statement = ""
}
job, errCh, err := n.p.ExecCfg().JobRegistry.StartJob(ctx, resultsCh, jobs.Record{
return &jobs.Record{
Description: description,
Statement: statement,
Username: n.p.User(),
@@ -221,24 +253,7 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da
MaxFractionIdle: n.Options.Throttling,
},
Progress: jobspb.CreateStatsProgress{},
})
if err != nil {
return err
}

if err = <-errCh; err != nil {
pgerr, ok := errors.Cause(err).(*pgerror.Error)
if ok && pgerr.Code == pgerror.CodeLockNotAvailableError {
// Delete the job so users don't see it and get confused by the error.
const stmt = `DELETE FROM system.jobs WHERE id = $1`
if _ /* cols */, delErr := n.p.ExecCfg().InternalExecutor.Exec(
ctx, "delete-job", nil /* txn */, stmt, *job.ID(),
); delErr != nil {
log.Warningf(ctx, "failed to delete job: %v", delErr)
}
}
}
return err
}, nil
}

// maxNonIndexCols is the maximum number of non-index columns that we will use
@@ -303,6 +318,20 @@ func createStatsDefaultColumns(
return columns, nil
}

// makePlanForExplainDistSQL is part of the distSQLExplainable interface.
func (n *createStatsNode) makePlanForExplainDistSQL(
planCtx *PlanningCtx, distSQLPlanner *DistSQLPlanner,
) (PhysicalPlan, error) {
// Create a job record but don't actually start the job.
record, err := n.makeJobRecord(planCtx.ctx)
if err != nil {
return PhysicalPlan{}, err
}
job := n.p.ExecCfg().JobRegistry.NewJob(*record)

return distSQLPlanner.createPlanForCreateStats(planCtx, job)
}

// createStatsResumer implements the jobs.Resumer interface for CreateStats
// jobs. A new instance is created for each job. evalCtx is populated inside
// createStatsResumer.Resume so it can be used in createStatsResumer.OnSuccess
7 changes: 6 additions & 1 deletion pkg/sql/distsql_plan_stats.go
@@ -151,13 +151,18 @@ func (dsp *DistSQLPlanner) createStatsPlan(
))
}

var jobID int64
if job.ID() != nil {
jobID = *job.ID()
}

// Set up the final SampleAggregator stage.
agg := &distsqlpb.SampleAggregatorSpec{
Sketches: sketchSpecs,
SampleSize: sampler.SampleSize,
SampledColumnIDs: sampledColumnIDs,
TableID: desc.ID,
JobID: *job.ID(),
JobID: jobID,
RowsExpected: rowsExpected,
}
// Plan the SampleAggregator on the gateway, unless we have a single Sampler.
27 changes: 25 additions & 2 deletions pkg/sql/explain_distsql.go
@@ -61,6 +61,16 @@ type explainDistSQLRun struct {
executedStatement bool
}

// distSQLExplainable is an interface used for local plan nodes that create
// distributed jobs. The plan node should implement this interface so that
// EXPLAIN (DISTSQL) will show the DistSQL plan instead of the local plan node.
type distSQLExplainable interface {
// makePlanForExplainDistSQL returns the DistSQL physical plan that can be
// used by the explainDistSQLNode to generate flow specs (and run in the case
// of EXPLAIN ANALYZE).
makePlanForExplainDistSQL(*PlanningCtx, *DistSQLPlanner) (PhysicalPlan, error)
}

func (n *explainDistSQLNode) startExec(params runParams) error {
if n.analyze && params.SessionData().DistSQLMode == sessiondata.DistSQLOff {
return pgerror.NewErrorf(
@@ -73,7 +83,13 @@ func (n *explainDistSQLNode) startExec(params runParams) error {
params.p.prepareForDistSQLSupportCheck()

distSQLPlanner := params.extendedEvalCtx.DistSQLPlanner
recommendation, _ := distSQLPlanner.checkSupportForNode(n.plan)

var recommendation distRecommendation
if _, ok := n.plan.(distSQLExplainable); ok {
recommendation = shouldDistribute
} else {
recommendation, _ = distSQLPlanner.checkSupportForNode(n.plan)
}

planCtx := distSQLPlanner.NewPlanningCtx(params.ctx, params.extendedEvalCtx, params.p.txn)
planCtx.isLocal = !shouldDistributeGivenRecAndMode(recommendation, params.SessionData().DistSQLMode)
@@ -142,10 +158,17 @@ func (n *explainDistSQLNode) startExec(params runParams) error {
}
}

plan, err := distSQLPlanner.createPlanForNode(planCtx, n.plan)
var plan PhysicalPlan
var err error
if planNode, ok := n.plan.(distSQLExplainable); ok {
plan, err = planNode.makePlanForExplainDistSQL(planCtx, distSQLPlanner)
} else {
plan, err = distSQLPlanner.createPlanForNode(planCtx, n.plan)
}
if err != nil {
return err
}

distSQLPlanner.FinalizePlan(planCtx, &plan)

flows := plan.GenerateFlowSpecs(params.extendedEvalCtx.NodeID)
12 changes: 11 additions & 1 deletion pkg/sql/logictest/testdata/planner_test/distsql_misc
@@ -89,4 +89,14 @@ NULL /1 {1} 1
query T
SELECT url FROM [EXPLAIN (DISTSQL) CREATE STATISTICS s1 ON a FROM data]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyMj7FqxjAMhPc-RbjZUGf1G2RpS9fiwdgiGFwrWAoUgt-9xB7-NeN9p7tDFyon-gi_JHA_WOENjsaRRLjdaB5s6Q_OGuR6nHpjbxC5EdwFzVoIDoVjKEtsFJQW0aBZNEdZ7LuFQSINuYxoN-BTX0WiYSe4tZvnY98kB1ehR822ewNKO82HhM8W6atxHDNTfo7cAIlEp7tOsdVpdd_f_gMAAP__6dpl3g==
https://cockroachdb.github.io/distsqlplan/decode.html#eJy0lMGOmzAQhu99CjRnI2NDslmftse9dKtNbxUHF48oEsHINlLbFe9eAUJboo2x5PSInT_fr2-seYNOK_wiL2hBfAcGBDgQyIFAAQQOUBLoja7QWm2mnyyBZ_ULREag6frBTcclgUobBPEGrnEtgoBv8keLrygVGpoBAYVONu2M6U1zkeb3k5JOAoFzLzsrkpSyRHYqYYl2P9EAgZfBieSJQTkS0IN7Z1knawTBRhLe5ywvfYuGHrZdluNz8wdFMtU8O-mH8pvQd9bQaaPQoNqwyvFmrc91bbCWThvKssiC-aYgC58SC5kSZSnlUXPaabTO6XjXOfFwDTxIA09pHqVhp9Gq4eGuGvJwDXmQhjylRZSGnUarhtNdNRThGoogDUVKD1EadhqtGh7_2_L6APqKttedxasl9vE_Z9NyQ1XjsgmtHkyFX42uZszy-TLn5gOF1i23bPl47parqeC_YeYN802YXYe5n7yDzr3pwh8uYnofvOGjn3yMIT94wyc_-RRDfvTPKtt5Jv5Hds0ux09_AwAA___WSOum


statement ok
INSERT INTO data SELECT a, b, c::FLOAT, 1
FROM generate_series(1,10) AS a, generate_series(1,10) AS b, generate_series(1,10) AS c;

query T
SELECT url FROM [EXPLAIN ANALYZE (DISTSQL) CREATE STATISTICS s1 ON a FROM data]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJy0lU9vnDAQxe_9FNacvQIDu9n4lPaWS1Nle6s4OOspQQWMbKNmG-13r8BdJewf4yrLkfHMvOf3Q_IrNEriV1GjAf4DGFBIgEIKFDKgsIScQqvVFo1Rum9xA_fyBXhMoWzazvblnMJWaQT-Cra0FQKH7-KpwkcUEnUUAwWJVpTVINPqshZ6dyeFFUBh04rGcLKIGBGNJIwo-4waKDx0lpO73pRWvw3RKCQnLO63GSuqitiyRk5iM-44c57vKajOvrk1VhQInO1p-I02om4r1NFyfBtX3pR_0ElvrHC2L4kmF0XftLpGaYka5Ugr31-09bkoNBbCKh2x-IMG05FBFs6ZhXCO2CJKQkgn85GeuNOB9OqqpJPwIJOgIJNFlM4WJIVavJAaa6V3pDPYd5Ev_8qyNL_eFcNCn7j_IfSbq4aehoeeBoWeLqLsA6FX-NOSo-SH2imesng-aXXFk15jlUZJTCmRk6FnBn4TUR74ra_KLwvnlwXxyxbRMoRfeo5fWFATng9B3c72jpwRfUTTqsbg0XtyfnPcvzMoC3SPklGd3uI3rbaDjPt8GOaGgkRj3SlzH_eNO-oNvh9m3uFkNMyOhxO_8oR06p3O_MPZ__geUhwChaedRUMMNrZHmh-vXXrXrvyeVvN4uvGuXfs9refxdOsnH0_8dP5fNtxVvv_0NwAA___Cd5vl
11 changes: 10 additions & 1 deletion pkg/sql/opt/exec/execbuilder/testdata/distsql_misc
@@ -92,4 +92,13 @@ NULL /1 {1} 1
query T
SELECT url FROM [EXPLAIN (DISTSQL) CREATE STATISTICS s1 ON a FROM data]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyMj7FqxjAMhPc-RbjZUGf1G2RpS9fiwdgiGFwrWAoUgt-9xB7-NeN9p7tDFyon-gi_JHA_WOENjsaRRLjdaB5s6Q_OGuR6nHpjbxC5EdwFzVoIDoVjKEtsFJQW0aBZNEdZ7LuFQSINuYxoN-BTX0WiYSe4tZvnY98kB1ehR822ewNKO82HhM8W6atxHDNTfo7cAIlEp7tOsdVpdd_f_gMAAP__6dpl3g==
https://cockroachdb.github.io/distsqlplan/decode.html#eJy0lMGOmzAQhu99CjRnI2NDslmftse9dKtNbxUHF48oEsHINlLbFe9eAUJboo2x5PSInT_fr2-seYNOK_wiL2hBfAcGBDgQyIFAAQQOUBLoja7QWm2mnyyBZ_ULREag6frBTcclgUobBPEGrnEtgoBv8keLrygVGpoBAYVONu2M6U1zkeb3k5JOAoFzLzsrkpSyRHYqYYl2P9EAgZfBieSJQTkS0IN7Z1knawTBRhLe5ywvfYuGHrZdluNz8wdFMtU8O-mH8pvQd9bQaaPQoNqwyvFmrc91bbCWThvKssiC-aYgC58SC5kSZSnlUXPaabTO6XjXOfFwDTxIA09pHqVhp9Gq4eGuGvJwDXmQhjylRZSGnUarhtNdNRThGoogDUVKD1EadhqtGh7_2_L6APqKttedxasl9vE_Z9NyQ1XjsgmtHkyFX42uZszy-TLn5gOF1i23bPl47parqeC_YeYN802YXYe5n7yDzr3pwh8uYnofvOGjn3yMIT94wyc_-RRDfvTPKtt5Jv5Hds0ux09_AwAA___WSOum

statement ok
INSERT INTO data SELECT a, b, c::FLOAT, 1
FROM generate_series(1,10) AS a, generate_series(1,10) AS b, generate_series(1,10) AS c;

query T
SELECT url FROM [EXPLAIN ANALYZE (DISTSQL) CREATE STATISTICS s1 ON a FROM data]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJy0lU9vnDAQxe_9FNacvQIDu9n4lPaWS1Nle6s4OOspQQWMbKNmG-13r8BdJewf4yrLkfHMvOf3Q_IrNEriV1GjAf4DGFBIgEIKFDKgsIScQqvVFo1Rum9xA_fyBXhMoWzazvblnMJWaQT-Cra0FQKH7-KpwkcUEnUUAwWJVpTVINPqshZ6dyeFFUBh04rGcLKIGBGNJIwo-4waKDx0lpO73pRWvw3RKCQnLO63GSuqitiyRk5iM-44c57vKajOvrk1VhQInO1p-I02om4r1NFyfBtX3pR_0ElvrHC2L4kmF0XftLpGaYka5Ugr31-09bkoNBbCKh2x-IMG05FBFs6ZhXCO2CJKQkgn85GeuNOB9OqqpJPwIJOgIJNFlM4WJIVavJAaa6V3pDPYd5Ev_8qyNL_eFcNCn7j_IfSbq4aehoeeBoWeLqLsA6FX-NOSo-SH2imesng-aXXFk15jlUZJTCmRk6FnBn4TUR74ra_KLwvnlwXxyxbRMoRfeo5fWFATng9B3c72jpwRfUTTqsbg0XtyfnPcvzMoC3SPklGd3uI3rbaDjPt8GOaGgkRj3SlzH_eNO-oNvh9m3uFkNMyOhxO_8oR06p3O_MPZ__geUhwChaedRUMMNrZHmh-vXXrXrvyeVvN4uvGuXfs9refxdOsnH0_8dP5fNtxVvv_0NwAA___Cd5vl
