This repository has been archived by the owner on Nov 30, 2021. It is now read-only.

jobs: schedule a job in a transaction to execute after commit
It's useful to be able to create jobs inside transactions; such jobs
must, of course, only execute after the transaction commits. We add a
new function, QueueJob, on the extendedEvalContext to do that. The
main use case is converting schema changes to jobs. The complicated
semantics of handling failures of these jobs are deliberately left
out of scope here and will be addressed in follow-up PR(s). This PR
only adds the plumbing; the function is not ready for production use yet.

Touches cockroachdb#37691, cockroachdb#42061.

Release note: none.
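
For illustration only (the helper below is hypothetical, not part of the commit), planning code for a statement that is implemented as a job would use the new API roughly as follows. The job record is created inside the planner's transaction, so it is discarded on rollback and only runs after that transaction commits.

package sql

import "github.com/cockroachdb/cockroach/pkg/jobs"

// planStatementAsJob sketches how planning code for a job-backed statement
// would stage its job. QueueJob writes the job record using the planner's
// transaction and stashes the job ID in the connExecutor's jobsCollection;
// the job itself is started by the connExecutor only once that transaction
// commits (see StartAndWaitForJobs in pkg/jobs/registry.go below).
func planStatementAsJob(evalCtx *extendedEvalContext, record jobs.Record) (*jobs.Job, error) {
    // Nothing executes here; if the transaction rolls back, the job record
    // written by CreateJobWithTxn is rolled back with it.
    return evalCtx.QueueJob(record)
}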
Spas Bojanov committed Dec 12, 2019
1 parent 1c54631 commit 66e6fde
Showing 5 changed files with 183 additions and 31 deletions.
84 changes: 84 additions & 0 deletions pkg/jobs/jobs_test.go
@@ -16,6 +16,7 @@ import (
"fmt"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"

@@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
@@ -1612,3 +1614,85 @@ func TestShowJobWhenComplete(t *testing.T) {
}
})
}

func TestJobInTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer jobs.ResetConstructors()()

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
jobs.DefaultAdoptInterval = 10 * time.Millisecond

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

// Use int32 instead of bool to take advantage of atomic operations.
var hasRun int32
var job *jobs.Job
sql.AddPlanHook(
func(_ context.Context, stmt tree.Statement, phs sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) {
_, ok := stmt.(*tree.Backup)
if !ok {
return nil, nil, nil, false, nil
}
fn := func(_ context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error {
var err error
job, err = phs.ExtendedEvalContext().QueueJob(
jobs.Record{
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
},
)
return err
}
return fn, nil, nil, false, nil
})

jobs.RegisterConstructor(jobspb.TypeBackup, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func() error {
atomic.StoreInt32(&hasRun, 1)
return nil
},
}
})
t.Run("normal success", func(t *testing.T) {
txn, err := sqlDB.Begin()
if err != nil {
t.Fatal(err)
}
if _, err = txn.Exec("BACKUP doesnot.matter TO doesnotmattter"); err != nil {
t.Fatal(err)
}

// If we roll back, the job should not run.
if err = txn.Rollback(); err != nil {
t.Fatal(err)
}
sqlRunner := sqlutils.MakeSQLRunner(sqlDB)
// Just in case the job was scheduled, wait for it to finish to avoid
// a race.
sqlRunner.Exec(t, "SHOW JOB WHEN COMPLETE $1", *job.ID())
if atomic.LoadInt32(&hasRun) == int32(1) {
t.Fatalf("job has run in transaction before txn commit")
}

// Now let's actually commit the transaction and check that the job ran.
txn, err = sqlDB.Begin()
if err != nil {
t.Fatal(err)
}
if _, err = txn.Exec("BACKUP doesnot.matter TO doesnotmattter"); err != nil {
t.Fatal(err)
}
if err = txn.Commit(); err != nil {
t.Fatal(err)
}
if atomic.LoadInt32(&hasRun) == int32(0) {
t.Fatalf("job scheduled in transaction did not run")
}
})
}
26 changes: 26 additions & 0 deletions pkg/jobs/registry.go
@@ -11,6 +11,7 @@
package jobs

import (
"bytes"
"context"
"fmt"
"strings"
@@ -246,6 +247,31 @@ func (r *Registry) StartJob(
return errCh, nil
}

// StartAndWaitForJobs starts previously unstarted jobs from a list of
// scheduled jobs and waits for them to complete. Canceling ctx
// interrupts the waiting but doesn't cancel the jobs.
func (r *Registry) StartAndWaitForJobs(
ctx context.Context, ex sqlutil.InternalExecutor, jobs []int64,
) error {
log.Infof(ctx, "scheduled jobs %+v", jobs)
if len(jobs) == 0 {
return nil
}
buf := bytes.Buffer{}
for i, j := range jobs {
if i > 0 {
buf.WriteString(",")
}
buf.WriteString(fmt.Sprintf(" (%d)", j))
}
_, err := ex.Exec(
ctx,
"wait-for-jobs",
nil, /* txn */
fmt.Sprintf("SHOW JOBS WHEN COMPLETE VALUES %s", buf.String()),
)
return err
}

// NewJob creates a new Job.
func (r *Registry) NewJob(record Record) *Job {
job := &Job{
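For illustration (the job IDs are invented), this standalone sketch mirrors the string building in StartAndWaitForJobs and prints the statement it would execute; SHOW JOBS WHEN COMPLETE blocks until the listed jobs reach a terminal state.

package main

import (
    "bytes"
    "fmt"
)

func main() {
    // Two hypothetical job IDs scheduled during a transaction.
    jobIDs := []int64{123456789, 987654321}
    buf := bytes.Buffer{}
    for i, j := range jobIDs {
        if i > 0 {
            buf.WriteString(",")
        }
        buf.WriteString(fmt.Sprintf(" (%d)", j))
    }
    // Prints: SHOW JOBS WHEN COMPLETE VALUES  (123456789), (987654321)
    // (the doubled space before the first value is harmless to the parser).
    fmt.Printf("SHOW JOBS WHEN COMPLETE VALUES %s\n", buf.String())
}
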
82 changes: 51 additions & 31 deletions pkg/sql/conn_executor.go
@@ -855,6 +855,13 @@ type connExecutor struct {
// is done if the statement was executed in an implicit txn).
schemaChangers schemaChangerCollection

// jobs accumulates jobs staged for execution inside the transaction.
// Staging happens when executing statements that are implemented with
// a job. Jobs must be staged via the QueueJob function in
// pkg/sql/planner.go. The staged jobs are executed once the
// transaction that staged them commits.
jobs jobsCollection

// autoRetryCounter keeps track of which iteration of a transaction
// auto-retry we're currently in. It's 0 whenever the transaction state is not
// stateOpen.
@@ -1871,6 +1878,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
DistSQLPlanner: ex.server.cfg.DistSQLPlanner,
TxnModesSetter: ex,
SchemaChangers: &ex.extraTxnState.schemaChangers,
Jobs: &ex.extraTxnState.jobs,
schemaAccessors: scInterface,
sqlStatsCollector: ex.statsCollector,
}
@@ -2003,6 +2011,48 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
errorutil.SendReport(ex.Ctx(), &ex.server.cfg.Settings.SV, err)
return advanceInfo{}, err
}

handleErr := func(err error) {
if implicitTxn {
// The schema change/job failed but it was also the only
// operation in the transaction. In this case, the
// transaction's error is the schema change error.
res.SetError(err)
} else {
// The schema change/job failed but everything else in the
// transaction was actually committed successfully already.
// At this point, it is too late to cancel the transaction.
// In effect, we have violated the "A" of ACID.
//
// This situation is sufficiently serious that we cannot let
// the error that caused the schema change to fail flow back
// to the client as-is. We replace it by a custom code
// dedicated to this situation. Replacement occurs
// because this error code is a "serious error" and the code
// computation logic will give it a higher priority.
//
// We also print out the original error code as prefix of
// the error message, in case it was a serious error.
newErr := pgerror.Wrapf(err,
pgcode.TransactionCommittedWithSchemaChangeFailure,
"transaction committed but schema change aborted with error: (%s)",
pgerror.GetPGCode(err))
newErr = errors.WithHint(newErr,
"Some of the non-DDL statements may have committed successfully, "+
"but some of the DDL statement(s) failed.\n Manual inspection may be "+
"required to determine the actual state of the database.")
newErr = errors.WithIssueLink(newErr,
errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/42061"})
res.SetError(newErr)
}
}
if err := ex.server.cfg.JobRegistry.StartAndWaitForJobs(
ex.ctxHolder.connCtx,
ex.server.cfg.InternalExecutor,
ex.extraTxnState.jobs.scheduled); err != nil {
handleErr(err)
}

scc := &ex.extraTxnState.schemaChangers
if len(scc.schemaChangers) != 0 {
ieFactory := func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor {
@@ -2018,37 +2068,7 @@
if schemaChangeErr := scc.execSchemaChanges(
ex.Ctx(), ex.server.cfg, &ex.sessionTracing, ieFactory,
); schemaChangeErr != nil {
if implicitTxn {
// The schema change failed but it was also the only
// operation in the transaction. In this case, the
// transaction's error is the schema change error.
res.SetError(schemaChangeErr)
} else {
// The schema change failed but everything else in the transaction
// was actually committed successfully already. At this point,
// it is too late to cancel the transaction. In effect, we have
// violated the "A" of ACID.
//
// This situation is sufficiently serious that we cannot let
// the error that caused the schema change to fail flow back
// to the client as-is. We replace it by a custom code
// dedicated to this situation. Replacement occurs
// because this error code is a "serious error" and the code
// computation logic will give it a higher priority.
//
// We also print out the original error code as prefix of
// the error message, in case it was a serious error.
newErr := pgerror.Wrapf(schemaChangeErr,
pgcode.TransactionCommittedWithSchemaChangeFailure,
"transaction committed but schema change aborted with error: (%s)",
pgerror.GetPGCode(schemaChangeErr))
newErr = errors.WithHint(newErr,
"Some of the non-DDL statements may have committed successfully, but some of the DDL statement(s) failed.\n"+
"Manual inspection may be required to determine the actual state of the database.")
newErr = errors.WithIssueLink(newErr,
errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/42061"})
res.SetError(newErr)
}
handleErr(schemaChangeErr)
}
}

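A client that wants to detect the "transaction committed but schema change/job aborted" condition can check for the dedicated error code. The sketch below uses lib/pq and assumes that pgcode.TransactionCommittedWithSchemaChangeFailure corresponds to SQLSTATE XXA00 (CockroachDB's documented custom code); it is illustrative and not part of this commit.

package main

import (
    "fmt"

    "github.com/lib/pq"
)

// checkCommitErr inspects the error returned from COMMIT. On SQLSTATE XXA00
// (assumed mapping for pgcode.TransactionCommittedWithSchemaChangeFailure),
// the transaction's writes are already durable but a DDL statement or job
// failed afterwards, so manual inspection of the database may be required.
func checkCommitErr(err error) {
    if err == nil {
        return
    }
    if pqErr, ok := err.(*pq.Error); ok && pqErr.Code == "XXA00" {
        fmt.Println("committed, but a schema change/job failed:", pqErr.Message)
        return
    }
    fmt.Println("transaction failed:", err)
}

func main() {
    checkCommitErr(nil) // a nil error is a clean commit
}
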
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
@@ -1068,6 +1068,10 @@ type schemaChangerCollection struct {
schemaChangers []SchemaChanger
}

type jobsCollection struct {
scheduled []int64
}

func (scc *schemaChangerCollection) queueSchemaChanger(schemaChanger SchemaChanger) {
scc.schemaChangers = append(scc.schemaChangers, schemaChanger)
}
18 changes: 18 additions & 0 deletions pkg/sql/planner.go
@@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
@@ -68,6 +69,8 @@ type extendedEvalContext struct {

SchemaChangers *schemaChangerCollection

Jobs *jobsCollection

schemaAccessors *schemaInterface

sqlStatsCollector *sqlStatsCollector
@@ -80,6 +83,21 @@ func (ctx *extendedEvalContext) copy() *extendedEvalContext {
return &cpy
}

// QueueJob creates a new job from record and queues it for execution after
// the transaction in the planner commits.
func (ctx *extendedEvalContext) QueueJob(record jobs.Record) (*jobs.Job, error) {
job, err := ctx.ExecCfg.JobRegistry.CreateJobWithTxn(
ctx.Context,
record,
ctx.Txn,
)
if err != nil {
return nil, err
}
ctx.Jobs.scheduled = append(ctx.Jobs.scheduled, *job.ID())
return job, nil
}

// schemaInterface provides access to the database and table descriptors.
// See schema_accessors.go.
type schemaInterface struct {
