Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
91160: sql: do not print stack trace when logging if txn is not open r=yuzefovich a=yuzefovich

After executing each statement, that statement might be logged. If there
were any audit events, then we attempt to resolve the table names for
which the audit events have occurred. To do the resolution we're using
the current txn. Previously, if that txn has been aborted or committed,
it would result in a scary-looking stack trace added to the log, and
this commit fixes it.

Epic: None

Release note: None

91563: jobs: clear job claim after execution r=ajwerner a=stevendanna

Since cockroachdb#89014 the job system reset a job's claim when transitioning it from pause-requested to paused and from cancel-requested to reverting. The job system signals these transitions to the running Resumer by cancelling the job's context and does not wait for the resumer to exit. Once the claim is clear, another node can adopt the job and start running it's OnFailOrCancel callback. As a result, clearing the context makes it more likely that OnFailOrCancel executions will overlap with Resume executions.

In general, Jobs need to assume that Resume may still be running while OnFailOrCancel is called. But, making it more likely isn't in our interest.

Here, we only clear the lease when we exit the job state machine. This makes it much more likely that OnFailOrCancel doesn't start until Resume has returned.

Epic: None

Release note: None

91874: colflow: temporarily disable test assertion about closers r=yuzefovich a=yuzefovich

Informs: cockroachdb#91845.

Epic: None

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
3 people committed Nov 14, 2022
4 parents df02d67 + d9726d2 + deb0896 + 4e6730e commit a8b0cd9
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 22 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ go_test(
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -426,7 +427,7 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
feed := testFeed.(cdctest.EnterpriseTestFeed)

require.NoError(t, feed.Pause())

jobutils.WaitForJobToHaveNoLease(t, sqlDB, feed.JobID())
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID()))

counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
Expand Down
41 changes: 36 additions & 5 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,26 +417,57 @@ func (r *Registry) runJob(
log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err)
}

r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
r.maybeRecordExecutionFailure(ctx, err, job)
// NB: After this point, the job may no longer have the claim
// and further updates to the job record from this node may
// fail.
r.maybeClearLease(job, err)
r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
if r.knobs.AfterJobStateMachine != nil {
r.knobs.AfterJobStateMachine()
}
return err
}

const clearClaimQuery = `
UPDATE system.jobs
SET claim_session_id = NULL, claim_instance_id = NULL
WHERE id = $1
AND claim_session_id = $2
AND claim_instance_id = $3
AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')`

// maybeClearLease clears the claim on the given job, provided that
// the current lease matches our liveness Session.
func (r *Registry) maybeClearLease(job *Job, jobErr error) {
if jobErr == nil {
return
}

// We use the serverCtx here rather than the context from the
// caller since the caller's context may have been canceled.
r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) {
n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */
sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID())
if err != nil {
log.Warningf(ctx, "could not clear job claim: %s", err.Error())
return
}
log.VEventf(ctx, 2, "cleared leases for %d jobs", n)
})
}

const pauseAndCancelUpdate = `
UPDATE system.jobs
SET status =
SET status =
CASE
WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `'
WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `'
ELSE status
END,
num_runs = 0,
last_run = NULL,
claim_session_id = NULL,
claim_instance_id = NULL
last_run = NULL
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
RETURNING id, status
Expand Down
7 changes: 7 additions & 0 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func TestScheduleControl(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newTestHelper(t)
defer cleanup()

Expand Down Expand Up @@ -139,6 +142,8 @@ func TestScheduleControl(t *testing.T) {

func TestJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, nil)
defer cleanup()

Expand Down Expand Up @@ -247,6 +252,7 @@ func TestJobsControlForSchedules(t *testing.T) {
// jobs prior to executing the control command.
func TestFilterJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer ResetConstructors()()

argsFn := func(args *base.TestServerArgs) {
Expand Down Expand Up @@ -327,6 +333,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) {

func TestJobControlByType(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer ResetConstructors()()

argsFn := func(args *base.TestServerArgs) {
Expand Down
51 changes: 48 additions & 3 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"runtime/pprof"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1211,10 +1212,11 @@ func TestJobLifecycle(t *testing.T) {

done := make(chan struct{})
defer close(done)

resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -1469,6 +1471,10 @@ func TestJobLifecycle(t *testing.T) {
t.Fatal(err)
}

// Wait for job to be adopted so that we have the
// lease and can move to succeeded.
resumeSignaler.WaitForResumeStarted()

// PauseRequested fails after job is successful.
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3109,6 +3115,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) {
require.Equal(t, expectedFiles, filesInZip)
}

type resumeStartedSignaler struct {
syncutil.Mutex
cond *sync.Cond
isStarted bool
}

func newResumeStartedSignaler() *resumeStartedSignaler {
ret := &resumeStartedSignaler{}
ret.cond = sync.NewCond(&ret.Mutex)
return ret

}

func (r *resumeStartedSignaler) SignalResumeStarted() {
r.Lock()
r.isStarted = true
r.cond.Signal()
r.Unlock()
}

func (r *resumeStartedSignaler) WaitForResumeStarted() {
r.Lock()
for !r.isStarted {
r.cond.Wait()
}
r.isStarted = false
r.Unlock()
}

// TestPauseReason tests pausing a job with a user specified reason.
func TestPauseReason(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand All @@ -3125,10 +3160,11 @@ func TestPauseReason(t *testing.T) {

done := make(chan struct{})
defer close(done)

resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -3160,9 +3196,16 @@ func TestPauseReason(t *testing.T) {
return n
}
mustNotHaveClaim := func() {
require.Equal(t, 0, countRowsWithClaimInfo())
t.Helper()
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 0 {
return nil
}
return errors.New("still waiting for claim to clear")
})
}
mustHaveClaim := func() {
t.Helper()
testutils.SucceedsSoon(t, func() error {
if countRowsWithClaimInfo() == 1 {
return nil
Expand All @@ -3175,6 +3218,7 @@ func TestPauseReason(t *testing.T) {
q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID)
tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}})
mustHaveClaim()
resumeSignaler.WaitForResumeStarted()

getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) {
var payloadBytes []byte
Expand Down Expand Up @@ -3208,6 +3252,7 @@ func TestPauseReason(t *testing.T) {

checkStatusAndPauseReason(t, jobID, "running", "for testing")
mustHaveClaim()
resumeSignaler.WaitForResumeStarted()
}
{
// Pause the job again with a different reason. Verify that the job is paused with the reason.
Expand Down
2 changes: 2 additions & 0 deletions pkg/jobs/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func TestJobsTableClaimFamily(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j
return
}
if updateErr != nil {
log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err)
log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr)
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ func TestRunWithoutLoop(t *testing.T) {

func TestJobIdleness(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
intervalOverride := time.Millisecond
Expand Down Expand Up @@ -1111,6 +1112,7 @@ func TestJobIdleness(t *testing.T) {
// allow other job registries in the cluster to claim and run this job.
func TestDisablingJobAdoptionClearsClaimSessionID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

intervalOverride := time.Millisecond
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Expand Down
23 changes: 12 additions & 11 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,17 +384,18 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) {
// This cleans up all the memory and disk monitoring of the vectorized flow.
f.creator.cleanup(ctx)

if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics {
// Check that all closers have been closed. Note that we don't check
// this in case the flow was never started in the first place (it is ok
// to not check this since closers haven't allocated any resources in
// such a case). We also don't check when the panic injection is
// enabled since then Close() might be legitimately not called (if a
// panic is injected in Init() of the wrapped operator).
if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers {
colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed))
}
}
// TODO(yuzefovich): uncomment this once the assertion is no longer flaky.
//if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics {
// // Check that all closers have been closed. Note that we don't check
// // this in case the flow was never started in the first place (it is ok
// // to not check this since closers haven't allocated any resources in
// // such a case). We also don't check when the panic injection is
// // enabled since then Close() might be legitimately not called (if a
// // panic is injected in Init() of the wrapped operator).
// if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers {
// colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed))
// }
//}

f.tempStorage.Lock()
created := f.tempStorage.path != ""
Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -167,6 +168,8 @@ func (p *planner) maybeLogStatement(
p.maybeLogStatementInternal(ctx, execType, isCopy, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics, stmtFingerprintID, queryStats)
}

var errTxnIsNotOpen = errors.New("txn is already committed or rolled back")

func (p *planner) maybeLogStatementInternal(
ctx context.Context,
execType executorType,
Expand Down Expand Up @@ -323,13 +326,19 @@ func (p *planner) maybeLogStatementInternal(
mode = "rw"
}
tableName := ""
var tn *tree.TableName
// We only have a valid *table* name if the object being
// audited is table-like (includes view, sequence etc). For
// now, this is sufficient because the auditing feature can
// only audit tables. If/when the mechanisms are extended to
// audit databases and schema, we need more logic here to
// extract a name to include in the logging events.
tn, err := p.getQualifiedTableName(ctx, ev.desc)
if p.txn != nil && p.txn.IsOpen() {
// Only open txn accepts further commands.
tn, err = p.getQualifiedTableName(ctx, ev.desc)
} else {
err = errTxnIsNotOpen
}
if err != nil {
log.Warningf(ctx, "name for audited table ID %d not found: %v", ev.desc.GetID(), err)
} else {
Expand Down
13 changes: 13 additions & 0 deletions pkg/testutils/jobutils/jobs_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ func waitForJobToHaveStatus(
}, 2*time.Minute)
}

func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
t.Helper()
testutils.SucceedsWithin(t, func() error {
var sessionID []byte
var instanceID gosql.NullInt64
db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID)
if sessionID == nil && !instanceID.Valid {
return nil
}
return errors.Newf("job %d still has claim information")
}, 2*time.Minute)
}

// RunJob runs the provided job control statement, initializing, notifying and
// closing the chan at the passed pointer (see below for why) and returning the
// jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard
Expand Down

0 comments on commit a8b0cd9

Please sign in to comment.