Skip to content

Commit

Permalink
first pass on slicer handling ex state resolves #518
Browse files Browse the repository at this point in the history
  • Loading branch information
jsnoble committed Sep 8, 2017
1 parent 583fcb1 commit 997e18c
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 180 deletions.
46 changes: 3 additions & 43 deletions lib/cluster/services/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,64 +121,24 @@ module.exports = function(context, server) {
//remove any pending worker request on completed jobs
logger.debug(`job for ex_id: ${data.ex_id} has finished, removing any from pending queue`);
logger.trace(`job ex_id: ${data.ex_id} has finished, message:`, data);

pendingWorkerRequests.remove(data.ex_id);

messaging.send('cluster:job:stop', data);
//if errors in slices, emit error events
if (data.errorCount > 0) {
var message = `job: ${data.ex_id} had ${data.errorCount} slice failures during processing`;
data.error = message;

logger.warn(message);
events.emit('cluster:job_failure', data)
}
else {
events.emit('cluster:job_finished', data);
}
});

messaging.register('slicer:recovery:failed', function(data) {
//remove any pending worker request
logger.debug(`slicer for execution: ${data.ex_id} has failed on recovery`);
pendingWorkerRequests.remove(data.ex_id);
messaging.send('cluster:job:stop', {ex_id: data.ex_id});
events.emit('cluster:job_failure', data);
});

messaging.register('job:error:terminal', function(data) {
logger.debug(`terminal job error for execution: ${data.ex_id}, canceling job`);

pendingWorkerRequests.remove(data.ex_id);
messaging.send('cluster:job:stop', {ex_id: data.ex_id});
events.emit('cluster:job_failure', data);
});

messaging.register('slicer:error:terminal', function(data) {
logger.debug(`terminal slicer error for execution: ${data.ex_id}, canceling job`);

pendingWorkerRequests.remove(data.ex_id);
messaging.send('cluster:job:stop', {ex_id: data.ex_id});
events.emit('cluster:slicer_failure', data);
});

messaging.register('slicer:processing:error', function(data) {
logger.debug(`an error has occurred processing a slice, ex_id: ${data.ex_id}`);
logger.trace(`error processing slice,ex_id: ${data.ex_id}, message:`, data);
events.emit('slicer:processing:error', data)
});

messaging.register('slicer:initialized', function(data) {
logger.debug(`slicer has initialized, ex_id: ${data.ex_id}`);
logger.trace(`slicer initialized, ex_id: ${data.ex_id}, message:`, data);

events.emit('slicer:initialized', data)
});

messaging.register('slicer:job:update', function(ex_Update) {
logger.debug(`updating ex: ${ex_Update.ex_id}`);
logger.trace(`updating ex: ${ex_Update.ex_id}, message:`, ex_Update);
events.emit('slicer:job:update', ex_Update);
if (!data.markedJob) {
events.emit('cluster:job_failure', data);
}
});

messaging.register('network:error', function(err) {
Expand Down
125 changes: 29 additions & 96 deletions lib/cluster/services/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ var moderatorPausedQueue = new Queue;
*/


//if changing VALID_STATUS, make sure to modify getLatestExecution, it expects first 7 values are active states and the rest are finished states
var VALID_STATUS = [
'pending', 'scheduling', 'initializing', 'running', 'failing', 'paused', 'moderator_paused',
'completed', 'stopped', 'rejected', 'failed', 'terminated'
];

// Maps job notification to execution states
var STATE_MAPPING = {
'stop': 'stopped',
Expand Down Expand Up @@ -54,16 +48,7 @@ module.exports = function(context, cluster_service) {
var shortid = require('shortid');
var pendingJobsScheduler;

events.on('cluster:job_finished', function(data) {
logger.info(`job ${data.ex_id} has completed`);
var metaData = {_slicer_stats: data.slicer_stats};

if (data.hasRecovered) {
metaData._has_errors = 'recovered';
}
_setStatus(data, 'completed', metaData)
});

//need in the case the slicer is unable to mark the ex
events.on('cluster:job_failure', function(data) {
var metaData = {_has_errors: 'true', _slicer_stats: data.slicer_stats};
logger.error(`job ${data.ex_id} has failed to complete`);
Expand All @@ -72,35 +57,7 @@ module.exports = function(context, cluster_service) {
metaData._failureReason = data.error;
}

_setStatus(data, 'failed', metaData)
});

events.on('cluster:slicer_failure', function(data) {
var metaData = {_has_errors: 'true', _slicer_stats: data.slicer_stats};
logger.error(`slicer for ex_id: ${data.ex_id} has failed to initialize`);

if (data.error) {
metaData._failureReason = data.error;
}

_setStatus(data, 'failed', metaData)
});

events.on('slicer:processing:error', function(data) {
logger.error(`job ${data.ex_id} has had at least one slice failure and will be marked 'failed'`);
var metaData = {_has_errors: 'true'};

_setStatus(data, 'failing', metaData)
});

events.on('slicer:initialized', function(data) {
logger.info(`job: ${data.ex_id} is now running`);
_setStatus(data, 'running')
});

events.on('slicer:job:update', function(updateSpec) {
//this is updating the opConfig for elasticsearch start and/or end dates for ex, this assumes elasticsearch is first
updateEX(updateSpec.ex_id, {operations: updateSpec.update})
ex_store.setStatus(data.ex_id, 'failed', metaData)
});

events.on('cluster_service:cleanup_job', function(data) {
Expand Down Expand Up @@ -287,29 +244,34 @@ module.exports = function(context, cluster_service) {
function createExecutionContext(job) {
return saveJob(job, 'ex')
.then(function(ex) {
return Promise.all([_setStatus(ex, 'pending'), checkModerator(job)])
})
.spread(function(ex, moderatorResponse) {
var canRun = _.every(moderatorResponse, function(db) {
return db.canRun === true
});
return Promise.all([ex_store.setStatus(ex.ex_id, 'pending'), checkModerator(job)])
.spread(function(ex_id, moderatorResponse) {
var canRun = _.every(moderatorResponse, function(db) {
return db.canRun === true
});

if (canRun) {
logger.debug(`enqueueing job to be processed, job`, ex);
pendingExecutionQueue.enqueue(ex);
}
else {
logger.warn(`job cannot be run due to throttled database connections`);
moderatorPausedQueue.enqueue(ex);
}
if (canRun) {
logger.debug(`enqueueing job to be processed, job`, ex);
pendingExecutionQueue.enqueue(ex);
}
else {
logger.warn(`job cannot be run due to throttled database connections`);
moderatorPausedQueue.enqueue(ex);
}

return {job_id: ex.job_id};
return {job_id: ex.job_id};
})
.catch(function(err) {
var errMsg = parseError(err);
logger.error(`could not set to pending`, errMsg);
return Promise.reject(errMsg)
})
})
.catch(function(err) {
var errMsg = parseError(err);
logger.error(`could not create execution context`, errMsg);
return Promise.reject(errMsg)
});
})
}

function ensureAssets(job_spec) {
Expand Down Expand Up @@ -489,7 +451,7 @@ module.exports = function(context, cluster_service) {
var query = `job_id: ${job_id} AND _context:ex`;

if (checkIfActive) {
var str = VALID_STATUS.slice(7).map(state => ` _status:${state} `).join('OR');
var str = ex_store.getTerminalStatuses().map(state => ` _status:${state} `).join('OR');
query = `job_id: ${job_id} AND _context:ex NOT (${str.trim()})`
}

Expand Down Expand Up @@ -518,7 +480,7 @@ module.exports = function(context, cluster_service) {

logger.info(`Scheduling job: ${executionContext.ex_id}`);

_setStatus(executionContext, 'scheduling')
ex_store.setStatus(executionContext.ex_id, 'scheduling')
.then(function() {
cluster_service.allocateSlicer(executionContext, recover_execution)
.then(function() {
Expand All @@ -545,7 +507,7 @@ module.exports = function(context, cluster_service) {
var errMsg = parseError(err);
logger.error(`Failure during worker allocation - ${errMsg}`);
isJobBeingAllocated = false;
_setStatus(executionContext, 'failed');
ex_store.setStatus(executionContext.ex_id, 'failed');
allocator();
});
});
Expand All @@ -560,7 +522,7 @@ module.exports = function(context, cluster_service) {
.then(function(job_spec) {
logger.info(`job ${ex_id} has changed to state`, state);

return _setStatus(job_spec, state);
return ex_store.setStatus(job_spec.ex_id, state);
})
.then(function(job) {
return state
Expand Down Expand Up @@ -609,35 +571,6 @@ module.exports = function(context, cluster_service) {
});
}

function _setStatus(job_spec, status, metaData) {
if (VALID_STATUS.indexOf(status) != -1) {
return getExecutionContext(job_spec.ex_id)
.then(function(ex) {
var statusObj = {_status: status};
if (metaData) {
_.assign(statusObj, metaData)
}
return ex_store.update(job_spec.ex_id, statusObj)
.then(function() {
return job_spec;
})
.catch(function(err) {
var errMsg = parseError(err);
logger.error('error update ex_store, error:', errMsg, job_spec.ex_id, statusObj);
return Promise.reject(errMsg);
});
})
.catch(function(err) {
var errMsg = parseError(err);
logger.error('was not able to _setStatus, error:', errMsg, job_spec.ex_id, status, metaData);
return Promise.reject(errMsg);
})
}
else {
return Promise.reject(`Invalid Job status: ${status}`);
}
}

function _validateJob(job_spec) {
return new Promise(function(resolve, reject) {
// This will throw errors if the job does not pass validation.
Expand All @@ -654,7 +587,7 @@ module.exports = function(context, cluster_service) {

function shutdown() {
logger.info(`shutting down`);
var query = VALID_STATUS.slice(0, 7).map(str => `_status:${str}`).join(" OR ");
var query = ex_store.getRunningStatuses().map(str => `_status:${str}`).join(" OR ");
return ex_search(query)
.map(function(job) {
logger.warn(`marking job ${job.job_id}, ex_id: ${job.ex_id} as terminated`);
Expand Down Expand Up @@ -700,7 +633,7 @@ module.exports = function(context, cluster_service) {
//pendingExecutionQueue.enqueue(job);
}
else {
//_setStatus(job, 'aborted');
//ex_store.setStatus(job.ex_id, 'aborted');
}
})
.then(function() {
Expand Down
6 changes: 2 additions & 4 deletions lib/cluster/services/messaging.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ var networkMapping = {
'cluster:node:get_port': 'cluster:node:get_port',
'slicer:recovery:failed': 'slicer:recovery:failed',
'slicer:job:finished': 'slicer:job:finished',
'slicer:processing:error': 'slicer:processing:error',
'slicer:initialized': 'slicer:initialized',
'slicer:job:update': 'slicer:job:update',
'slicer:slice:new': 'slicer:slice:new',
'slicer:slice:recorded': 'slicer:slice:recorded',
'slicer:error:terminal': 'slicer:error:terminal',
'worker:ready': 'worker:ready',
'worker:slice:complete': 'worker:slice:complete',
'job:error:terminal': 'job:error:terminal'
Expand Down Expand Up @@ -298,6 +294,7 @@ module.exports = function messaging(context, logger, childHookFn) {

//process is an event emitter, we hook through message event, but we register the functions through register
if (config.clients.ipcClient) {
//TODO need better explanation of whats happening
process.on('message', function(ipcMessage) {
var msg = ipcMessage.message;
var realMsg = processMapping[msg];
Expand All @@ -310,6 +307,7 @@ module.exports = function messaging(context, logger, childHookFn) {
});
}

//TODO need better explanation of whats happening
if (!config.clients.ipcClient) {
processContext.on('online', function(worker) {
logger.debug('worker has come online');
Expand Down
Loading

0 comments on commit 997e18c

Please sign in to comment.