Skip to content

Commit

Permalink
bulkio: Propagate errors when executing schedule.
Browse files Browse the repository at this point in the history
Use correct error object when checking for retryable errors.
In addition, add a `FOR UPDATE` clause when picking up
schedules to execute to reduce contention.

Release Notes: None

Release Justification: Bug fix; Incorrect handling of transaction
errors resulted in scheduled jobs showing incorrect and confusing
status message.
  • Loading branch information
Yevgeniy Miretskiy committed Sep 18, 2020
1 parent 68ff6b1 commit badf836
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 30 deletions.
5 changes: 3 additions & 2 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ SELECT
FROM %s S
WHERE next_run < %s
ORDER BY random()
%s`, env.SystemJobsTableName(), CreatedByScheduledJobs,
%s
FOR UPDATE`, env.SystemJobsTableName(), CreatedByScheduledJobs,
StatusSucceeded, StatusCanceled, StatusFailed,
env.ScheduledJobsTableName(), env.NowExpr(), limitClause)
}
Expand Down Expand Up @@ -297,7 +298,7 @@ func (s *jobScheduler) executeSchedules(
if processErr := withSavePoint(ctx, txn, func() error {
return s.processSchedule(ctx, schedule, numRunning, stats, txn)
}); processErr != nil {
if errors.HasType(err, (*savePointError)(nil)) {
if errors.HasType(processErr, (*savePointError)(nil)) {
return errors.Wrapf(err, "savepoint error for schedule %d", schedule.ScheduleID())
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
Expand Down Expand Up @@ -592,3 +593,51 @@ func TestJobSchedulerDaemonUsesSystemTables(t *testing.T) {
return nil
})
}

func TestTransientTxnErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

h, cleanup := newTestHelper(t)
defer cleanup()
ctx := context.Background()

h.sqlDB.Exec(t, "CREATE TABLE defaultdb.foo(a int primary key, b timestamp not null)")

// Setup 10 schedules updating defaultdb.foo timestamp.
for i := 0; i < 10; i++ {
schedule := NewScheduledJob(h.env)
schedule.SetScheduleLabel(fmt.Sprintf("test schedule: %d", i))
schedule.SetOwner("test")
require.NoError(t, schedule.SetSchedule("*/1 * * * *"))
any, err := types.MarshalAny(&jobspb.SqlStatementExecutionArg{
Statement: fmt.Sprintf("UPSERT INTO defaultdb.foo (a, b) VALUES (%d, now())", i),
})
require.NoError(t, err)
schedule.SetExecutionDetails(InlineExecutorName, jobspb.ExecutionArguments{Args: any})
require.NoError(t, schedule.Create(
ctx, h.cfg.InternalExecutor, nil))
}

// Setup numConcurrent workers, each executing maxExec executeSchedule calls.
const maxExec = 100
const numConcurrent = 3
require.NoError(t,
ctxgroup.GroupWorkers(context.Background(), numConcurrent, func(ctx context.Context, _ int) error {
ticker := time.NewTicker(time.Millisecond)
numExecs := 0
for range ticker.C {
h.env.AdvanceTime(time.Minute)
// Transaction retry errors should never bubble up.
require.NoError(t,
h.cfg.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
return h.execSchedules(ctx, allSchedules, txn)
}))
numExecs++
if numExecs == maxExec {
return nil
}
}
return nil
}))
}
13 changes: 6 additions & 7 deletions pkg/jobs/schedule_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
Expand All @@ -31,9 +32,6 @@ func TestScheduleControl(t *testing.T) {
th, cleanup := newTestHelper(t)
defer cleanup()

// Inject our test environment into schedule control execution via testing knobs.
th.cfg.TestingKnobs.(*TestingKnobs).JobSchedulerEnv = th.env

t.Run("non-existent", func(t *testing.T) {
for _, command := range []string{
"PAUSE SCHEDULE 123",
Expand Down Expand Up @@ -123,8 +121,7 @@ func TestScheduleControl(t *testing.T) {

func TestJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables,
true /* accelerateIntervals */)
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables)
defer cleanup()

registry := th.server.JobRegistry().(*Registry)
Expand Down Expand Up @@ -227,10 +224,12 @@ func TestJobsControlForSchedules(t *testing.T) {
func TestFilterJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer ResetConstructors()()
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables,
false /* accelerateIntervals */)
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables)
defer cleanup()

// Prevent registry from changing job state while running this test.
defer TestingSetAdoptAndCancelIntervals(24*time.Hour, 24*time.Hour)()

registry := th.server.JobRegistry().(*Registry)
blockResume := make(chan struct{})
defer close(blockResume)
Expand Down
37 changes: 16 additions & 21 deletions pkg/jobs/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand All @@ -33,11 +32,13 @@ import (
"github.com/stretchr/testify/require"
)

type execSchedulesFn func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error
type testHelper struct {
env *jobstest.JobSchedulerTestEnv
server serverutils.TestServerInterface
cfg *scheduledjobs.JobExecutionConfig
sqlDB *sqlutils.SQLRunner
env *jobstest.JobSchedulerTestEnv
server serverutils.TestServerInterface
execSchedules execSchedulesFn
cfg *scheduledjobs.JobExecutionConfig
sqlDB *sqlutils.SQLRunner
}

// newTestHelper creates and initializes appropriate state for a test,
Expand All @@ -51,20 +52,20 @@ type testHelper struct {
// The testHelper will accelerate the adoption and cancellation loops inside of
// the registry.
func newTestHelper(t *testing.T) (*testHelper, func()) {
return newTestHelperForTables(t, jobstest.UseTestTables,
true /* accelerateIntervals */)
return newTestHelperForTables(t, jobstest.UseTestTables)
}

func newTestHelperForTables(
t *testing.T, envTableType jobstest.EnvTablesType, accelerateIntervals bool,
t *testing.T, envTableType jobstest.EnvTablesType,
) (*testHelper, func()) {
var cleanupIntervals func()
if accelerateIntervals {
cleanupIntervals = TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)
}
var execSchedules execSchedulesFn

// Setup test scheduled jobs table.
env := jobstest.NewJobSchedulerTestEnv(envTableType, timeutil.Now())
knobs := &TestingKnobs{
TakeOverJobsScheduling: func(_ func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error) {
JobSchedulerEnv: env,
TakeOverJobsScheduling: func(daemon func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error) {
execSchedules = daemon
},
}
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Expand All @@ -73,9 +74,6 @@ func newTestHelperForTables(

sqlDB := sqlutils.MakeSQLRunner(db)

// Setup test scheduled jobs table.
env := jobstest.NewJobSchedulerTestEnv(envTableType, timeutil.Now())

if envTableType == jobstest.UseTestTables {
sqlDB.Exec(t, jobstest.GetScheduledJobsTableSchema(env))
sqlDB.Exec(t, jobstest.GetJobsTableSchema(env))
Expand All @@ -91,12 +89,9 @@ func newTestHelperForTables(
DB: kvDB,
TestingKnobs: knobs,
},
sqlDB: sqlDB,
sqlDB: sqlDB,
execSchedules: execSchedules,
}, func() {
if cleanupIntervals != nil {
cleanupIntervals()
}

if envTableType == jobstest.UseTestTables {
sqlDB.Exec(t, "DROP TABLE "+env.SystemJobsTableName())
sqlDB.Exec(t, "DROP TABLE "+env.ScheduledJobsTableName())
Expand Down

0 comments on commit badf836

Please sign in to comment.