diff --git a/lib/cluster/services/cluster.js b/lib/cluster/services/cluster.js index cbd1694c090..4fe59299148 100644 --- a/lib/cluster/services/cluster.js +++ b/lib/cluster/services/cluster.js @@ -121,21 +121,8 @@ module.exports = function(context, server) { //remove any pending worker request on completed jobs logger.debug(`job for ex_id: ${data.ex_id} has finished, removing any from pending queue`); logger.trace(`job ex_id: ${data.ex_id} has finished, message:`, data); - pendingWorkerRequests.remove(data.ex_id); - messaging.send('cluster:job:stop', data); - //if errors in slices, emit error events - if (data.errorCount > 0) { - var message = `job: ${data.ex_id} had ${data.errorCount} slice failures during processing`; - data.error = message; - - logger.warn(message); - events.emit('cluster:job_failure', data) - } - else { - events.emit('cluster:job_finished', data); - } }); messaging.register('slicer:recovery:failed', function(data) { @@ -143,42 +130,15 @@ module.exports = function(context, server) { logger.debug(`slicer for execution: ${data.ex_id} has failed on recovery`); pendingWorkerRequests.remove(data.ex_id); messaging.send('cluster:job:stop', {ex_id: data.ex_id}); - events.emit('cluster:job_failure', data); }); messaging.register('job:error:terminal', function(data) { logger.debug(`terminal job error for execution: ${data.ex_id}, canceling job`); - - pendingWorkerRequests.remove(data.ex_id); - messaging.send('cluster:job:stop', {ex_id: data.ex_id}); - events.emit('cluster:job_failure', data); - }); - - messaging.register('slicer:error:terminal', function(data) { - logger.debug(`terminal slicer error for execution: ${data.ex_id}, canceling job`); - pendingWorkerRequests.remove(data.ex_id); messaging.send('cluster:job:stop', {ex_id: data.ex_id}); - events.emit('cluster:slicer_failure', data); - }); - - messaging.register('slicer:processing:error', function(data) { - logger.debug(`an error has occurred processing a slice, ex_id: ${data.ex_id}`); - logger.trace(`error processing slice,ex_id: ${data.ex_id}, message:`, data); - events.emit('slicer:processing:error', data) - }); - - messaging.register('slicer:initialized', function(data) { - logger.debug(`slicer has initialized, ex_id: ${data.ex_id}`); - logger.trace(`slicer initialized, ex_id: ${data.ex_id}, message:`, data); - - events.emit('slicer:initialized', data) - }); - - messaging.register('slicer:job:update', function(ex_Update) { - logger.debug(`updating ex: ${ex_Update.ex_id}`); - logger.trace(`updating ex: ${ex_Update.ex_id}, message:`, ex_Update); - events.emit('slicer:job:update', ex_Update); + if (!data.markedJob) { + events.emit('cluster:job_failure', data); + } }); messaging.register('network:error', function(err) { diff --git a/lib/cluster/services/jobs.js b/lib/cluster/services/jobs.js index 839cb3fd4fe..9c066d64ad1 100644 --- a/lib/cluster/services/jobs.js +++ b/lib/cluster/services/jobs.js @@ -19,12 +19,6 @@ var moderatorPausedQueue = new Queue; */ -//if changing VALID_STATUS, make sure to modify getLatestExecution, it expects first 7 values are active states and the rest are finished states -var VALID_STATUS = [ - 'pending', 'scheduling', 'initializing', 'running', 'failing', 'paused', 'moderator_paused', - 'completed', 'stopped', 'rejected', 'failed', 'terminated' -]; - // Maps job notification to execution states var STATE_MAPPING = { 'stop': 'stopped', @@ -54,16 +48,7 @@ module.exports = function(context, cluster_service) { var shortid = require('shortid'); var pendingJobsScheduler; - 
events.on('cluster:job_finished', function(data) { - logger.info(`job ${data.ex_id} has completed`); - var metaData = {_slicer_stats: data.slicer_stats}; - - if (data.hasRecovered) { - metaData._has_errors = 'recovered'; - } - _setStatus(data, 'completed', metaData) - }); - + //need in the case the slicer is unable to mark the ex events.on('cluster:job_failure', function(data) { var metaData = {_has_errors: 'true', _slicer_stats: data.slicer_stats}; logger.error(`job ${data.ex_id} has failed to complete`); @@ -72,35 +57,7 @@ module.exports = function(context, cluster_service) { metaData._failureReason = data.error; } - _setStatus(data, 'failed', metaData) - }); - - events.on('cluster:slicer_failure', function(data) { - var metaData = {_has_errors: 'true', _slicer_stats: data.slicer_stats}; - logger.error(`slicer for ex_id: ${data.ex_id} has failed to initialize`); - - if (data.error) { - metaData._failureReason = data.error; - } - - _setStatus(data, 'failed', metaData) - }); - - events.on('slicer:processing:error', function(data) { - logger.error(`job ${data.ex_id} has had at least one slice failure and will be marked 'failed'`); - var metaData = {_has_errors: 'true'}; - - _setStatus(data, 'failing', metaData) - }); - - events.on('slicer:initialized', function(data) { - logger.info(`job: ${data.ex_id} is now running`); - _setStatus(data, 'running') - }); - - events.on('slicer:job:update', function(updateSpec) { - //this is updating the opConfig for elasticsearch start and/or end dates for ex, this assumes elasticsearch is first - updateEX(updateSpec.ex_id, {operations: updateSpec.update}) + ex_store.setStatus(data.ex_id, 'failed', metaData) }); events.on('cluster_service:cleanup_job', function(data) { @@ -287,29 +244,34 @@ module.exports = function(context, cluster_service) { function createExecutionContext(job) { return saveJob(job, 'ex') .then(function(ex) { - return Promise.all([_setStatus(ex, 'pending'), checkModerator(job)]) - }) - .spread(function(ex, moderatorResponse) { - var canRun = _.every(moderatorResponse, function(db) { - return db.canRun === true - }); + return Promise.all([ex_store.setStatus(ex.ex_id, 'pending'), checkModerator(job)]) + .spread(function(ex_id, moderatorResponse) { + var canRun = _.every(moderatorResponse, function(db) { + return db.canRun === true + }); - if (canRun) { - logger.debug(`enqueueing job to be processed, job`, ex); - pendingExecutionQueue.enqueue(ex); - } - else { - logger.warn(`job cannot be run due to throttled database connections`); - moderatorPausedQueue.enqueue(ex); - } + if (canRun) { + logger.debug(`enqueueing job to be processed, job`, ex); + pendingExecutionQueue.enqueue(ex); + } + else { + logger.warn(`job cannot be run due to throttled database connections`); + moderatorPausedQueue.enqueue(ex); + } - return {job_id: ex.job_id}; + return {job_id: ex.job_id}; + }) + .catch(function(err) { + var errMsg = parseError(err); + logger.error(`could not set to pending`, errMsg); + return Promise.reject(errMsg) + }) }) .catch(function(err) { var errMsg = parseError(err); logger.error(`could not create execution context`, errMsg); return Promise.reject(errMsg) - }); + }) } function ensureAssets(job_spec) { @@ -489,7 +451,7 @@ module.exports = function(context, cluster_service) { var query = `job_id: ${job_id} AND _context:ex`; if (checkIfActive) { - var str = VALID_STATUS.slice(7).map(state => ` _status:${state} `).join('OR'); + var str = ex_store.getTerminalStatuses().map(state => ` _status:${state} `).join('OR'); query = `job_id: ${job_id} AND 
_context:ex NOT (${str.trim()})` } @@ -518,7 +480,7 @@ module.exports = function(context, cluster_service) { logger.info(`Scheduling job: ${executionContext.ex_id}`); - _setStatus(executionContext, 'scheduling') + ex_store.setStatus(executionContext.ex_id, 'scheduling') .then(function() { cluster_service.allocateSlicer(executionContext, recover_execution) .then(function() { @@ -545,7 +507,7 @@ module.exports = function(context, cluster_service) { var errMsg = parseError(err); logger.error(`Failure during worker allocation - ${errMsg}`); isJobBeingAllocated = false; - _setStatus(executionContext, 'failed'); + ex_store.setStatus(executionContext.ex_id, 'failed'); allocator(); }); }); @@ -560,7 +522,7 @@ module.exports = function(context, cluster_service) { .then(function(job_spec) { logger.info(`job ${ex_id} has changed to state`, state); - return _setStatus(job_spec, state); + return ex_store.setStatus(job_spec.ex_id, state); }) .then(function(job) { return state @@ -609,35 +571,6 @@ module.exports = function(context, cluster_service) { }); } - function _setStatus(job_spec, status, metaData) { - if (VALID_STATUS.indexOf(status) != -1) { - return getExecutionContext(job_spec.ex_id) - .then(function(ex) { - var statusObj = {_status: status}; - if (metaData) { - _.assign(statusObj, metaData) - } - return ex_store.update(job_spec.ex_id, statusObj) - .then(function() { - return job_spec; - }) - .catch(function(err) { - var errMsg = parseError(err); - logger.error('error update ex_store, error:', errMsg, job_spec.ex_id, statusObj); - return Promise.reject(errMsg); - }); - }) - .catch(function(err) { - var errMsg = parseError(err); - logger.error('was not able to _setStatus, error:', errMsg, job_spec.ex_id, status, metaData); - return Promise.reject(errMsg); - }) - } - else { - return Promise.reject(`Invalid Job status: ${status}`); - } - } - function _validateJob(job_spec) { return new Promise(function(resolve, reject) { // This will throw errors if the job does not pass validation. 
@@ -654,7 +587,7 @@ module.exports = function(context, cluster_service) { function shutdown() { logger.info(`shutting down`); - var query = VALID_STATUS.slice(0, 7).map(str => `_status:${str}`).join(" OR "); + var query = ex_store.getRunningStatuses().map(str => `_status:${str}`).join(" OR "); return ex_search(query) .map(function(job) { logger.warn(`marking job ${job.job_id}, ex_id: ${job.ex_id} as terminated`); @@ -700,7 +633,7 @@ module.exports = function(context, cluster_service) { //pendingExecutionQueue.enqueue(job); } else { - //_setStatus(job, 'aborted'); + //ex_store.setStatus(job.ex_id, 'aborted'); } }) .then(function() { diff --git a/lib/cluster/services/messaging.js b/lib/cluster/services/messaging.js index 44f69afcb71..ee94095384e 100644 --- a/lib/cluster/services/messaging.js +++ b/lib/cluster/services/messaging.js @@ -26,12 +26,8 @@ var networkMapping = { 'cluster:node:get_port': 'cluster:node:get_port', 'slicer:recovery:failed': 'slicer:recovery:failed', 'slicer:job:finished': 'slicer:job:finished', - 'slicer:processing:error': 'slicer:processing:error', - 'slicer:initialized': 'slicer:initialized', - 'slicer:job:update': 'slicer:job:update', 'slicer:slice:new': 'slicer:slice:new', 'slicer:slice:recorded': 'slicer:slice:recorded', - 'slicer:error:terminal': 'slicer:error:terminal', 'worker:ready': 'worker:ready', 'worker:slice:complete': 'worker:slice:complete', 'job:error:terminal': 'job:error:terminal' @@ -298,6 +294,7 @@ module.exports = function messaging(context, logger, childHookFn) { //process is an event emitter, we hook through message event, but we register the functions through register if (config.clients.ipcClient) { + //TODO need better explanation of whats happening process.on('message', function(ipcMessage) { var msg = ipcMessage.message; var realMsg = processMapping[msg]; @@ -310,6 +307,7 @@ module.exports = function messaging(context, logger, childHookFn) { }); } + //TODO need better explanation of whats happening if (!config.clients.ipcClient) { processContext.on('online', function(worker) { logger.debug('worker has come online'); diff --git a/lib/cluster/slicer.js b/lib/cluster/slicer.js index 7a894d75648..e8c481c9235 100644 --- a/lib/cluster/slicer.js +++ b/lib/cluster/slicer.js @@ -74,6 +74,7 @@ module.exports = function(context) { var slicer; var analyticsData; var state_store; + var ex_store; var engineFn; var retryData; @@ -160,7 +161,8 @@ module.exports = function(context) { events.on('slicer:job:update', function(data) { logger.debug('slicer sending a job update', data.update); - messaging.send({message: 'slicer:job:update', update: data.update, ex_id: ex_id}) + //this is updating the opConfig for elasticsearch start and/or end dates for ex, this assumes elasticsearch is first + ex_store.update(ex_id, {operations: data.update}) }); messaging.register('worker:ready', 'worker_id', function(msg, worker_id) { @@ -190,7 +192,10 @@ module.exports = function(context) { events.emit('slice:failure', msg); //if an error occurred while in recovery, fail the job as a whole if (inRecoveryMode) { + //sending to remove any pending worker allocations and to stop other workers messaging.send({message: 'slicer:recovery:failed', ex_id: ex_id}); + var errorMeta = ex_store.failureMeta('there were errors processing a slice in recovery mode', slicerAnalytics); + ex_store.setStatus(ex_id, 'failed', errorMeta); } } else { @@ -252,7 +257,8 @@ module.exports = function(context) { events.once('slice:failure', function() { logger.error(`slicer: ${ex_id} has encountered a 
processing_error`);
-            messaging.send({message: 'slicer:processing:error', ex_id: ex_id});
+            var errorMeta = ex_store.failureMeta(null, slicerAnalytics);
+            ex_store.setStatus(ex_id, 'failing', errorMeta);
        });
        events.on('slicer:job:finished', function() {
@@ -280,14 +286,24 @@
            .then(function(errCount) {
                var msg = {
                    message: 'slicer:job:finished',
-                    ex_id: job.jobConfig.ex_id,
-                    errorCount: errCount,
-                    slicer_stats: slicerAnalytics
+                    ex_id: job.jobConfig.ex_id
                };
-                if (errCount === 0 && hasRecovered) {
-                    msg.hasRecovered = true;
+                if (errCount > 0) {
+                    var message = `job: ${ex_id} had ${errCount} slice failures during processing`;
+                    logger.error(message);
+                    var errorMeta = ex_store.failureMeta(message, slicerAnalytics);
+                    ex_store.setStatus(ex_id, 'failed', errorMeta);
                }
+                else {
+                    logger.info(`job ${ex_id} has completed`);
+                    var metaData = {_slicer_stats: slicerAnalytics};
+                    if (hasRecovered) {
+                        metaData._has_errors = 'recovered';
+                    }
+                    ex_store.setStatus(ex_id, 'completed', metaData)
+                }
+
                messaging.send(msg);
            });
    }
@@ -299,9 +315,11 @@
        //if slicer has restart by itself, terminate job, need to wait for registration of process message functions before we can send this message
        if (process.env.__process_restart) {
+            //TODO need to restart slicer and make all recoverable
            var errMsg = `Slicer for ex_id: ${ex_id} runtime error led to a restart, terminating job with failed status, please use the recover api to return slicer to a consistent state`;
            logger.error(errMsg);
-            messaging.send({message: 'slicer:error:terminal', error: errMsg, ex_id: ex_id})
+            //ex_store may not be instantiated, need to rely on CM to mark job as failed
+            messaging.send({message: 'job:error:terminal', error: errMsg, ex_id: ex_id})
        }
        else {
            Promise.resolve(require('./storage/assets')(context))
@@ -315,11 +333,12 @@
                    slicer = _job.slicer;
                    queueLength = parseQueueLength(job.slicer, job.jobConfig);
                    analyticsData = statContainer(_job.jobs);
-                    return require('./storage/state')(context)
+                    return Promise.all([require('./storage/state')(context), require('./storage/jobs')(context, 'ex')])
                })
-                .then(function(store) {
-                    state_store = store;
-                    logger.trace(`state_store for slicer has been initialized`);
+                .spread(function(stateStore, exStore) {
+                    state_store = stateStore;
+                    ex_store = exStore;
+                    logger.trace(`state_store and job_store for the slicer have been initialized`);
                    messaging.initialize({port: job.jobConfig.slicer_port});
                    // We're ready for execution
@@ -329,7 +348,8 @@
                    logger.error(`Slicer: failure during initialization for job ${ex_id}`);
                    var errMsg = parseError(err);
                    logger.error(errMsg);
-                    messaging.send({message: 'slicer:error:terminal', error: errMsg, ex_id: ex_id})
+                    //ex_store may not be instantiated, need to rely on CM to mark job as failed
+                    messaging.send({message: 'job:error:terminal', error: errMsg, ex_id: ex_id})
                });
        }
@@ -350,11 +370,15 @@
        setTimeout(function() {
            if (!workerFound) {
                logger.error(`A worker has not connected to a slicer for ex: ${ex_id}, shutting down job`);
-                messaging.send({
-                    message: 'job:error:terminal',
-                    error: 'No workers have connected to slicer in allotted time',
-                    ex_id: ex_id
-                });
+                var errorMeta = ex_store.failureMeta('No workers have connected to slicer in allotted time', slicerAnalytics);
+                ex_store.setStatus(ex_id, 'failed', errorMeta)
+                    .then(function() {
+                        messaging.send({
+                            message: 
'job:error:terminal', + ex_id: ex_id, + markedJob: true + }); + }); } }, context.sysconfig.teraslice.slicer_timeout); @@ -364,8 +388,16 @@ module.exports = function(context) { function terminalError(err) { var errMsg = parseError(err); logger.error(errMsg); - //signal that creation of slicer failed in job allocation - messaging.send({message: 'slicer:error:terminal', error: errMsg, ex_id: process.env.ex_id}) + var errorMeta = ex_store.failureMeta(errMsg, slicerAnalytics); + + ex_store.setStatus(ex_id, 'failed', errorMeta) + .then(function() { + messaging.send({ + message: 'job:error:terminal', + ex_id: ex_id, + markedJob: true + }); + }); } @@ -443,8 +475,8 @@ module.exports = function(context) { }; //send message that job is in running state - logger.info(`slicer: ${job.jobConfig.ex_id} has initialized`); - messaging.send({message: 'slicer:initialized', ex_id: job.jobConfig.ex_id}); + logger.info(`slicer: ${job.jobConfig.ex_id} has initialized and is running`); + ex_store.setStatus(ex_id, 'running'); //provision the retry data to the slicerQueue if they exist if (retryData.length > 0) { @@ -532,6 +564,7 @@ module.exports = function(context) { function terminalShutdown(errEV) { logger.error(`Terminal error: shutting down job ${ex_id}`); + // ex_store may not be initialized, must rely on CM messaging.send({message: 'job:error:terminal', error: errEV.err, ex_id: ex_id}) } @@ -548,20 +581,12 @@ module.exports = function(context) { }); } - //set to error if the slice set of the recovering slicer cannot complete in 10 minutes - var retryError = setTimeout(function() { - if (Object.keys(retryState).length !== 0) { - messaging.send({message: 'slicer:recovery:failed', ex_id: ex_id}); - } - }, 600000); - var retrySlicer = setInterval(function() { //only get more data if slicerQueue is empty and all work has been reported if (slicerQueue.size() === 0 && Object.keys(retryState).length === 0 && !isGettingData) { isGettingData = true; //remove all intervals/timeouts - clearTimeout(retryError); clearInterval(retrySlicer); state_store.recoveryContext(retryData.ex_id, retryData.slicer_id) @@ -642,11 +667,17 @@ module.exports = function(context) { .catch(function(err) { //retries are handled internally by slicer isProcessing = false; - var errMsg = parseError(err); - logger.error(`slicer for ex ${ex_id} had an error, shutting down job`, errMsg); - if (isOnce) { - messaging.send({message: 'job:error:terminal', error: errMsg, ex_id: ex_id}); - } + var errMsg = `slicer for ex ${ex_id} had an error, shutting down job ${parseError(err)}`; + logger.error(errMsg); + var errorMeta = ex_store.failureMeta(errMsg, slicerAnalytics); + ex_store.setStatus(ex_id, 'failed', errorMeta) + .then(function() { + messaging.send({ + message: 'job:error:terminal', + ex_id: ex_id, + markedJob: true + }); + }); }) } } diff --git a/lib/cluster/storage/jobs.js b/lib/cluster/storage/jobs.js index 2927ff14829..50c02a2b8ec 100644 --- a/lib/cluster/storage/jobs.js +++ b/lib/cluster/storage/jobs.js @@ -6,6 +6,12 @@ var uuid = require('uuid'); var Promise = require('bluebird'); var _ = require('lodash'); +var parseError = require('../../utils/error_utils').parseError; + +var RUNNING_STATUS = ['pending', 'scheduling', 'initializing', 'running', 'failing', 'paused', 'moderator_paused']; +var TERMINAL_STATUS = ['completed', 'stopped', 'rejected', 'failed', 'terminated']; + +var VALID_STATUS = RUNNING_STATUS.concat(TERMINAL_STATUS); // Module to manager job states in Elasticsearch. 
// All functions in this module return promises that must be resolved to
@@ -62,6 +68,35 @@ module.exports = function(context, jobType) {
        return backend.update(record_id, update_spec);
    }
+    function failureMeta(errMsg, stats) {
+        var metaData = {_has_errors: 'true', _slicer_stats: stats};
+        if (errMsg) {
+            metaData._failureReason = errMsg;
+        }
+        return metaData
+    }
+
+    function setStatus(ex_id, status, metaData) {
+        if (isValidStatus(status)) {
+            var statusObj = {_status: status};
+            if (metaData) {
+                _.assign(statusObj, metaData)
+            }
+            return update(ex_id, statusObj)
+                .then(function(doc) {
+                    return ex_id;
+                })
+                .catch(function(err) {
+                    var errMsg = parseError(err);
+                    logger.error('was not able to setStatus, error:', errMsg, ex_id, statusObj);
+                    return Promise.reject(errMsg);
+                });
+        }
+        else {
+            return Promise.reject(`Invalid Job status: ${status}`);
+        }
+    }
+
    function remove(record_id) {
        return backend.remove(record_id);
    }
@@ -75,6 +110,18 @@
        return state_store.search(query, from, size, sort)
    }
+    function getTerminalStatuses() {
+        return TERMINAL_STATUS.slice();
+    }
+
+    function getRunningStatuses() {
+        return RUNNING_STATUS.slice();
+    }
+
+    function isValidStatus(status) {
+        return _.find(VALID_STATUS, valid => valid === status) !== undefined
+    }
+
    var api = {
        get: get,
        search: search,
@@ -84,7 +131,12 @@
        update: update,
        remove: remove,
        shutdown: shutdown,
-        getJobStateRecords: getJobStateRecords
+        getJobStateRecords: getJobStateRecords,
+        getTerminalStatuses: getTerminalStatuses,
+        getRunningStatuses: getRunningStatuses,
+        isValidStatus: isValidStatus,
+        setStatus: setStatus,
+        failureMeta: failureMeta
    };
    var jobArgs = [context, jobs_index];
diff --git a/lib/readers/elasticsearch_date_range/slicer.js b/lib/readers/elasticsearch_date_range/slicer.js
index 3403058bec9..b0091810f84 100644
--- a/lib/readers/elasticsearch_date_range/slicer.js
+++ b/lib/readers/elasticsearch_date_range/slicer.js
@@ -158,8 +158,9 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
                var operations = JSON.parse(process.env.job).operations;
                operations.shift();
                var updatedOpConfig = Object.assign({}, opConfig, update);
+                updatedOpConfig.start = updatedOpConfig.start.format(dateFormat);
+                updatedOpConfig.end = updatedOpConfig.end.format(dateFormat);
                operations.unshift(updatedOpConfig);
-                events.emit('slicer:job:update', {update: operations})
            }
        }
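
Note on the flow this patch establishes: the slicer now records execution status directly through the ex store (ex_store.failureMeta builds the error metadata, ex_store.setStatus writes it) and only leans on the cluster master when ex_store may not yet be instantiated; the markedJob flag on the job:error:terminal message tells the cluster master whether the failure has already been written. The sketch below condenses that handshake. The failAndNotify helper is a hypothetical wrapper used only for illustration (the patch inlines this pattern at each terminal-error site), while the handler condenses the updated registration in lib/cluster/services/cluster.js.

    // slicer side: mark the execution as failed ourselves, then tell the
    // cluster master the job is terminal but has already been marked.
    // NOTE: failAndNotify is a hypothetical helper, not part of this patch.
    function failAndNotify(ex_store, messaging, ex_id, errMsg, slicerAnalytics) {
        var errorMeta = ex_store.failureMeta(errMsg, slicerAnalytics);
        return ex_store.setStatus(ex_id, 'failed', errorMeta)
            .then(function() {
                messaging.send({message: 'job:error:terminal', ex_id: ex_id, markedJob: true});
            });
    }

    // cluster master side: always stop the job, but only emit the failure event
    // (which writes the 'failed' status) when the slicer could not mark it itself.
    messaging.register('job:error:terminal', function(data) {
        pendingWorkerRequests.remove(data.ex_id);
        messaging.send('cluster:job:stop', {ex_id: data.ex_id});
        if (!data.markedJob) {
            events.emit('cluster:job_failure', data);
        }
    });

In the paths where ex_store cannot be assumed to exist (a slicer process restart, an initialization failure, terminalShutdown), the patch keeps sending job:error:terminal without markedJob so the cluster master still records the failure.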