diff --git a/pkg/jobs/metricspoller/job_statistics.go b/pkg/jobs/metricspoller/job_statistics.go index 0c975966877e..b70ce2f9789e 100644 --- a/pkg/jobs/metricspoller/job_statistics.go +++ b/pkg/jobs/metricspoller/job_statistics.go @@ -33,7 +33,7 @@ import ( const pausedJobsCountQuery = string(` SELECT job_type, count(*) FROM system.jobs - WHERE status = '` + jobs.StatusPaused + `' + WHERE status = '` + jobs.StatusPaused + `' GROUP BY job_type`) // updatePausedMetrics counts the number of paused jobs per job type. @@ -163,42 +163,50 @@ func processJobPTSRecord( ptsStats map[jobspb.Type]*ptsStat, txn isql.Txn, ) error { - j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(jobID), txn) + var stats *ptsStat + defer func() { + if stats != nil { + stats.numRecords++ + if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) { + stats.oldest = rec.Timestamp + } + } + }() + + err := execCfg.JobRegistry.UpdateJobWithTxn(ctx, jobspb.JobID(jobID), txn, + func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + p := md.Payload + jobType, err := p.CheckType() + if err != nil { + return err + } + stats = ptsStats[jobType] + if stats == nil { + stats = &ptsStat{} + ptsStats[jobType] = stats + } + + // If MaximumPTSAge is set on the job payload, verify if PTS record + // timestamp is fresh enough. + if p.MaximumPTSAge > 0 && + rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) { + stats.expired++ + ptsExpired := errors.Newf( + "protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s", + rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge) + log.Warningf(ctx, "job %d canceled due to %s", jobID, ptsExpired) + return ju.CancelRequestedWithReason(ctx, md, ptsExpired) + } + return nil + }) if err != nil { if jobs.HasJobNotFoundError(err) { return nil // nolint:returnerrcheck -- job maybe deleted when we run; just keep going. } return err } - p := j.Payload() - jobType, err := p.CheckType() - if err != nil { - return err - } - stats := ptsStats[jobType] - if stats == nil { - stats = &ptsStat{} - ptsStats[jobType] = stats - } - stats.numRecords++ - if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) { - stats.oldest = rec.Timestamp - } - - // If MaximumPTSAge is set on the job payload, verify if PTS record - // timestamp is fresh enough. - if p.MaximumPTSAge > 0 && - rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) { - stats.expired++ - ptsExpired := errors.Newf( - "protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s", - rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge) - if err := j.WithTxn(txn).CancelRequestedWithReason(ctx, ptsExpired); err != nil { - return err - } - log.Warningf(ctx, "job %d canceled due to %s", jobID, ptsExpired) - } return nil + } func updateJobPTSMetrics(