Skip to content

Commit

Permalink
jobs: remove double-load from metrics poller
Browse files Browse the repository at this point in the history
Previously, this code loaded the job twice, once via LoadJobWithTxn
and again via the call to Unpaused. Here, we re-arrange the code so
that it only loads it once.

Epic: none
Release note: None
  • Loading branch information
stevendanna committed Feb 22, 2024
1 parent ee94b81 commit faeda85
Showing 1 changed file with 38 additions and 30 deletions.
68 changes: 38 additions & 30 deletions pkg/jobs/metricspoller/job_statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
const pausedJobsCountQuery = string(`
SELECT job_type, count(*)
FROM system.jobs
WHERE status = '` + jobs.StatusPaused + `'
WHERE status = '` + jobs.StatusPaused + `'
GROUP BY job_type`)

// updatePausedMetrics counts the number of paused jobs per job type.
Expand Down Expand Up @@ -163,42 +163,50 @@ func processJobPTSRecord(
ptsStats map[jobspb.Type]*ptsStat,
txn isql.Txn,
) error {
j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(jobID), txn)
var stats *ptsStat
defer func() {
if stats != nil {
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}
}
}()

err := execCfg.JobRegistry.UpdateJobWithTxn(ctx, jobspb.JobID(jobID), txn,
func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
p := md.Payload
jobType, err := p.CheckType()
if err != nil {
return err
}
stats = ptsStats[jobType]
if stats == nil {
stats = &ptsStat{}
ptsStats[jobType] = stats
}

// If MaximumPTSAge is set on the job payload, verify if PTS record
// timestamp is fresh enough.
if p.MaximumPTSAge > 0 &&
rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
stats.expired++
ptsExpired := errors.Newf(
"protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s",
rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge)
log.Warningf(ctx, "job %d canceled due to %s", jobID, ptsExpired)
return ju.CancelRequestedWithReason(ctx, md, ptsExpired)
}
return nil
})
if err != nil {
if jobs.HasJobNotFoundError(err) {
return nil // nolint:returnerrcheck -- job maybe deleted when we run; just keep going.
}
return err
}
p := j.Payload()
jobType, err := p.CheckType()
if err != nil {
return err
}
stats := ptsStats[jobType]
if stats == nil {
stats = &ptsStat{}
ptsStats[jobType] = stats
}
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}

// If MaximumPTSAge is set on the job payload, verify if PTS record
// timestamp is fresh enough.
if p.MaximumPTSAge > 0 &&
rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
stats.expired++
ptsExpired := errors.Newf(
"protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s",
rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge)
if err := j.WithTxn(txn).CancelRequestedWithReason(ctx, ptsExpired); err != nil {
return err
}
log.Warningf(ctx, "job %d canceled due to %s", jobID, ptsExpired)
}
return nil

}

func updateJobPTSMetrics(
Expand Down

0 comments on commit faeda85

Please sign in to comment.