Skip to content

Commit

Permalink
fix(worker): mitigate job's locks extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Dec 14, 2024
1 parent af02061 commit d7cceab
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -835,14 +835,14 @@ will never work with more accuracy than 1ms. */
});

const handleCompleted = async (result: ResultType) => {
jobsInProgress.delete(inProgressItem);

if (!this.connection.closing) {
const completed = await job.moveToCompleted(
result,
token,
fetchNextCallback() && !(this.closing || this.paused),
);
jobsInProgress.delete(inProgressItem);

this.emit('completed', job, result, 'active');

span?.addEvent('job completed', {
Expand All @@ -857,8 +857,6 @@ will never work with more accuracy than 1ms. */
};

const handleFailed = async (err: Error) => {
jobsInProgress.delete(inProgressItem);

if (!this.connection.closing) {
try {
// Check if the job was manually rate-limited
Expand All @@ -877,6 +875,8 @@ will never work with more accuracy than 1ms. */
}

const result = await job.moveToFailed(err, token, true);
jobsInProgress.delete(inProgressItem);

this.emit('failed', job, err, 'active');

span?.addEvent('job failed', {
Expand Down Expand Up @@ -911,6 +911,8 @@ will never work with more accuracy than 1ms. */
const failed = await handleFailed(<Error>err);
return failed;
} finally {
jobsInProgress.delete(inProgressItem);

span?.setAttributes({
[TelemetryAttributes.JobFinishedTimestamp]: Date.now(),
[TelemetryAttributes.JobProcessedTimestamp]: processedOn,
Expand Down Expand Up @@ -1121,6 +1123,9 @@ will never work with more accuracy than 1ms. */

for (const item of jobsInProgress) {
const { job, ts } = item;

// In theory ts should always be defined here so this
// check should not be necessary.
if (!ts) {
item.ts = now;
continue;
Expand All @@ -1141,7 +1146,7 @@ will never work with more accuracy than 1ms. */
}

this.startLockExtenderTimer(jobsInProgress);
}, this.opts.lockRenewTime / 2);
}, this.opts.lockRenewTime / 10);
}
}
}
Expand Down Expand Up @@ -1207,8 +1212,6 @@ will never work with more accuracy than 1ms. */
);

for (const jobId of erroredJobIds) {
// TODO: Send signal to process function that the job has been lost.

this.emit(
'error',
new Error(`could not renew lock for job ${jobId}`),
Expand Down

0 comments on commit d7cceab

Please sign in to comment.