Skip to content

Commit

Permalink
fix: explicitly delete timeout after it fires/fixes #82
Browse files Browse the repository at this point in the history
  • Loading branch information
shadowgate15 committed Feb 2, 2021
1 parent eac6acc commit 91d0242
Showing 1 changed file with 79 additions and 44 deletions.
123 changes: 79 additions & 44 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,48 +20,47 @@ const {
const buildJob = require('./job-builder');
const validateJob = require('./job-validator');

// bthreads requires us to do this for web workers (see bthreads docs for insight)
// Bthreads requires us to do this for web workers (see bthreads docs for insight)
threads.Buffer = Buffer;

// instead of `threads.browser` checks below, we previously used this boolean
// Instead of `threads.browser` checks below, we previously used this boolean
// const hasFsStatSync = typeof fs === 'object' && typeof fs.statSync === 'function';

class Bree extends EventEmitter {
constructor(config) {
super();
this.config = {
// we recommend using Cabin for logging
// We recommend using Cabin for logging
// <https://cabinjs.com>
logger: console,
// set this to `false` to prevent requiring a root directory of jobs
// Set this to `false` to prevent requiring a root directory of jobs
// (e.g. if your jobs are not all in one directory)
root: threads.browser
? /* istanbul ignore next */
threads.resolve('jobs')
root: threads.browser /* istanbul ignore next */
? threads.resolve('jobs')
: resolve('jobs'),
// default timeout for jobs
// Default timeout for jobs
// (set this to `false` if you do not wish for a default timeout to be set)
timeout: 0,
// default interval for jobs
// Default interval for jobs
// (set this to `0` for no interval, and > 0 for a default interval to be set)
interval: 0,
// this is an Array of your job definitions (see README for examples)
// This is an Array of your job definitions (see README for examples)
jobs: [],
// <https://breejs.github.io/later/parsers.html#cron>
// (can be overridden on a job basis with same prop name)
hasSeconds: false,
// <https://github.com/Airfooox/cron-validate>
cronValidate: {},
// if you set a value > 0 here, then it will terminate workers after this time (ms)
// If you set a value > 0 here, then it will terminate workers after this time (ms)
closeWorkerAfterMs: 0,
// could also be mjs if desired
// Could also be mjs if desired
// (this is the default extension if you just specify a job's name without ".js" or ".mjs")
defaultExtension: 'js',
// default worker options to pass to ~`new Worker`~ `new threads.Worker`
// Default worker options to pass to ~`new Worker`~ `new threads.Worker`
// (can be overridden on a per job basis)
// <https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options>
worker: {},
// custom handler to execute when error events are emmited by the workers or when they exit
// Custom handler to execute when error events are emmited by the workers or when they exit
// with non-zero code
// pass in a callback function with following signature: `(error, workerMetadata) => { // custom handling here }`
errorHandler: null,
Expand All @@ -87,7 +86,7 @@ class Bree extends EventEmitter {
// `cronValidate` object has `override` object with `useSeconds` set to `true`
// <https://github.com/breejs/bree/issues/7>
//
if (this.config.hasSeconds)
if (this.config.hasSeconds) {
this.config.cronValidate = {
...this.config.cronValidate,
preset:
Expand All @@ -101,6 +100,7 @@ class Bree extends EventEmitter {
useSeconds: true
}
};
}

debug('config', this.config);

Expand All @@ -122,23 +122,24 @@ class Bree extends EventEmitter {
this.getHumanToMs = getHumanToMs;
this.parseValue = parseValue;

// validate root (sync check)
// Validate root (sync check)
if (isSANB(this.config.root)) {
/* istanbul ignore next */
if (!threads.browser && isValidPath(this.config.root)) {
const stats = fs.statSync(this.config.root);
if (!stats.isDirectory())
if (!stats.isDirectory()) {
throw new Error(
`Root directory of ${this.config.root} does not exist`
);
}
}
}

// validate timeout
// Validate timeout
this.config.timeout = this.parseValue(this.config.timeout);
debug('timeout', this.config.timeout);

// validate interval
// Validate interval
this.config.interval = this.parseValue(this.config.interval);
debug('interval', this.config.interval);

Expand All @@ -160,14 +161,15 @@ class Bree extends EventEmitter {
//
// validate jobs
//
if (!Array.isArray(this.config.jobs))
throw new Error('Jobs must be an Array');
if (!Array.isArray(this.config.jobs)) {
throw new TypeError('Jobs must be an Array');
}

// provide human-friendly errors for complex configurations
// Provide human-friendly errors for complex configurations
const errors = [];

/*
jobs = [
Jobs = [
'name',
{ name: 'boot' },
{ name: 'timeout', timeout: ms('3s') },
Expand Down Expand Up @@ -195,20 +197,27 @@ class Bree extends EventEmitter {
}
}

// if there were any errors then throw them
if (errors.length > 0) throw combineErrors(errors);
// If there were any errors then throw them
if (errors.length > 0) {
throw combineErrors(errors);
}

debug('this.config.jobs', this.config.jobs);
}

getWorkerMetadata(name, meta = {}) {
const job = this.config.jobs.find((j) => j.name === name);
if (!job) throw new Error(`Job "${name}" does not exist`);
if (!this.config.outputWorkerMetadata && !job.outputWorkerMetadata)
if (!job) {
throw new Error(`Job "${name}" does not exist`);
}

if (!this.config.outputWorkerMetadata && !job.outputWorkerMetadata) {
return meta &&
(typeof meta.err !== 'undefined' || typeof meta.message !== 'undefined')
? meta
: undefined;
}

return this.workers[name]
? {
...meta,
Expand All @@ -226,12 +235,17 @@ class Bree extends EventEmitter {
if (name) {
this.config.logger.info(new Date());
const job = this.config.jobs.find((j) => j.name === name);
if (!job) throw new Error(`Job "${name}" does not exist`);
if (this.workers[name])
if (!job) {
throw new Error(`Job "${name}" does not exist`);
}

if (this.workers[name]) {
return this.config.logger.warn(
new Error(`Job "${name}" is already running`),
this.getWorkerMetadata(name)
);
}

debug('starting worker', name);
const object = {
...(this.config.worker ? this.config.worker : {}),
Expand All @@ -248,7 +262,7 @@ class Bree extends EventEmitter {
this.emit('worker created', name);
debug('worker started', name);

// if we specified a value for `closeWorkerAfterMs`
// If we specified a value for `closeWorkerAfterMs`
// then we need to terminate it after that execution time
const closeWorkerAfterMs = Number.isFinite(job.closeWorkerAfterMs)
? job.closeWorkerAfterMs
Expand Down Expand Up @@ -348,15 +362,19 @@ class Bree extends EventEmitter {
debug('start', name);
if (name) {
const job = this.config.jobs.find((j) => j.name === name);
if (!job) throw new Error(`Job ${name} does not exist`);
if (this.timeouts[name] || this.intervals[name])
if (!job) {
throw new Error(`Job ${name} does not exist`);
}

if (this.timeouts[name] || this.intervals[name]) {
return this.config.logger.warn(
new Error(`Job "${name}" is already started`)
);
}

debug('job', job);

// check for date and if it is in the past then don't run it
// Check for date and if it is in the past then don't run it
if (job.date instanceof Date) {
debug('job date', job);
if (job.date.getTime() < Date.now()) {
Expand All @@ -380,13 +398,14 @@ class Bree extends EventEmitter {
);
} else {
debug('job.date was scheduled to run only once', job);
delete this.timeouts[name];
}

delete this.timeouts[name];
}, job.date.getTime() - Date.now());
return;
}

// this is only complex because both timeout and interval can be a schedule
// This is only complex because both timeout and interval can be a schedule
if (this.isSchedule(job.timeout)) {
debug('job timeout is schedule', job);
this.timeouts[name] = later.setTimeout(() => {
Expand All @@ -404,6 +423,8 @@ class Bree extends EventEmitter {
job.interval
);
}

delete this.timeouts[name];
}, job.timeout);
return;
}
Expand All @@ -426,6 +447,8 @@ class Bree extends EventEmitter {
job.interval
);
}

delete this.timeouts[name];
}, job.timeout);
} else if (this.isSchedule(job.interval)) {
debug('job.interval is schedule', job);
Expand All @@ -452,17 +475,21 @@ class Bree extends EventEmitter {
if (
typeof this.timeouts[name] === 'object' &&
typeof this.timeouts[name].clear === 'function'
)
) {
this.timeouts[name].clear();
}

delete this.timeouts[name];
}

if (this.intervals[name]) {
if (
typeof this.intervals[name] === 'object' &&
typeof this.intervals[name].clear === 'function'
)
) {
this.intervals[name].clear();
}

delete this.intervals[name];
}

Expand All @@ -483,8 +510,10 @@ class Bree extends EventEmitter {
if (
typeof this.closeWorkerAfterMs[name] === 'object' &&
typeof this.closeWorkerAfterMs[name].clear === 'function'
)
) {
this.closeWorkerAfterMs[name].clear();
}

delete this.closeWorkerAfterMs[name];
}

Expand All @@ -502,7 +531,9 @@ class Bree extends EventEmitter {
//
// make sure jobs is an array
//
if (!Array.isArray(jobs)) jobs = [jobs];
if (!Array.isArray(jobs)) {
jobs = [jobs];
}

const errors = [];

Expand All @@ -524,22 +555,26 @@ class Bree extends EventEmitter {

debug('jobs added', this.config.jobs);

// if there were any errors then throw them
if (errors.length > 0) throw combineErrors(errors);
// If there were any errors then throw them
if (errors.length > 0) {
throw combineErrors(errors);
}
}

remove(name) {
const job = this.config.jobs.find((j) => j.name === name);
if (!job) throw new Error(`Job "${name}" does not exist`);
if (!job) {
throw new Error(`Job "${name}" does not exist`);
}

this.config.jobs = this.config.jobs.filter((j) => j.name !== name);

// make sure it also closes any open workers
// Make sure it also closes any open workers
this.stop(name);
}
}

// expose bthreads (useful for tests)
// Expose bthreads (useful for tests)
// https://github.com/chjj/bthreads#api
Bree.threads = {
backend: threads.backend,
Expand Down

0 comments on commit 91d0242

Please sign in to comment.