From d39d2bee792715fa344942ccba297ddaed0ebc56 Mon Sep 17 00:00:00 2001 From: Jared Noble Date: Tue, 30 Jan 2018 12:44:08 -0700 Subject: [PATCH] persistent jobs can return to running resolves #654 --- lib/cluster/execution_controller/engine.js | 43 +++++++++++++++++++--- lib/config/schemas/job.js | 11 ++++++ spec/config/validators/config-spec.js | 3 +- spec/execution_controller/engine-spec.js | 31 ++++++++++++++-- 4 files changed, 78 insertions(+), 10 deletions(-) diff --git a/lib/cluster/execution_controller/engine.js b/lib/cluster/execution_controller/engine.js index 9374359c284..30a9c200732 100644 --- a/lib/cluster/execution_controller/engine.js +++ b/lib/cluster/execution_controller/engine.js @@ -146,12 +146,13 @@ module.exports = function module(context, messaging, exStore, stateStore, execut // Recovery has it own error listening logic internally if (!isRecovery) { - events.once('slice:failure', () => { - logger.error(`slicer: ${exId} has encountered a processing_error`); - const executionStats = executionAnalytics.getAnalytics(); - const errorMeta = exStore.failureMetaData(null, executionStats); - exStore.setStatus(exId, 'failing', errorMeta); - }); + if (executionConfig.lifecycle === 'once') { + events.once('slice:failure', _setFailingStatus); + } else { + // in persistent mode we set watchdogs to monitor + // when failing can be set back to running + events.on('slice:failure', _checkAndUpdateExecutionState()); + } } } @@ -442,6 +443,36 @@ module.exports = function module(context, messaging, exStore, stateStore, execut }); } + function _checkAndUpdateExecutionState() { + let watchDogSet = false; + let errorCount; + let watcher; + + return () => { + if (!watchDogSet) { + watchDogSet = true; + errorCount = executionAnalytics.getAnalytics().failed; + _setFailingStatus(); + + watcher = setInterval(() => { + const currentErrorCount = executionAnalytics.getAnalytics().failed; + if (errorCount === currentErrorCount) { + clearInterval(watcher); + logger.info(`No 
slice errors have occurred within execution: ${exId} will be set back to 'running' state`); + exStore.setStatus(exId, 'running'); + } + }, executionConfig.probation_window); + } + }; + } + + function _setFailingStatus() { + logger.error(`slicer: ${exId} has encountered a processing_error`); + const executionStats = executionAnalytics.getAnalytics(); + const errorMeta = exStore.failureMetaData(null, executionStats); + exStore.setStatus(exId, 'failing', errorMeta); + } + function _watchDog(checkFn, timeout, errMsg, logMsg) { setTimeout(() => { // if after a a set time there are still no workers, it will shutdown diff --git a/lib/config/schemas/job.js b/lib/config/schemas/job.js index 0098edd5bc2..0cbd6bd3877 100644 --- a/lib/config/schemas/job.js +++ b/lib/config/schemas/job.js @@ -135,6 +135,17 @@ function jobSchema(context) { } } } + }, + probation_window: { + doc: 'time in ms that the execution controller checks for failed slices, if there are none then it updates the state of the execution to running (this is only when lifecycle is set to persistent)', + default: 300000, + format(val) { + if (isNaN(val)) { + throw new Error('probation_window parameter for job must be a number'); + } else if (val < 1) { + throw new Error('probation_window for job must be >= one'); + } + } } }; } diff --git a/spec/config/validators/config-spec.js b/spec/config/validators/config-spec.js index 499f678e7c9..0b713683dd3 100644 --- a/spec/config/validators/config-spec.js +++ b/spec/config/validators/config-spec.js @@ -34,7 +34,8 @@ describe('When passed a valid jobSchema and jobConfig', () => { { _op: 'noop' } ], assets: null, - moderator: null + moderator: null, + probation_window: 300000 }; const jobConfig = configValidator.validateConfig(jobSchema, jobSpec); diff --git a/spec/execution_controller/engine-spec.js b/spec/execution_controller/engine-spec.js index 7e578bfffee..ecd0d376b0a 100644 --- a/spec/execution_controller/engine-spec.js +++ b/spec/execution_controller/engine-spec.js 
@@ -3,6 +3,7 @@ const engineCode = require('../../lib/cluster/execution_controller/engine'); const events = require('events'); const Promise = require('bluebird'); +const _ = require('lodash'); const eventEmitter = new events.EventEmitter(); @@ -64,7 +65,7 @@ describe('execution engine', () => { respond: (msg) => { respondingMsg = msg; } }; const executionAnalytics = { - getAnalytics: () => ({}), + getAnalytics: () => ({failed: 1}), set: () => {}, increment: () => {}, shutdown: () => {} @@ -139,8 +140,9 @@ describe('execution engine', () => { return newEmitter; } - function makeEngine() { - return engineCode(context, messaging, exStore, stateStore, executionContext); + function makeEngine(_execution) { + const execution = _execution || executionContext; + return engineCode(context, messaging, exStore, stateStore, execution); } it('can instantiate', (done) => { @@ -394,6 +396,29 @@ describe('execution engine', () => { .finally(done); }); + it('failed slice can recover to running status in persistent mode', (done) => { + const newExecution = _.cloneDeep(executionContext); + newExecution.config.lifecycle = 'persistent'; + newExecution.config.probation_window = 100; + const myEmitter = makeEmitter(); + const engine = makeEngine(newExecution); + const engineTextContext = engine.__test_context(executionAnalytics, slicerAnalytics, recovery, 1234); + const registerSlicers = engineTextContext._registerSlicers; + + registerSlicers([() => {}], false); + myEmitter.emit('slice:failure'); + waitFor(10) + .then(() => { + expect(exStatus).toEqual('failing'); + return waitFor(150); + }) + .then(() => { + expect(exStatus).toEqual('running'); + }) + .catch(fail) + .finally(done); + }); + it('slicer init retry will attempt to create slicer', (done) => { const oldSlicer = executionContext.slicer.newSlicer; let errorCounter = 0;