Skip to content
This repository has been archived by the owner on Nov 30, 2021. It is now read-only.

Commit

Permalink
jobs: schedule a job in a transaction to execute after commit
Browse files Browse the repository at this point in the history
It's useful to be able to create jobs in transactions. Obviously
they would need to execute after the transaction commits. We add
a new function ScheduleJob on the planner that allows that. The
main use case is converting schema changes to jobs. For now we defer
the complicated semantics of handling failures of these jobs; this
will be addressed in follow-up PR(s). This PR only handles the
plumbing, and the new function is not yet ready for production use.

Touches cockroachdb#37691, cockroachdb#42061.

Release note: none.
  • Loading branch information
Spas Bojanov authored and spaskob committed Dec 6, 2019
1 parent fbfd5d8 commit 19b22f5
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 0 deletions.
89 changes: 89 additions & 0 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
Expand Down Expand Up @@ -1612,3 +1613,91 @@ func TestShowJobWhenComplete(t *testing.T) {
}
})
}

// TestJobInTxn verifies that a job scheduled inside a SQL transaction through
// PlanHookState.ScheduleJob is not run when the transaction is rolled back,
// and has run by the time a committing transaction returns.
func TestJobInTxn(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer jobs.ResetConstructors()()

	// Shorten the adopt interval for the duration of the test and restore the
	// previous value on exit.
	defer func(oldInterval time.Duration) {
		jobs.DefaultAdoptInterval = oldInterval
	}(jobs.DefaultAdoptInterval)
	jobs.DefaultAdoptInterval = 10 * time.Millisecond

	ctx := context.TODO()
	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
	defer s.Stopper().Stop(ctx)

	// hasRun is written by the fake resumer and read by the test, so every
	// access goes through lock.
	var lock syncutil.Mutex
	var hasRun bool
	// jobHasRun reads hasRun under the lock and releases it before returning,
	// so that a subsequent t.Fatalf never exits the test with the mutex held.
	jobHasRun := func() bool {
		lock.Lock()
		defer lock.Unlock()
		return hasRun
	}

	// job is set by the plan hook below each time a BACKUP statement executes.
	var job *jobs.Job
	sql.AddPlanHook(
		func(_ context.Context, stmt tree.Statement, phs sql.PlanHookState,
		) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) {
			// Only BACKUP statements are intercepted; everything else is
			// declined by returning a nil row function.
			_, ok := stmt.(*tree.Backup)
			if !ok {
				return nil, nil, nil, false, nil
			}
			fn := func(_ context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error {
				var err error
				job, err = phs.ScheduleJob(
					jobs.Record{
						Details:  jobspb.BackupDetails{},
						Progress: jobspb.BackupProgress{},
					},
				)
				return err
			}
			return fn, nil, nil, false, nil
		})

	// The fake backup resumer only records that it was invoked.
	jobs.RegisterConstructor(jobspb.TypeBackup, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
		return jobs.FakeResumer{
			OnResume: func() error {
				lock.Lock()
				hasRun = true
				lock.Unlock()
				return nil
			},
		}
	})

	t.Run("normal success", func(t *testing.T) {
		txn, err := sqlDB.Begin()
		if err != nil {
			t.Fatal(err)
		}
		if _, err = txn.Exec("BACKUP doesnot.matter TO doesnotmattter"); err != nil {
			t.Fatal(err)
		}

		// If we rollback then the job should not run
		if err = txn.Rollback(); err != nil {
			t.Fatal(err)
		}
		// Fail with a clear message instead of a nil-pointer panic if the
		// plan hook never scheduled a job.
		if job == nil {
			t.Fatal("plan hook did not schedule a job")
		}
		sqlRunner := sqlutils.MakeSQLRunner(sqlDB)
		// Just in case the job was scheduled let's wait for it to finish
		// to avoid a race.
		sqlRunner.Exec(t, "SHOW JOB WHEN COMPLETE $1", *job.ID())
		if jobHasRun() {
			t.Fatalf("job has run in transaction before txn commit")
		}

		// Now let's actually commit the transaction and check that the job ran.
		txn, err = sqlDB.Begin()
		if err != nil {
			t.Fatal(err)
		}
		if _, err = txn.Exec("BACKUP doesnot.matter TO doesnotmattter"); err != nil {
			t.Fatal(err)
		}
		if err = txn.Commit(); err != nil {
			t.Fatal(err)
		}
		if !jobHasRun() {
			t.Fatalf("job scheduled in transaction did not run")
		}
	})
}
22 changes: 22 additions & 0 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package jobs

import (
"bytes"
"context"
"fmt"
"strings"
Expand Down Expand Up @@ -246,6 +247,27 @@ func (r *Registry) StartJob(
return errCh, nil
}

// StartAndWaitForJobs blocks until every job in the given list of scheduled
// job IDs has completed, by issuing a single SHOW JOBS WHEN COMPLETE statement
// through the supplied internal executor. It returns nil immediately when the
// list is empty, and otherwise returns whatever error the executor reports.
//
// NOTE(review): despite the name, nothing in this function starts a job; it
// presumably relies on the jobs being adopted and run elsewhere — confirm.
//
// The ctx passed to this function is not the context the jobs will be started with
// (canceling ctx will not cause the job to cancel).
func (r *Registry) StartAndWaitForJobs(
	ctx context.Context, ex sqlutil.InternalExecutor, jobs []int64,
) error {
	// Return before logging so that callers with nothing scheduled do not
	// produce a log line on every invocation.
	if len(jobs) == 0 {
		return nil
	}
	log.Infof(ctx, "scheduled jobs %+v", jobs)

	// Build the VALUES list " (id1), (id2), ..." for the SHOW JOBS statement.
	var buf bytes.Buffer
	for i, id := range jobs {
		if i > 0 {
			buf.WriteString(",")
		}
		fmt.Fprintf(&buf, " (%d)", id)
	}
	_, err := ex.Exec(ctx, "wait-for-jobs", nil, fmt.Sprintf("SHOW JOBS WHEN COMPLETE VALUES %s", buf.String()))
	return err
}

// NewJob creates a new Job.
func (r *Registry) NewJob(record Record) *Job {
job := &Job{
Expand Down
34 changes: 34 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,8 @@ type connExecutor struct {
// is done if the statement was executed in an implicit txn).
schemaChangers schemaChangerCollection

jobs jobsCollection

// autoRetryCounter keeps track of the which iteration of a transaction
// auto-retry we're currently in. It's 0 whenever the transaction state is not
// stateOpen.
Expand Down Expand Up @@ -1867,6 +1869,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
DistSQLPlanner: ex.server.cfg.DistSQLPlanner,
TxnModesSetter: ex,
SchemaChangers: &ex.extraTxnState.schemaChangers,
Jobs: &ex.extraTxnState.jobs,
schemaAccessors: scInterface,
sqlStatsCollector: ex.statsCollector,
}
Expand Down Expand Up @@ -1999,6 +2002,37 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
errorutil.SendReport(ex.Ctx(), &ex.server.cfg.Settings.SV, err)
return advanceInfo{}, err
}

if err := ex.server.cfg.JobRegistry.StartAndWaitForJobs(
ex.ctxHolder.connCtx,
ex.server.cfg.InternalExecutor,
ex.extraTxnState.jobs.scheduled); err != nil {
// Some of the jobs scheduled in transaction failed but
// everything else in the transaction was actually committed already.
// At this point, it is too late to cancel the transaction.
// In effect, we have violated the "A" of ACID.
//
// This situation is sufficiently serious that we cannot let
// the error that caused the schema change to fail flow back
// to the client as-is. We replace it by a custom code
// dedicated to this situation. Replacement occurs
// because this error code is a "serious error" and the code
// computation logic will give it a higher priority.
//
// We also print out the original error code as prefix of
// the error message, in case it was a serious error.
newErr := pgerror.Wrapf(err,
pgcode.TransactionCommittedWithSchemaChangeFailure,
"transaction committed but schema change aborted with error: (%s)",
pgerror.GetPGCode(err))
newErr = errors.WithHint(newErr,
"Some of the non-job statements may have committed successfully, but some of the job statement(s) failed.\n"+
"Manual inspection may be required to determine the actual state of the database.")
newErr = errors.WithIssueLink(newErr,
errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/42061"})
res.SetError(newErr)
log.Fatalf(ex.ctxHolder.connCtx, fmt.Sprintf("waiting for jobs failed: %v", err))
}
scc := &ex.extraTxnState.schemaChangers
if len(scc.schemaChangers) != 0 {
ieFactory := func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,10 @@ type schemaChangerCollection struct {
schemaChangers []SchemaChanger
}

// jobsCollection accumulates the IDs of jobs that were scheduled while
// executing statements, so that they can be waited on later.
type jobsCollection struct {
	// scheduled lists the IDs of the scheduled jobs, in the order in which
	// they were appended.
	scheduled []int64
}

// queueSchemaChanger appends schemaChanger to the end of the collection's
// list of schema changers.
func (scc *schemaChangerCollection) queueSchemaChanger(schemaChanger SchemaChanger) {
	queued := scc.schemaChangers
	scc.schemaChangers = append(queued, schemaChanger)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/planhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand Down Expand Up @@ -97,6 +98,7 @@ type PlanHookState interface {
ResolveMutableTableDescriptor(
ctx context.Context, tn *ObjectName, required bool, requiredType ResolveRequiredType,
) (table *MutableTableDescriptor, err error)
ScheduleJob(record jobs.Record) (*jobs.Job, error)
}

// AddPlanHook adds a hook used to short-circuit creating a planNode from a
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
Expand Down Expand Up @@ -68,6 +69,8 @@ type extendedEvalContext struct {

SchemaChangers *schemaChangerCollection

Jobs *jobsCollection

schemaAccessors *schemaInterface

sqlStatsCollector *sqlStatsCollector
Expand Down Expand Up @@ -585,6 +588,15 @@ func (p *planner) SessionData() *sessiondata.SessionData {
return p.EvalContext().SessionData
}

// ScheduleJob creates a job for record using the planner's current
// transaction and remembers the new job's ID in the evaluation context's jobs
// collection. It returns the created job, or a nil job and the error if
// creation fails; nothing is recorded in that case.
func (p *planner) ScheduleJob(record jobs.Record) (*jobs.Job, error) {
	ctx := p.EvalContext().Context
	txn := p.ExtendedEvalContext().Txn
	newJob, err := p.execCfg.JobRegistry.CreateJobWithTxn(ctx, record, txn)
	if err != nil {
		return nil, err
	}
	// Record the ID so the job can be waited on later.
	pending := p.ExtendedEvalContext().Jobs
	pending.scheduled = append(pending.scheduled, *newJob.ID())
	return newJob, nil
}

// txnModesSetter is an interface used by SQL execution to influence the current
// transaction.
type txnModesSetter interface {
Expand Down

0 comments on commit 19b22f5

Please sign in to comment.