From badf836c1845f9dc7fe1beda79cfd74bec4a7f96 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Thu, 17 Sep 2020 17:58:02 -0400 Subject: [PATCH] bulkio: Propagate errors when executing schedule. Use correct error object when checking for retryable errors. In addition, add a `FOR UPDATE` clause when picking up schedules to execute to reduce contention. Release Notes: None Release Justification: Bug fix; incorrect handling of transaction errors resulted in scheduled jobs showing incorrect and confusing status messages. --- pkg/jobs/job_scheduler.go | 5 ++-- pkg/jobs/job_scheduler_test.go | 49 +++++++++++++++++++++++++++++++ pkg/jobs/schedule_control_test.go | 13 ++++---- pkg/jobs/testutils_test.go | 37 ++++++++++------------- 4 files changed, 74 insertions(+), 30 deletions(-) diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index 312d06b63dd2..067fb1393b55 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -88,7 +88,8 @@ SELECT FROM %s S WHERE next_run < %s ORDER BY random() -%s`, env.SystemJobsTableName(), CreatedByScheduledJobs, +%s +FOR UPDATE`, env.SystemJobsTableName(), CreatedByScheduledJobs, StatusSucceeded, StatusCanceled, StatusFailed, env.ScheduledJobsTableName(), env.NowExpr(), limitClause) } @@ -297,7 +298,7 @@ func (s *jobScheduler) executeSchedules( if processErr := withSavePoint(ctx, txn, func() error { return s.processSchedule(ctx, schedule, numRunning, stats, txn) }); processErr != nil { - if errors.HasType(err, (*savePointError)(nil)) { + if errors.HasType(processErr, (*savePointError)(nil)) { return errors.Wrapf(err, "savepoint error for schedule %d", schedule.ScheduleID()) } diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index 9c71502917a5..0dbb28fb94c3 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" 
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -592,3 +593,51 @@ func TestJobSchedulerDaemonUsesSystemTables(t *testing.T) { return nil }) } + +func TestTransientTxnErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + h, cleanup := newTestHelper(t) + defer cleanup() + ctx := context.Background() + + h.sqlDB.Exec(t, "CREATE TABLE defaultdb.foo(a int primary key, b timestamp not null)") + + // Setup 10 schedules updating defaultdb.foo timestamp. + for i := 0; i < 10; i++ { + schedule := NewScheduledJob(h.env) + schedule.SetScheduleLabel(fmt.Sprintf("test schedule: %d", i)) + schedule.SetOwner("test") + require.NoError(t, schedule.SetSchedule("*/1 * * * *")) + any, err := types.MarshalAny(&jobspb.SqlStatementExecutionArg{ + Statement: fmt.Sprintf("UPSERT INTO defaultdb.foo (a, b) VALUES (%d, now())", i), + }) + require.NoError(t, err) + schedule.SetExecutionDetails(InlineExecutorName, jobspb.ExecutionArguments{Args: any}) + require.NoError(t, schedule.Create( + ctx, h.cfg.InternalExecutor, nil)) + } + + // Setup numConcurrent workers, each executing maxExec executeSchedule calls. + const maxExec = 100 + const numConcurrent = 3 + require.NoError(t, + ctxgroup.GroupWorkers(context.Background(), numConcurrent, func(ctx context.Context, _ int) error { + ticker := time.NewTicker(time.Millisecond) + numExecs := 0 + for range ticker.C { + h.env.AdvanceTime(time.Minute) + // Transaction retry errors should never bubble up. 
+ require.NoError(t, + h.cfg.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + return h.execSchedules(ctx, allSchedules, txn) + })) + numExecs++ + if numExecs == maxExec { + return nil + } + } + return nil + })) +} diff --git a/pkg/jobs/schedule_control_test.go b/pkg/jobs/schedule_control_test.go index 04fd125f86e6..d525bc53cc61 100644 --- a/pkg/jobs/schedule_control_test.go +++ b/pkg/jobs/schedule_control_test.go @@ -15,6 +15,7 @@ import ( "fmt" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" @@ -31,9 +32,6 @@ func TestScheduleControl(t *testing.T) { th, cleanup := newTestHelper(t) defer cleanup() - // Inject our test environment into schedule control execution via testing knobs. - th.cfg.TestingKnobs.(*TestingKnobs).JobSchedulerEnv = th.env - t.Run("non-existent", func(t *testing.T) { for _, command := range []string{ "PAUSE SCHEDULE 123", @@ -123,8 +121,7 @@ func TestScheduleControl(t *testing.T) { func TestJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() - th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, - true /* accelerateIntervals */) + th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables) defer cleanup() registry := th.server.JobRegistry().(*Registry) @@ -227,10 +224,12 @@ func TestJobsControlForSchedules(t *testing.T) { func TestFilterJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() defer ResetConstructors()() - th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, - false /* accelerateIntervals */) + th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables) defer cleanup() + // Prevent registry from changing job state while running this test. 
+ defer TestingSetAdoptAndCancelIntervals(24*time.Hour, 24*time.Hour)() + registry := th.server.JobRegistry().(*Registry) blockResume := make(chan struct{}) defer close(blockResume) diff --git a/pkg/jobs/testutils_test.go b/pkg/jobs/testutils_test.go index ed41d8499f06..e973aef34615 100644 --- a/pkg/jobs/testutils_test.go +++ b/pkg/jobs/testutils_test.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -33,11 +32,13 @@ import ( "github.com/stretchr/testify/require" ) +type execSchedulesFn func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error type testHelper struct { - env *jobstest.JobSchedulerTestEnv - server serverutils.TestServerInterface - cfg *scheduledjobs.JobExecutionConfig - sqlDB *sqlutils.SQLRunner + env *jobstest.JobSchedulerTestEnv + server serverutils.TestServerInterface + execSchedules execSchedulesFn + cfg *scheduledjobs.JobExecutionConfig + sqlDB *sqlutils.SQLRunner } // newTestHelper creates and initializes appropriate state for a test, @@ -51,20 +52,20 @@ type testHelper struct { // The testHelper will accelerate the adoption and cancellation loops inside of // the registry. func newTestHelper(t *testing.T) (*testHelper, func()) { - return newTestHelperForTables(t, jobstest.UseTestTables, - true /* accelerateIntervals */) + return newTestHelperForTables(t, jobstest.UseTestTables) } func newTestHelperForTables( - t *testing.T, envTableType jobstest.EnvTablesType, accelerateIntervals bool, + t *testing.T, envTableType jobstest.EnvTablesType, ) (*testHelper, func()) { - var cleanupIntervals func() - if accelerateIntervals { - cleanupIntervals = TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond) - } + var execSchedules execSchedulesFn + // Setup test scheduled jobs table. 
+ env := jobstest.NewJobSchedulerTestEnv(envTableType, timeutil.Now()) knobs := &TestingKnobs{ - TakeOverJobsScheduling: func(_ func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error) { + JobSchedulerEnv: env, + TakeOverJobsScheduling: func(daemon func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error) { + execSchedules = daemon }, } s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{ @@ -73,9 +74,6 @@ func newTestHelperForTables( sqlDB := sqlutils.MakeSQLRunner(db) - // Setup test scheduled jobs table. - env := jobstest.NewJobSchedulerTestEnv(envTableType, timeutil.Now()) - if envTableType == jobstest.UseTestTables { sqlDB.Exec(t, jobstest.GetScheduledJobsTableSchema(env)) sqlDB.Exec(t, jobstest.GetJobsTableSchema(env)) @@ -91,12 +89,9 @@ func newTestHelperForTables( DB: kvDB, TestingKnobs: knobs, }, - sqlDB: sqlDB, + sqlDB: sqlDB, + execSchedules: execSchedules, }, func() { - if cleanupIntervals != nil { - cleanupIntervals() - } - if envTableType == jobstest.UseTestTables { sqlDB.Exec(t, "DROP TABLE "+env.SystemJobsTableName()) sqlDB.Exec(t, "DROP TABLE "+env.ScheduledJobsTableName())