Skip to content

Commit

Permalink
persitent jobs can return to running resolves #654
Browse files Browse the repository at this point in the history
  • Loading branch information
jsnoble committed Feb 13, 2018
1 parent 18b7c6a commit d39d2be
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 10 deletions.
43 changes: 37 additions & 6 deletions lib/cluster/execution_controller/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,13 @@ module.exports = function module(context, messaging, exStore, stateStore, execut

// Recovery has it own error listening logic internally
if (!isRecovery) {
events.once('slice:failure', () => {
logger.error(`slicer: ${exId} has encountered a processing_error`);
const executionStats = executionAnalytics.getAnalytics();
const errorMeta = exStore.failureMetaData(null, executionStats);
exStore.setStatus(exId, 'failing', errorMeta);
});
if (executionConfig.lifecycle === 'once') {
events.once('slice:failure', _setFailingStatus);
} else {
// in persistent mode we set watchdogs to monitor
// when failing can be set back to running
events.on('slice:failure', _checkAndUpdateExecutionState());
}
}
}

Expand Down Expand Up @@ -442,6 +443,36 @@ module.exports = function module(context, messaging, exStore, stateStore, execut
});
}

function _checkAndUpdateExecutionState() {
let watchDogSet = false;
let errorCount;
let watcher;

return () => {
if (!watchDogSet) {
watchDogSet = true;
errorCount = executionAnalytics.getAnalytics().failed;
_setFailingStatus();

watcher = setInterval(() => {
const currentErrorCount = executionAnalytics.getAnalytics().failed;
if (errorCount === currentErrorCount) {
clearInterval(watcher);
logger.info(`No slice errors have occurred within execution: ${exId} will be set back to 'running' state`);
exStore.setStatus(exId, 'running');
}
}, executionConfig.probation_window);
}
};
}

function _setFailingStatus() {
logger.error(`slicer: ${exId} has encountered a processing_error`);
const executionStats = executionAnalytics.getAnalytics();
const errorMeta = exStore.failureMetaData(null, executionStats);
exStore.setStatus(exId, 'failing', errorMeta);
}

function _watchDog(checkFn, timeout, errMsg, logMsg) {
setTimeout(() => {
// if after a a set time there are still no workers, it will shutdown
Expand Down
11 changes: 11 additions & 0 deletions lib/config/schemas/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ function jobSchema(context) {
}
}
}
},
probation_window: {
doc: 'time in ms that the execution controller checks for failed slices, if there are none then it updates the state of the execution to running (this is only when lifecycle is set to persistent)',
default: 300000,
format(val) {
if (isNaN(val)) {
throw new Error('probation_window parameter for job must be a number');
} else if (val < 1) {
throw new Error('probation_window for job must be >= one');
}
}
}
};
}
Expand Down
3 changes: 2 additions & 1 deletion spec/config/validators/config-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ describe('When passed a valid jobSchema and jobConfig', () => {
{ _op: 'noop' }
],
assets: null,
moderator: null
moderator: null,
probation_window: 300000
};

const jobConfig = configValidator.validateConfig(jobSchema, jobSpec);
Expand Down
31 changes: 28 additions & 3 deletions spec/execution_controller/engine-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const engineCode = require('../../lib/cluster/execution_controller/engine');
const events = require('events');
const Promise = require('bluebird');
const _ = require('lodash');

const eventEmitter = new events.EventEmitter();

Expand Down Expand Up @@ -64,7 +65,7 @@ describe('execution engine', () => {
respond: (msg) => { respondingMsg = msg; }
};
const executionAnalytics = {
getAnalytics: () => ({}),
getAnalytics: () => ({failed: 1}),
set: () => {},
increment: () => {},
shutdown: () => {}
Expand Down Expand Up @@ -139,8 +140,9 @@ describe('execution engine', () => {
return newEmitter;
}

function makeEngine() {
return engineCode(context, messaging, exStore, stateStore, executionContext);
function makeEngine(_execution) {
const execution = _execution || executionContext;
return engineCode(context, messaging, exStore, stateStore, execution);
}

it('can instantiate', (done) => {
Expand Down Expand Up @@ -394,6 +396,29 @@ describe('execution engine', () => {
.finally(done);
});

it('failed slice can recover to running status in persistent mode', (done) => {
const newExecution = _.cloneDeep(executionContext);
newExecution.config.lifecycle = 'persistent';
newExecution.config.probation_window = 100;
const myEmitter = makeEmitter();
const engine = makeEngine(newExecution);
const engineTextContext = engine.__test_context(executionAnalytics, slicerAnalytics, recovery, 1234);
const registerSlicers = engineTextContext._registerSlicers;

registerSlicers([() => {}], false);
myEmitter.emit('slice:failure');
waitFor(10)
.then(() => {
expect(exStatus).toEqual('failing');
return waitFor(150);
})
.then(() => {
expect(exStatus).toEqual('running');
})
.catch(fail)
.finally(done);
});

it('slicer init retry will attempt to create slicer', (done) => {
const oldSlicer = executionContext.slicer.newSlicer;
let errorCounter = 0;
Expand Down

0 comments on commit d39d2be

Please sign in to comment.