From 3ebad687b075c2a65ce952821522a0f05deeaadb Mon Sep 17 00:00:00 2001 From: Jared Noble Date: Tue, 6 Feb 2018 10:36:44 -0700 Subject: [PATCH 1/3] fixed slicer analytics message parsing resolves #657 --- lib/cluster/worker.js | 360 --------------------------------- lib/cluster/worker/executor.js | 1 + lib/cluster/worker/index.js | 1 + 3 files changed, 2 insertions(+), 360 deletions(-) delete mode 100644 lib/cluster/worker.js create mode 100644 lib/cluster/worker/executor.js create mode 100644 lib/cluster/worker/index.js diff --git a/lib/cluster/worker.js b/lib/cluster/worker.js deleted file mode 100644 index 0cd0d5d8533..00000000000 --- a/lib/cluster/worker.js +++ /dev/null @@ -1,360 +0,0 @@ -'use strict'; - -const Promise = require('bluebird'); -const _ = require('lodash'); -const parseError = require('error_parser'); -const messagingFn = require('./services/messaging'); - - -module.exports = function module(context) { - const events = context.apis.foundation.getSystemEvents(); - const cluster = context.cluster; - const config = context.sysconfig.teraslice; - const exId = process.env.ex_id; - const jobId = process.env.job_id; - const ID = `${context.sysconfig.teraslice.hostname}__${cluster.worker.id}`; - const executionRunner = require('./runners/execution')(context); - const recycleWorkerRandomFactor = _.random(75, 100); - - let isDone = true; - let isShuttingDown = false; - let analyticsStore; - let stateStore; - let queue; - let maxRetries; - let executionContext; - let recycle; - - // this will be used to keep track of the previously sent message just in case of a disconnect - let sentMessage = false; - - // this will store errors and the number of retries - const errorLog = {}; - - const logger = context.apis.foundation.makeLogger({ - ex_id: exId, - job_id: jobId, - module: 'worker', - worker_id: ID - }); - // need ipc channels open before job construction - const messaging = messagingFn(context, logger); - - // if worker cannot make client, job needs to shutdown, needs to be setup before executionRunner - events.on('client:initialization:error', terminalShutdown); - - const host = messaging.getHostUrl(); - - logger.info(`worker ${ID} is online, communicating with host: ${host}`); - - messaging.register({ - event: 'assets:loaded', - callback: ipcMessage => events.emit('execution:assets_loaded', ipcMessage) - }); - - // set up listener - messaging.register({ - event: 'slicer:slice:new', - callback: (msg) => { - isDone = false; - // getting a slice means the previous message was handled - sentMessage = false; - logger.info(`received slice: ${msg.payload.slice_id}`); - const sliceLogger = context.apis.foundation.makeLogger({ - ex_id: exId, - job_id: jobId, - module: 'slice', - worker_id: ID, - slice_id: msg.payload.slice_id - }); - - runSlice(msg.payload, sliceLogger); - } - }); - - messaging.register({ - event: 'slicer:slice:recorded', - callback: () => { - // the sent message has been processed, so set it to false; - logger.debug('slice has been marked as completed by slicer'); - sentMessage = false; - } - }); - - messaging.register({ - event: 'network:error', - callback: (err) => { - events.emit('network:error'); - logger.error('Error in worker socket: ', err); - } - }); - - messaging.register({ - event: 'network:disconnect', - callback: (event) => { - events.emit('network:disconnect'); - if (!isShuttingDown) { - logger.error(`worker ${ID} has disconnected from slicer ex_id ${exId}`, event); - } - } - }); - - messaging.register({ - event: 'network:connect', - callback: () => { - if 
(sentMessage) { - logger.warn('reconnecting to slicer, previous slice: ', sentMessage); - sentMessage.retry = true; - messaging.send({ to: 'execution_controller', message: 'worker:slice:complete', worker_id: ID, payload: sentMessage }); - } else { - logger.debug(`a worker has made the initial connection to slicer ex_id: ${exId}`); - messaging.send({ to: 'execution_controller', message: 'worker:ready', worker_id: ID, payload: { worker_id: ID } }); - } - } - }); - - const finalShutdown = _.once(shutdown); - - messaging.register({ event: 'worker:shutdown', callback: finalShutdown }); - - // to catch signal propagation, but cleanup through msg sent from master - function noOP() { - } - - messaging.register({ event: 'process:SIGTERM', callback: noOP }); - messaging.register({ event: 'process:SIGINT', callback: noOP }); - - Promise.resolve(require('./storage/assets')(context)) - .then(() => - // assets store is loaded so it can register under context.apis - executionRunner.initialize(events, logger) - ) - .then((_executionContext) => { - executionContext = _executionContext; - queue = _executionContext.queue; - maxRetries = _executionContext.config.max_retries; - recycle = recycleFn(); - return require('./storage/state')(context); - }) - .then((store) => { - stateStore = store; - return require('./storage/analytics')(context); - }) - .then((store) => { - analyticsStore = store; - logger.info(`connecting to host: ${host}`); - messaging.initialize(); - }) - .catch((err) => { - const errMsg = `worker: ${ID} could not instantiate for execution: ${exId}, error: ${parseError(err)}`; - logger.error(errMsg); - messaging.send({ - to: 'cluster_master', - message: 'execution:error:terminal', - ex_id: exId, - error: errMsg - }) - .then(() => logger.flush()) - .then(() => process.exit()) - .catch((flushErr) => { - const flushErrMsg = parseError(flushErr); - logger.error(flushErrMsg); - process.exit(); - }); - }); - - function isReady(res, slice, msg, specData, sliceLogger) { - // res may return null if no data was received - return stateStore.updateState(slice, 'completed', slice.index) - .then(() => { - events.emit('slice:success', slice); - sentMessage = { worker_id: ID, slice, analytics: specData }; - sliceLogger.info('completed slice: ', slice); - if (isShuttingDown) { - sentMessage.isShuttingDown = true; - } - - if (specData) { - if (executionContext.reporter) { - executionContext.reporter(context, executionContext.config, specData); - logMessage(sliceLogger, msg); - } else { - logMessage(sliceLogger, msg, specData); - return analyticsStore.log(executionContext, slice, specData) - .catch((err) => { - const errMsg = parseError(err); - logger.error(`Failure when storing analytics: ${errMsg}`); - }); - } - } - return true; - }) - .finally(() => { - events.emit('slice:finalize'); - isDone = true; - recycle(); - }); - } - - function runSlice(slice, sliceLogger) { - function sliceFailed(err) { - const errMsg = parseError(err); - sentMessage = { worker_id: ID, slice, error: errMsg }; - events.emit('slice:failure', sentMessage); - return stateStore.updateState(slice, 'error', errMsg) - .then(() => sliceLogger.error(`failed to process ${JSON.stringify(sentMessage)}, slice state has been marked as error`)) - .catch((updateError) => { - const updateErrorMsg = parseError(updateError); - sliceLogger.error(`An error has occurred while marking slice as failed: ${updateErrorMsg} , message: `, slice); - }); - } - - const msg = slice.request; - const sliceID = slice.slice_id; - - let finalQueue = queue; - let specData; - 
- if (executionContext.config.analytics) { - specData = { time: [], size: [], memory: [] }; - - finalQueue = queue.map(fn => fn.bind(null, specData)); - } - - Promise.reduce(finalQueue, (prev, fn) => Promise.resolve(prev) - .then(data => fn(data, sliceLogger, msg) - ), msg) - .then(res => isReady(res, slice, msg, specData, sliceLogger)) - .catch((err) => { - const errMsg = parseError(err); - sliceLogger.error(`An error has occurred: ${errMsg}, message: `, slice); - if (maxRetries) { - // checking if error has occurred before - if (errorLog[sliceID]) { - errorLog[sliceID] += 1; - - if (errorLog[sliceID] >= maxRetries) { - sliceLogger.error('Max retires has been reached for: ', slice); - - sliceFailed(err) - .finally(() => { - events.emit('slice:finalize'); - isDone = true; - recycle(); - }); - } else { - events.emit('slice:retry'); - runSlice(slice, sliceLogger); - } - } else { - events.emit('slice:retry'); - errorLog[sliceID] = 1; - runSlice(slice, sliceLogger); - } - } else { - // no retries, proceed to next slice - sliceFailed(err) - .finally(() => { - events.emit('slice:finalize'); - isDone = true; - recycle(); - }); - } - }); - } - - - function shutdown() { - let counter = config.shutdown_timeout; - isShuttingDown = true; - events.emit('worker:shutdown'); - const shutDownInterval = setInterval(() => { - logger.trace(`is done with current slice: ${isDone}, shutdown counter: ${counter}`); - if (isDone || counter <= 0) { - logger.info('shutting down'); - clearInterval(shutDownInterval); - const shutdownSequence = []; - if (analyticsStore !== undefined) { - shutdownSequence.push(analyticsStore.shutdown); - } - if (stateStore !== undefined) { - shutdownSequence.push(stateStore.shutdown); - } - - shutdownSequence.push(logger.flush); - - Promise.reduce(shutdownSequence, (prev, curr) => Promise.resolve(prev) - .then(() => curr()) - .catch((err) => { - const errMsg = parseError(err); - logger.error(`Error shutting down store ${prev.name}`, errMsg); - return curr(); - })) - .then(() => { - process.exit(); - }) - .catch((err) => { - const errMsg = parseError(err); - logger.error(errMsg); - // TODO: the log might never be called to stdout with process.exit here - process.exit(); - }); - } else { - if (counter % 6000 === 0) { - logger.info(`shutdown sequence initiated, but is still processing. 
Will force shutdown in ${counter / 1000} seconds`); - } - - counter -= 1000; - } - }, 1000); - } - - function terminalShutdown(errEV) { - logger.error(`Terminal error, shutting down execution ${exId}`); - events.emit('worker:shutdown'); - messaging.respond({ message: 'execution:error:terminal', error: errEV.error, ex_id: exId }); - } - - function recycleFn() { - let sliceCount = 0; - const recycleWorker = executionContext.config.recycle_worker; - return function () { - if (recycleWorker) { - sliceCount += 1; - // recycle worker between 75% and 100% of executionContext.config.recycle_worker - const recycleCount = Math.trunc((recycleWorkerRandomFactor / 100) * recycleWorker); - if (sliceCount >= recycleCount && !isShuttingDown) { - logger.info(`worker: ${ID} recycled after processing ${sliceCount} slices`); - // still need to signal that slice completed and not be enqueued - sentMessage.isShuttingDown = true; - messaging.send({ to: 'execution_controller', message: 'worker:slice:complete', worker_id: ID, payload: sentMessage }); - setTimeout(() => { - process.exit(42); - }, 100); - } else { - messaging.send({ to: 'execution_controller', message: 'worker:slice:complete', worker_id: ID, payload: sentMessage }); - } - } else { - messaging.send({ to: 'execution_controller', message: 'worker:slice:complete', worker_id: ID, payload: sentMessage }); - } - }; - } - - function logMessage(sliceLogger, msg, specData) { - const str = 'analytics for slice '; - let dataStr = ''; - - if (typeof msg === 'string') { - dataStr = `${msg}, `; - } else { - _.forOwn(msg, (value, key) => { - dataStr += `${key} : ${value} `; - }); - } - _.forOwn(specData, (value, key) => { - dataStr += `${key} : ${value} `; - }); - - sliceLogger.info(str + dataStr); - } -}; diff --git a/lib/cluster/worker/executor.js b/lib/cluster/worker/executor.js new file mode 100644 index 00000000000..a726efc43fc --- /dev/null +++ b/lib/cluster/worker/executor.js @@ -0,0 +1 @@ +'use strict'; \ No newline at end of file diff --git a/lib/cluster/worker/index.js b/lib/cluster/worker/index.js new file mode 100644 index 00000000000..a726efc43fc --- /dev/null +++ b/lib/cluster/worker/index.js @@ -0,0 +1 @@ +'use strict'; \ No newline at end of file From 3421faadac3c75d3af5827c4fef30d307178622a Mon Sep 17 00:00:00 2001 From: Jared Noble Date: Wed, 7 Feb 2018 09:44:48 -0700 Subject: [PATCH 2/3] second pass --- integration-tests/spec/wait.js | 67 ++++---- lib/cluster/worker/executor.js | 283 ++++++++++++++++++++++++++++++++- lib/cluster/worker/index.js | 71 ++++++++- lib/processors/noop.js | 13 +- spec/worker-spec.js | 73 +++++++++ 5 files changed, 457 insertions(+), 50 deletions(-) create mode 100644 spec/worker-spec.js diff --git a/integration-tests/spec/wait.js b/integration-tests/spec/wait.js index ba6c5962a38..e1044a0bb5a 100644 --- a/integration-tests/spec/wait.js +++ b/integration-tests/spec/wait.js @@ -1,8 +1,8 @@ 'use strict'; -var _ = require('lodash'); -var Promise = require('bluebird'); -var misc = require('./misc')(); +const _ = require('lodash'); +const Promise = require('bluebird'); +const misc = require('./misc')(); module.exports = function wait() { /* @@ -11,12 +11,8 @@ module.exports = function wait() { */ function forLength(func, value, iterations) { return forValue( - function() { - return func() - .then(function(result) { - return result.length - }) - }, + () => func() + .then(result => result.length), value, iterations); } @@ -26,44 +22,37 @@ module.exports = function wait() { * time for the value to match before the returned 
promise will * reject. */ - function forValue(func, value, iterations) { - if (! iterations) iterations = 100; - var counter = 0; + function forValue(func, value, _iterations) { + const iterations = _iterations || 100; + let counter = 0; - return new Promise(function(resolve, reject) { + return new Promise(((resolve, reject) => { function checkValue() { func() - .then(function(result) { - counter++; - + .then((result) => { + counter += 1; if (result === value) { return resolve(result); } - if (counter > iterations) { reject(`forValue didn't find target value after ${iterations} iterations.`); - } - else { + } else { setTimeout(checkValue, 500); } - }) + }); } checkValue(); - }); + })); } /* * Wait for 'node_count' nodes to be available. */ - function forNodes(node_count) { - return forLength(function() { - return misc.teraslice().cluster - .state() - .then(function(state) { - return _.keys(state) - }); - }, node_count); + function forNodes(nodeCount) { + return forLength(() => misc.teraslice().cluster + .state() + .then(state => _.keys(state)), nodeCount); } /* @@ -74,17 +63,15 @@ module.exports = function wait() { * 'joined' */ function forWorkersJoined(jobId, workerCount, iterations) { - return forValue(() => { - return misc.teraslice().cluster - .slicers() - .then((slicers) => { - const slicer = _.find(slicers, s => s.job_id === jobId); - if (slicer !== undefined) { - return slicer.workers_joined; - } - return 0; - }); - }, workerCount, iterations) + return forValue(() => misc.teraslice().cluster + .slicers() + .then((slicers) => { + const slicer = _.find(slicers, s => s.job_id === jobId); + if (slicer !== undefined) { + return slicer.workers_joined; + } + return 0; + }), workerCount, iterations) .catch((e) => { throw (new Error(`(forWorkersJoined) ${e}`)); }); diff --git a/lib/cluster/worker/executor.js b/lib/cluster/worker/executor.js index a726efc43fc..6d62a34691b 100644 --- a/lib/cluster/worker/executor.js +++ b/lib/cluster/worker/executor.js @@ -1 +1,282 @@ -'use strict'; \ No newline at end of file +'use strict'; + +const _ = require('lodash'); +const parseError = require('error_parser'); +const Promise = require('bluebird'); + +module.exports = function module(context, messaging, executionContext, stateStore, analyticsStore) { + const events = context.apis.foundation.getSystemEvents(); + const config = context.sysconfig.teraslice; + const cluster = context.cluster; + const exId = process.env.ex_id; + const jobId = process.env.job_id; + const ID = `${context.sysconfig.teraslice.hostname}__${cluster.worker.id}`; + const queue = executionContext.queue; + + const recycle = _recycleFn(); + const host = messaging.getHostUrl(); + const finalShutdown = _.once(shutdown); + + const logger = context.apis.foundation.makeLogger({ + ex_id: exId, + job_id: jobId, + module: 'worker_executor', + worker_id: ID + }); + + let isShuttingDown = false; + let isDone = true; + + // this will be used to keep track of the previously sent message just in case of a disconnect + let sentMessage = false; + + messaging.register({ event: 'worker:shutdown', callback: finalShutdown }); + + messaging.register({ + event: 'assets:loaded', + callback: ipcMessage => events.emit('execution:assets_loaded', ipcMessage) + }); + + messaging.register({ + event: 'slicer:slice:new', + callback: (msg) => { + isDone = false; + // getting a slice means the previous message was handled + sentMessage = false; + logger.info(`received slice: ${msg.payload.slice_id}`); + const sliceLogger = context.apis.foundation.makeLogger({ + 
+        ex_id: exId,
+        job_id: jobId,
+        module: 'worker_executor',
+        worker_id: ID
+    });
+
+    let isShuttingDown = false;
+    let isDone = true;
+
+    // this will be used to keep track of the previously sent message just in case of a disconnect
+    let sentMessage = false;
+
+    messaging.register({ event: 'worker:shutdown', callback: finalShutdown });
+
+    messaging.register({
+        event: 'assets:loaded',
+        callback: ipcMessage => events.emit('execution:assets_loaded', ipcMessage)
+    });
+
+    messaging.register({
+        event: 'slicer:slice:new',
+        callback: (msg) => {
+            isDone = false;
+            // getting a slice means the previous message was handled
+            sentMessage = false;
+            logger.info(`received slice: ${msg.payload.slice_id}`);
+            const sliceLogger = context.apis.foundation.makeLogger({
+                ex_id: exId,
+                job_id: jobId,
+                module: 'slice',
+                worker_id: ID,
+                slice_id: msg.payload.slice_id
+            });
+
+            _executeSlice(msg.payload, sliceLogger);
+        }
+    });
+
+    messaging.register({
+        event: 'slicer:slice:recorded',
+        callback: () => {
+            // the sent message has been processed, so set it to false;
+            logger.debug('slice has been marked as completed by slicer');
+            sentMessage = false;
+        }
+    });
+
+    messaging.register({
+        event: 'network:error',
+        callback: (err) => {
+            events.emit('network:error');
+            logger.error('Error in worker socket: ', err);
+        }
+    });
+
+    messaging.register({
+        event: 'network:disconnect',
+        callback: (event) => {
+            events.emit('network:disconnect');
+            if (!isShuttingDown) {
+                logger.error(`worker ${ID} has disconnected from slicer ex_id ${exId}`, event);
+            }
+        }
+    });
+
+    messaging.register({
+        event: 'network:connect',
+        callback: () => {
+            if (sentMessage) {
+                logger.warn('reconnecting to slicer, previous slice: ', sentMessage);
+                sentMessage.retry = true;
+                messaging.send({ to: 'execution_controller', message: 'worker:slice:complete', worker_id: ID, payload: sentMessage });
+            } else {
+                logger.debug(`a worker has made the initial connection to slicer ex_id: ${exId}`);
+                messaging.send({ to: 'execution_controller', message: 'worker:ready', worker_id: ID, payload: { worker_id: ID } });
+            }
+        }
+    });
+    // TODO rename initialize
+    messaging.initialize();
+    logger.info(`worker ${ID} is online, communicating with host: ${host}`);
+
+    function _sliceCompleted(sliceResults, slice, msg, specData, sliceLogger) {
+        // resultResponse may need to be checked in later functionality
+        return stateStore.updateState(slice, 'completed', slice.index)
+            .then(() => {
+                events.emit('slice:success', slice);
+                sentMessage = { worker_id: ID, slice, analytics: specData };
+                sliceLogger.info('completed slice: ', slice);
+                if (isShuttingDown) {
+                    sentMessage.isShuttingDown = true;
+                }
+
+                if (specData) {
+                    if (executionContext.reporter) {
+                        executionContext.reporter(context, executionContext.config, specData);
+                        _logMessage(sliceLogger, msg);
+                    } else {
+                        _logMessage(sliceLogger, msg, specData);
+                        return analyticsStore.log(executionContext, slice, specData)
+                            .catch((err) => {
+                                const errMsg = parseError(err);
+                                logger.error(`Failure when storing analytics: ${errMsg}`);
+                            });
+                    }
+                }
+                return true;
+            })
+            .finally(() => {
+                events.emit('slice:finalize');
+                isDone = true;
+                recycle();
+            });
+    }
+
+    function _retrySlice(slice, sliceLogger) {
+        const maxRetries = executionContext.config.max_retries;
+        let retryCount = 0;
+
+        return (err) => {
+            const errMsg = parseError(err);
+            sliceLogger.error(`An error has occurred: ${errMsg}, message: `, slice);
+            if (maxRetries) {
+                retryCount += 1;
+                if (retryCount >= maxRetries) {
+                    sliceLogger.error('Max retries have been reached for: ', slice);
+                    sliceFailed(err, slice, sliceLogger)
+                        .finally(() => {
+                            events.emit('slice:finalize', slice);
+                            isDone = true;
+                            recycle();
+                        });
+                } else {
+                    events.emit('slice:retry', slice);
+                    _executeSlice(slice, sliceLogger);
+                }
+            } else {
+                // no retries, proceed to next slice
+                sliceFailed(err, slice, sliceLogger)
+                    .finally(() => {
+                        events.emit('slice:finalize', slice);
+                        isDone = true;
+                        recycle();
+                    });
+            }
+        };
+    }
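+
+    // NOTE: _retrySlice is a factory -- callers must invoke it with (slice, sliceLogger)
+    // so the closure it returns can be handed to .catch() and receive only the error.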
+    function sliceFailed(err, slice, sliceLogger) {
+        const errMsg = parseError(err);
+        sentMessage = { worker_id: ID, slice, error: errMsg };
+        events.emit('slice:failure', slice);
+        return stateStore.updateState(slice, 'error', errMsg)
+            .then(() => sliceLogger.error(`failed to process ${JSON.stringify(sentMessage)}, slice state has been marked as error`))
+            .catch((updateError) => {
+                const updateErrorMsg = parseError(updateError);
+                sliceLogger.error(`An error has occurred while marking slice as failed: ${updateErrorMsg} , message: `, slice);
+            });
+    }
+
+
+    function _executeSlice(slice, sliceLogger) {
+        const msg = slice.request;
+        let finalQueue = queue;
+        let specData;
+
+        if (executionContext.config.analytics) {
+            specData = { time: [], size: [], memory: [] };
+
+            finalQueue = queue.map(fn => fn.bind(null, specData));
+        }
+
+        Promise.reduce(finalQueue, (prev, fn) => Promise.resolve(prev)
+            .then(data => fn(data, sliceLogger, msg)
+            ), msg)
+            .then(sliceResults => _sliceCompleted(sliceResults, slice, msg, specData, sliceLogger))
+            .catch(_retrySlice(slice, sliceLogger));
+    }
+
+    // TODO consider rename
+    function _logMessage(sliceLogger, msg, specData) {
+        const str = 'analytics for slice ';
+        let dataStr = '';
+
+        if (typeof msg === 'string') {
+            dataStr = `${msg}, `;
+        } else {
+            _.forOwn(msg, (value, key) => {
+                dataStr += `${key} : ${value} `;
+            });
+        }
+        _.forOwn(specData, (value, key) => {
+            dataStr += `${key} : ${value} `;
+        });
+
+        sliceLogger.info(str + dataStr);
+    }
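+
+    // shutdown() polls once per second: `counter` starts at config.shutdown_timeout (ms)
+    // and the worker exits once the in-flight slice finishes or the countdown reaches zero.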
+    function shutdown() {
+        let counter = config.shutdown_timeout;
+        isShuttingDown = true;
+        events.emit('worker:shutdown');
+        const shutDownInterval = setInterval(() => {
+            logger.trace(`is done with current slice: ${isDone}, shutdown counter: ${counter}`);
+            if (isDone || counter <= 0) {
+                logger.info('shutting down');
+                clearInterval(shutDownInterval);
+                const shutdownSequence = [];
+                shutdownSequence.push(analyticsStore.shutdown);
+                shutdownSequence.push(stateStore.shutdown);
+                shutdownSequence.push(logger.flush);
+
+                Promise.reduce(shutdownSequence, (prev, curr) => Promise.resolve(prev)
+                    .then(() => curr())
+                    .catch((err) => {
+                        const errMsg = parseError(err);
+                        logger.error(`Error shutting down store ${prev.name}`, errMsg);
+                        return curr();
+                    }))
+                    .then(() => {
+                        process.exit();
+                    })
+                    .catch((err) => {
+                        const errMsg = parseError(err);
+                        logger.error(errMsg);
+                        // TODO: the log might never be called to stdout with process.exit here
+                        process.exit();
+                    });
+            } else {
+                if (counter % 6000 === 0) {
+                    logger.info(`shutdown sequence initiated, but is still processing. Will force shutdown in ${counter / 1000} seconds`);
+                }
+
+                counter -= 1000;
+            }
+        }, 1000);
+    }
+    // TODO separate message sending from recycle
+    function _recycleFn() {
+        const recycleWorker = executionContext.config.recycle_worker;
+        const recycleWorkerRandomFactor = _.random(75, 100);
+        let sliceCount = 0;
+
+        return function () {
+            if (recycleWorker) {
+                sliceCount += 1;
+                // recycle worker between 75% and 100% of executionContext.config.recycle_worker
+                const recycleCount = Math.trunc((recycleWorkerRandomFactor / 100) * recycleWorker);
+                if (sliceCount >= recycleCount && !isShuttingDown) {
+                    logger.info(`worker: ${ID} recycled after processing ${sliceCount} slices`);
+                    // still need to signal that slice completed and not be enqueued
+                    sentMessage.isShuttingDown = true;
+                    setTimeout(() => {
+                        process.exit(42);
+                    }, 100);
+                }
+
+                messaging.send({ to: 'execution_controller', message: 'worker:slice:complete', worker_id: ID, payload: sentMessage });
+            }
+        };
+    }
+};
diff --git a/lib/cluster/worker/index.js b/lib/cluster/worker/index.js
index a726efc43fc..423af75a0a7 100644
--- a/lib/cluster/worker/index.js
+++ b/lib/cluster/worker/index.js
@@ -1 +1,70 @@
-'use strict';
\ No newline at end of file
+'use strict';
+
+const Promise = require('bluebird');
+const parseError = require('error_parser');
+const messagingFn = require('../services/messaging');
+
+module.exports = function module(context) {
+    const events = context.apis.foundation.getSystemEvents();
+    const cluster = context.cluster;
+    const exId = process.env.ex_id;
+    const jobId = process.env.job_id;
+    const ID = `${context.sysconfig.teraslice.hostname}__${cluster.worker.id}`;
+    const executionRunner = require('../runners/execution')(context);
+
+    const logger = context.apis.foundation.makeLogger({
+        ex_id: exId,
+        job_id: jobId,
+        module: 'worker_process',
+        worker_id: ID
+    });
+    // need ipc channels open before job construction
+    const messaging = messagingFn(context, logger);
+
+    // if worker cannot make client, job needs to shutdown, needs to be setup before executionRunner
+    events.on('client:initialization:error', terminalShutdown);
+
+    messaging.register({ event: 'process:SIGTERM', callback: () => {} });
+    messaging.register({ event: 'process:SIGINT', callback: () => {} });
+
+
+    function initializeWorker() {
+        // assets store is loaded so it can register under context.apis
+        return Promise.resolve(require('../storage/assets')(context))
+            .then(() => Promise.all([
+                executionRunner.initialize(events, logger),
+                require('../storage/state')(context),
+                require('../storage/analytics')(context)
+            ]))
+            .spread((executionContext, stateStore, analyticsStore) => require('./executor')(context, messaging, executionContext, stateStore, analyticsStore))
+            .catch((err) => {
+                const errMsg = `worker: ${ID} could not instantiate for execution: ${exId}, error: ${parseError(err)}`;
+                logger.error(errMsg);
+
+                // TODO rename message sent to be more semantically correct
+                messaging.send({
+                    to: 'cluster_master',
+                    message: 'execution:error:terminal',
+                    ex_id: exId,
+                    error: errMsg
+                })
+                    .then(() => logger.flush())
+                    .then(() => process.exit())
+                    .catch((flushErr) => {
+                        const flushErrMsg = parseError(flushErr);
+                        logger.error(flushErrMsg);
+                        process.exit();
+                    });
+            });
+    }
+
+    // TODO stick store shutdown logic out here
+
+    function terminalShutdown(errEV) {
+        logger.error(`Terminal error, shutting down execution ${exId}`);
+        events.emit('worker:shutdown');
+        messaging.respond({ message: 'execution:error:terminal', error: errEV.error, ex_id:
exId }); + } + + initializeWorker(); +}; diff --git a/lib/processors/noop.js b/lib/processors/noop.js index cf0ab2e8177..62678c1b5af 100644 --- a/lib/processors/noop.js +++ b/lib/processors/noop.js @@ -2,18 +2,15 @@ function newProcessor(context, opConfig, executionConfig) { - return function(data) { - return data; - }; + return data => data; } function schema() { - return { - }; -}; + return {}; +} module.exports = { - newProcessor: newProcessor, - schema: schema + newProcessor, + schema }; diff --git a/spec/worker-spec.js b/spec/worker-spec.js new file mode 100644 index 00000000000..81d0c495478 --- /dev/null +++ b/spec/worker-spec.js @@ -0,0 +1,73 @@ +'use strict'; + +const events = require('events'); +const eventEmitter = new events.EventEmitter(); + +fdescribe('Worker', () => { + const workerExecutorModule = require('../lib/cluster/worker/executor'); + let sentMsg; + let loggerErrMsg; + let debugMsg; + let logInfo; + let warnMsg; + + const logger = { + error(err) { + loggerErrMsg = err; + }, + info(info) { + logInfo = info; + }, + warn(msg) { + warnMsg = msg; + }, + trace() {}, + debug(msg) { + debugMsg = msg; + }, + flush() {} + }; + + const context = { + sysconfig: { + teraslice: { + hostname: 'testHostName' + } + }, + apis: { + foundation: { + makeLogger: () => logger, + getSystemEvents: () => eventEmitter + } + }, + cluster: { + worker: { + id: 'someID' + } + } + }; + const messaging = { + register: () => {}, + getHostUrl: () => 'someURL', + send: _sentMsg => sentMsg = _sentMsg, + initialize: () => {} + }; + const executionContext = { + queue: [], + config: { + max_retries: 3, + analytics: true, + recycle_worker: false + } + }; + const stateStore = {}; + const analyticsStore = {}; + + function getWorker() { + return workerExecutorModule(context, messaging, executionContext, stateStore, analyticsStore); + } + + it('can load', () => { + expect(() => getWorker()).not.toThrowError(); + }); +}); From 41382e9fa2227be932f6408b8fc866f1717ba435 Mon Sep 17 00:00:00 2001 From: Jared Noble Date: Mon, 12 Feb 2018 09:35:33 -0700 Subject: [PATCH 3/3] final worker changes --- lib/cluster/cluster_master.js | 2 +- lib/cluster/execution_controller/engine.js | 33 +- lib/cluster/execution_controller/index.js | 33 +- lib/cluster/node_master.js | 2 +- lib/cluster/services/messaging.js | 4 +- lib/cluster/worker/executor.js | 254 +++++++------ lib/cluster/worker/index.js | 32 +- spec/execution_controller/engine-spec.js | 13 +- spec/worker-spec.js | 406 ++++++++++++++++++++- 9 files changed, 598 insertions(+), 181 deletions(-) diff --git a/lib/cluster/cluster_master.js b/lib/cluster/cluster_master.js index fbda12224e8..4034e9a536c 100644 --- a/lib/cluster/cluster_master.js +++ b/lib/cluster/cluster_master.js @@ -86,7 +86,7 @@ module.exports = function (context) { } require('./services/execution.js')(context) .then((clusterService) => { - messaging.initialize({ server }); + messaging.listen({ server }); logger.trace('cluster_service has instantiated'); context.services.execution = clusterService; return require('./services/jobs')(context); diff --git a/lib/cluster/execution_controller/engine.js b/lib/cluster/execution_controller/engine.js index 76daa93723e..42afed8f29c 100644 --- a/lib/cluster/execution_controller/engine.js +++ b/lib/cluster/execution_controller/engine.js @@ -121,10 +121,6 @@ module.exports = function module(context, messaging, exStore, stateStore, execut } }); - // events can be fired from anything that instantiates a client, such as stores and slicers - // needs to be setup before 
executionRunner - messaging.register({ event: 'worker:shutdown', callback: _executionShutdown }); - messaging.register({ event: 'assets:loaded', callback: ipcMessage => events.emit('execution:assets_loaded', ipcMessage) @@ -278,13 +274,12 @@ module.exports = function module(context, messaging, exStore, stateStore, execut // send message that execution is in running state logger.info(`execution: ${exId} has initialized and is listening on port ${executionConfig.slicer_port}`); exStore.setStatus(exId, 'running'); - if (!executionAnalytics) executionAnalytics = require('./execution_analytics')(context, messaging); if (!slicerAnalytics) slicerAnalytics = require('./slice_analytics')(context, executionContext); _setQueueLength(executionContext); if (!recovery) recovery = require('./recovery')(context, messaging, executionAnalytics, exStore, stateStore, executionContext); - messaging.initialize({ port: executionContext.config.slicer_port }); + messaging.listen({ port: executionContext.config.slicer_port }); // start the engine if (engineCanRun) { @@ -385,11 +380,6 @@ module.exports = function module(context, messaging, exStore, stateStore, execut engine = setInterval(engineFn, 1); } - function _shutdown() { - engineCanRun = false; - clearInterval(engine); - } - function _adjustSlicerQueueLength() { if (dynamicQueueLength && messaging.getClientCounts() > queueLength) { queueLength = messaging.getClientCounts(); @@ -430,7 +420,7 @@ module.exports = function module(context, messaging, exStore, stateStore, execut slicerAnalytics.analyzeStats(); } - logger.info(`execution ${executionConfig.name} has finished in ${time} seconds`); + logger.info(`execution ${exId} has finished in ${time} seconds`); } function _allSlicesProcessed() { @@ -489,25 +479,19 @@ module.exports = function module(context, messaging, exStore, stateStore, execut _watchDog(checkFn, timeout, errMsg, logMsg); } - function _executionShutdown() { + function shutdown() { logger.info(`slicer for execution: ${exId} has received a shutdown notice`); isShuttingDown = true; + engineCanRun = false; + clearInterval(engine); events.emit('execution:stop'); - function stateStoreShutdown() { - if (stateStore) { - return stateStore.shutdown(); - } - return true; - } - - Promise.all([_shutdown(), executionAnalytics.shutdown(), stateStoreShutdown()]) + return Promise.resolve() + .then(executionAnalytics.shutdown) .then(() => logger.flush()) - .then(() => events.emit('execution:shutdown')) .catch((err) => { const errMsg = parseError(err); logger.error(errMsg); - events.emit('execution:shutdown'); }); } @@ -573,8 +557,6 @@ module.exports = function module(context, messaging, exStore, stateStore, execut _adjustSlicerQueueLength, _pause, _resume, - _shutdown, - _executionShutdown, _engineSetup, _executionRecovery, _terminalError, @@ -587,6 +569,7 @@ module.exports = function module(context, messaging, exStore, stateStore, execut return { initialize, + shutdown, __test_context: testContext }; }; diff --git a/lib/cluster/execution_controller/index.js b/lib/cluster/execution_controller/index.js index 71a4f971d09..d12617fbfab 100644 --- a/lib/cluster/execution_controller/index.js +++ b/lib/cluster/execution_controller/index.js @@ -13,10 +13,15 @@ module.exports = function module(contextConfig) { const events = context.apis.foundation.getSystemEvents(); const logger = context.apis.foundation.makeLogger({ module: 'execution_controller', ex_id: exId, job_id: jobId }); const messaging = messageModule(context, logger); + let exStore; + let stateStore; + let 
engine; // to catch signal propagation, but cleanup through msg sent from master messaging.register({ event: 'process:SIGTERM', callback: () => {} }); messaging.register({ event: 'process:SIGINT', callback: () => {} }); + messaging.register({ event: 'worker:shutdown', callback: executionShutdown }); + events.on('client:initialization:error', terminalShutdown); // emitted after final cleanup of execution is complete @@ -28,7 +33,10 @@ module.exports = function module(contextConfig) { function initializeExecutionController() { Promise.resolve(executionInit()) .catch(terminalShutdown) - .then(engine => engine.initialize()) + .then((_engine) => { + engine = _engine; + engine.initialize(); + }) .catch(terminalShutdown); } @@ -41,15 +49,18 @@ module.exports = function module(contextConfig) { // assets store is loaded first so it can register apis before executionRunner is called return Promise.resolve(require('../storage/assets')(context)) .then(() => Promise.all([executionRunner.initialize(events, logger), require('../storage/state')(context), require('../storage/execution')(context)])) - .spread((executionContext, stateStore, exStore) => { + .spread((executionContext, _stateStore, _exStore) => { logger.trace('stateStore and jobStore for slicer has been initialized'); + stateStore = _stateStore; + exStore = _exStore; return require('./engine')(context, messaging, exStore, stateStore, executionContext); }); } function terminalShutdown(errObj) { - logger.error(`Terminal error: shutting down execution ${exId}`); const errMsg = errObj.error || parseError(errObj); + logger.error(`Terminal error: shutting down execution ${exId}, error: ${errMsg}`); + // exStore may not be initialized, must rely on CM messaging.send({ to: 'cluster_master', @@ -58,4 +69,20 @@ module.exports = function module(contextConfig) { ex_id: exId }); } + + function executionShutdown() { + const shutdownSequence = []; + if (stateStore) shutdownSequence.push(stateStore.shutdown()); + if (exStore) shutdownSequence.push(exStore.shutdown()); + if (engine) shutdownSequence.push(engine.shutdown()); + + Promise.all(shutdownSequence) + .then(logger.flush) + .catch((err) => { + const errMsg = parseError(err); + logger.error(`Error while attempting to shutdown execution_controller ex_id: ${exId}, error: ${errMsg}`); + return logger.flush(); + }) + .finally(process.exit); + } }; diff --git a/lib/cluster/node_master.js b/lib/cluster/node_master.js index dd9a7911bbc..12b7f550c9e 100644 --- a/lib/cluster/node_master.js +++ b/lib/cluster/node_master.js @@ -355,7 +355,7 @@ module.exports = function module(context) { return state; } - messaging.initialize(); + messaging.listen(); if (context.sysconfig.teraslice.master) { const assetsPort = systemPorts.getPort(true); diff --git a/lib/cluster/services/messaging.js b/lib/cluster/services/messaging.js index 809ae30c0d4..6c57134e77f 100644 --- a/lib/cluster/services/messaging.js +++ b/lib/cluster/services/messaging.js @@ -283,7 +283,7 @@ module.exports = function messaging(context, logger, childHookFn) { return destinationType; } - function initialize(obj) { + function listen(obj) { let port; let server; @@ -502,7 +502,7 @@ module.exports = function messaging(context, logger, childHookFn) { return { register, - initialize, + listen, getHostUrl, getClientCounts, send, diff --git a/lib/cluster/worker/executor.js b/lib/cluster/worker/executor.js index 6d62a34691b..60401e92864 100644 --- a/lib/cluster/worker/executor.js +++ b/lib/cluster/worker/executor.js @@ -8,14 +8,13 @@ module.exports = function 
module(context, messaging, executionContext, stateStor const events = context.apis.foundation.getSystemEvents(); const config = context.sysconfig.teraslice; const cluster = context.cluster; - const exId = process.env.ex_id; - const jobId = process.env.job_id; + let exId = process.env.ex_id; + let jobId = process.env.job_id; const ID = `${context.sysconfig.teraslice.hostname}__${cluster.worker.id}`; const queue = executionContext.queue; const recycle = _recycleFn(); const host = messaging.getHostUrl(); - const finalShutdown = _.once(shutdown); const logger = context.apis.foundation.makeLogger({ ex_id: exId, @@ -30,7 +29,6 @@ module.exports = function module(context, messaging, executionContext, stateStor // this will be used to keep track of the previously sent message just in case of a disconnect let sentMessage = false; - messaging.register({ event: 'worker:shutdown', callback: finalShutdown }); messaging.register({ event: 'assets:loaded', @@ -44,15 +42,8 @@ module.exports = function module(context, messaging, executionContext, stateStor // getting a slice means the previous message was handled sentMessage = false; logger.info(`received slice: ${msg.payload.slice_id}`); - const sliceLogger = context.apis.foundation.makeLogger({ - ex_id: exId, - job_id: jobId, - module: 'slice', - worker_id: ID, - slice_id: msg.payload.slice_id - }); - _executeSlice(msg.payload, sliceLogger); + _processSlice(msg.payload); } }); @@ -68,8 +59,9 @@ module.exports = function module(context, messaging, executionContext, stateStor messaging.register({ event: 'network:error', callback: (err) => { + const errMsg = parseError(err); events.emit('network:error'); - logger.error('Error in worker socket: ', err); + logger.error(`Error in worker socket, error: ${errMsg}`); } }); @@ -89,84 +81,51 @@ module.exports = function module(context, messaging, executionContext, stateStor if (sentMessage) { logger.warn('reconnecting to slicer, previous slice: ', sentMessage); sentMessage.retry = true; - messaging.send({ to: 'execution_controller', message: 'worker:slice:complete', worker_id: ID, payload: sentMessage }); + messaging.send({ + to: 'execution_controller', + message: 'worker:slice:complete', + worker_id: ID, + payload: sentMessage + }); } else { logger.debug(`a worker has made the initial connection to slicer ex_id: ${exId}`); - messaging.send({ to: 'execution_controller', message: 'worker:ready', worker_id: ID, payload: { worker_id: ID } }); + messaging.send({ + to: 'execution_controller', + message: 'worker:ready', + worker_id: ID, + payload: { worker_id: ID } + }); } } }); - // TODO rename initialize - messaging.initialize(); + + messaging.listen(); logger.info(`worker ${ID} is online, communicating with host: ${host}`); - function _sliceCompleted(sliceResults, slice, msg, specData, sliceLogger) { + function _sliceCompleted(sliceResults, sliceMetaData, slice, specData, sliceLogger) { // resultResponse may need to be checked in later functionality - return stateStore.updateState(slice, 'completed', slice.index) + return stateStore.updateState(sliceMetaData, 'completed') .then(() => { - events.emit('slice:success', slice); - sentMessage = { worker_id: ID, slice, analytics: specData }; - sliceLogger.info('completed slice: ', slice); + events.emit('slice:success', sliceMetaData); + sentMessage = { worker_id: ID, slice: sliceMetaData, analytics: specData }; + sliceLogger.info('completed slice: ', sliceMetaData); if (isShuttingDown) { sentMessage.isShuttingDown = true; } - if (specData) { - if (executionContext.reporter) 
{
-                        executionContext.reporter(context, executionContext.config, specData);
-                        _logMessage(sliceLogger, msg);
-                    } else {
-                        _logMessage(sliceLogger, msg, specData);
-                        return analyticsStore.log(executionContext, slice, specData)
-                            .catch((err) => {
-                                const errMsg = parseError(err);
-                                logger.error(`Failure when storing analytics: ${errMsg}`);
-                            });
-                    }
+                    _logSliceStats(sliceLogger, slice, specData);
+                    return analyticsStore.log(executionContext, sliceMetaData, specData)
+                        .catch((err) => {
+                            const errMsg = parseError(err);
+                            logger.error(`Failure when storing analytics: ${errMsg}`);
+                        });
                 }
                 return true;
             })
-            .finally(() => {
-                events.emit('slice:finalize');
-                isDone = true;
-                recycle();
-            });
-    }
-
-    function _retrySlice(slice, sliceLogger) {
-        const maxRetries = executionContext.config.max_retries;
-        let retryCount = 0;
-
-        return (err) => {
-            const errMsg = parseError(err);
-            sliceLogger.error(`An error has occurred: ${errMsg}, message: `, slice);
-            if (maxRetries) {
-                retryCount += 1;
-                if (retryCount >= maxRetries) {
-                    sliceLogger.error('Max retries have been reached for: ', slice);
-                    sliceFailed(err, slice, sliceLogger)
-                        .finally(() => {
-                            events.emit('slice:finalize', slice);
-                            isDone = true;
-                            recycle();
-                        });
-                } else {
-                    events.emit('slice:retry', slice);
-                    _executeSlice(slice, sliceLogger);
-                }
-            } else {
-                // no retries, proceed to next slice
-                sliceFailed(err, slice, sliceLogger)
-                    .finally(() => {
-                        events.emit('slice:finalize', slice);
-                        isDone = true;
-                        recycle();
-                    });
-            }
-        };
-    }
+            .finally(() => _sliceCleanup(slice));
+    }
 
-    function sliceFailed(err, slice, sliceLogger) {
+    function _sliceFailed(err, slice, sliceLogger) {
         const errMsg = parseError(err);
         sentMessage = { worker_id: ID, slice, error: errMsg };
         events.emit('slice:failure', slice);
         return stateStore.updateState(slice, 'error', errMsg)
             .then(() => sliceLogger.error(`failed to process ${JSON.stringify(sentMessage)}, slice state has been marked as error`))
             .catch((updateError) => {
                 const updateErrorMsg = parseError(updateError);
                 sliceLogger.error(`An error has occurred while marking slice as failed: ${updateErrorMsg} , message: `, slice);
             });
     }
 
+    function _processSlice(slice) {
+        const sliceLogger = context.apis.foundation.makeLogger({
+            ex_id: exId,
+            job_id: jobId,
+            module: 'slice',
+            worker_id: ID,
+            slice_id: slice.slice_id
+        });
 
-    function _executeSlice(slice, sliceLogger) {
-        const msg = slice.request;
-        let finalQueue = queue;
+        let operations = queue;
         let specData;
 
         if (executionContext.config.analytics) {
             specData = { time: [], size: [], memory: [] };
-
-            finalQueue = queue.map(fn => fn.bind(null, specData));
+            operations = queue.map(fn => fn.bind(null, specData));
         }
+        const retrySliceFn = _retrySliceModule(slice, operations, sliceLogger, specData);
+
+        _executeSlice(slice, operations, sliceLogger, specData)
+            .catch(retrySliceFn);
+    }
 
-        Promise.reduce(finalQueue, (prev, fn) => Promise.resolve(prev)
+    function _executeSlice(slice, operations, sliceLogger, specData) {
+        const msg = slice.request;
+        return Promise.reduce(operations, (prev, fn) => Promise.resolve(prev)
             .then(data => fn(data, sliceLogger, msg)
             ), msg)
-            .then(sliceResults => _sliceCompleted(sliceResults, slice, msg, specData, sliceLogger))
-            .catch(_retrySlice(slice, sliceLogger));
+            .then((sliceResults) => {
+                _sliceCompleted(sliceResults, slice, msg, specData, sliceLogger);
+            });
+    }
+
+    function _retrySliceModule(slice, operations, sliceLogger, specData) {
+        const maxRetries = executionContext.config.max_retries;
+        let retryCount = 0;
+
+        return function retrySlice(err) {
+            const errMsg = parseError(err);
+            sliceLogger.error(`An error has occurred: ${errMsg}, message: `, slice);
+            if (maxRetries) {
+                retryCount += 1;
+                if (retryCount >= maxRetries) {
+                    sliceLogger.error('Max retries have been reached for: ', slice);
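+                    // terminal failure: mark the slice as errored in the state store,
+                    // then finalize and report the slice as complete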
+                    _sliceFailed(err, slice, sliceLogger)
+                        .finally(() => _sliceCleanup(slice));
+                } else {
+                    events.emit('slice:retry', slice);
+                    _executeSlice(slice, operations, sliceLogger, specData)
+                        .catch(retrySlice);
+                }
+            } else {
+                // no retries, proceed to next slice
+                _sliceFailed(err, slice, sliceLogger)
+                    .finally(() => _sliceCleanup(slice));
+            }
+        };
     }
 
-    // TODO consider rename
-    function _logMessage(sliceLogger, msg, specData) {
+    function _logSliceStats(sliceLogger, msg, specData) {
         const str = 'analytics for slice ';
         let dataStr = '';
 
         if (typeof msg === 'string') {
             dataStr = `${msg}, `;
         } else {
             _.forOwn(msg, (value, key) => {
                 dataStr += `${key} : ${value} `;
             });
         }
         _.forOwn(specData, (value, key) => {
             dataStr += `${key} : ${value} `;
         });
 
         sliceLogger.info(str + dataStr);
     }
 
     function shutdown() {
         let counter = config.shutdown_timeout;
         isShuttingDown = true;
         events.emit('worker:shutdown');
-        const shutDownInterval = setInterval(() => {
-            logger.trace(`is done with current slice: ${isDone}, shutdown counter: ${counter}`);
-            if (isDone || counter <= 0) {
-                logger.info('shutting down');
-                clearInterval(shutDownInterval);
-                const shutdownSequence = [];
-                shutdownSequence.push(analyticsStore.shutdown);
-                shutdownSequence.push(stateStore.shutdown);
-                shutdownSequence.push(logger.flush);
-
-                Promise.reduce(shutdownSequence, (prev, curr) => Promise.resolve(prev)
-                    .then(() => curr())
-                    .catch((err) => {
-                        const errMsg = parseError(err);
-                        logger.error(`Error shutting down store ${prev.name}`, errMsg);
-                        return curr();
-                    }))
-                    .then(() => {
-                        process.exit();
-                    })
-                    .catch((err) => {
-                        const errMsg = parseError(err);
-                        logger.error(errMsg);
-                        // TODO: the log might never be called to stdout with process.exit here
-                        process.exit();
-                    });
-            } else {
-                if (counter % 6000 === 0) {
-                    logger.info(`shutdown sequence initiated, but is still processing. Will force shutdown in ${counter / 1000} seconds`);
-                }
+        return new Promise((resolve) => {
+            const shutDownInterval = setInterval(() => {
+                logger.trace(`is done with current slice: ${isDone}, shutdown counter: ${counter}`);
+                if (isDone || counter <= 0) {
+                    logger.info('shutting down');
+                    clearInterval(shutDownInterval);
+
+                    Promise.resolve()
+                        .then(logger.flush)
+                        .finally(() => resolve(true));
+                } else {
+                    if (counter % 6000 === 0) {
+                        logger.info(`shutdown sequence initiated, but is still processing.
Will force shutdown in ${counter / 1000} seconds`); + } - counter -= 1000; - } - }, 1000); + counter -= 1000; + } + }, 1000); + }); } - // TODO seperate messaging sending from recycle + function _recycleFn() { const recycleWorker = executionContext.config.recycle_worker; const recycleWorkerRandomFactor = _.random(75, 100); let sliceCount = 0; - return function () { + return function (sentMsg) { if (recycleWorker) { sliceCount += 1; // recycle worker between 75% and 100% of executionContext.config.recycle_worker @@ -269,14 +252,43 @@ module.exports = function module(context, messaging, executionContext, stateStor if (sliceCount >= recycleCount && !isShuttingDown) { logger.info(`worker: ${ID} recycled after processing ${sliceCount} slices`); // still need to signal that slice completed and not be enqueued - sentMessage.isShuttingDown = true; + sentMsg.isShuttingDown = true; setTimeout(() => { - process.exit(42); + events.emit('worker:recycle'); }, 100); } + } + }; + } - messaging.send({ to: 'execution_controller', message: 'worker:slice:complete', worker_id: ID, payload: sentMessage }); + function _sliceCleanup(slice) { + events.emit('slice:finalize', slice); + isDone = true; + recycle(sentMessage); + messaging.send({ + to: 'execution_controller', + message: 'worker:slice:complete', + worker_id: ID, + payload: sentMessage + }); + } + + function testContext(_exId, _jobId) { + if (_exId) exId = _exId; + if (_jobId) jobId = _jobId; + + return { + _lastMessage: () => sentMessage, + _sliceFailed, + _sliceCompleted, + _recycleFn, + _retrySliceModule }; } + + return { + shutdown, + __test_context: testContext + }; }; diff --git a/lib/cluster/worker/index.js b/lib/cluster/worker/index.js index 423af75a0a7..03107d738dc 100644 --- a/lib/cluster/worker/index.js +++ b/lib/cluster/worker/index.js @@ -2,6 +2,7 @@ const Promise = require('bluebird'); const parseError = require('error_parser'); +const _ = require('lodash'); const messagingFn = require('../services/messaging'); module.exports = function module(context) { @@ -20,13 +21,20 @@ module.exports = function module(context) { }); // need ipc channels open before job construction const messaging = messagingFn(context, logger); + const shutdown = _.once(processShutdown); + + let stateStore; + let analyticsStore; + let executor; // if worker cannot make client, job needs to shutdown, needs to be setup before executionRunner events.on('client:initialization:error', terminalShutdown); messaging.register({ event: 'process:SIGTERM', callback: () => {} }); messaging.register({ event: 'process:SIGINT', callback: () => {} }); + messaging.register({ event: 'worker:shutdown', callback: shutdown }); + events.on('worker:recycle', shutdown); function initializeWorker() { // assets store is loaded so it can register under context.apis @@ -36,7 +44,11 @@ module.exports = function module(context) { require('../storage/state')(context), require('../storage/analytics')(context) ])) - .spread((executionContext, stateStore, analyticsStore) => require('./executor')(context, messaging, executionContext, stateStore, analyticsStore)) + .spread((executionContext, _stateStore, _analyticsStore) => { + stateStore = _stateStore; + analyticsStore = _analyticsStore; + executor = require('./executor')(context, messaging, executionContext, stateStore, analyticsStore); + }) .catch((err) => { const errMsg = `worker: ${ID} could not instantiate for execution: ${exId}, error: ${parseError(err)}`; logger.error(errMsg); @@ -58,13 +70,27 @@ module.exports = function module(context) { }); } - 
// TODO stick store shutdown logic out here - function terminalShutdown(errEV) { logger.error(`Terminal error, shutting down execution ${exId}`); events.emit('worker:shutdown'); messaging.respond({ message: 'execution:error:terminal', error: errEV.error, ex_id: exId }); } + function processShutdown() { + const shutdownSequence = []; + if (stateStore) shutdownSequence.push(stateStore.shutdown()); + if (analyticsStore) shutdownSequence.push(analyticsStore.shutdown()); + if (executor) shutdownSequence.push(executor.shutdown()); + + Promise.all(shutdownSequence) + .then(logger.flush) + .catch((err) => { + const errMsg = parseError(err); + logger.error(`Error while attempting to shutdown worker ${ID}, error: ${errMsg}`); + return logger.flush(); + }) + .finally(process.exit); + } + initializeWorker(); }; diff --git a/spec/execution_controller/engine-spec.js b/spec/execution_controller/engine-spec.js index 79820168bdf..37c0ddd903e 100644 --- a/spec/execution_controller/engine-spec.js +++ b/spec/execution_controller/engine-spec.js @@ -61,7 +61,7 @@ describe('execution engine', () => { register: (obj) => { messagingEvents[obj.event] = obj.callback; }, - initialize: () => {}, + listen: () => {}, respond: (msg) => { respondingMsg = msg; } }; const executionAnalytics = { @@ -156,7 +156,7 @@ describe('execution engine', () => { .then(() => { expect(logInfo).toEqual('execution: 1234 has initialized and is listening on port 3000'); }) - .then(testContext._shutdown()) + .then(engine.shutdown()) .catch(fail) .finally(done); }); @@ -173,8 +173,6 @@ describe('execution engine', () => { expect(typeof messagingEvents['worker:slice:complete']).toEqual('function'); expect(messagingEvents['network:disconnect']).toBeDefined(); expect(typeof messagingEvents['network:disconnect']).toEqual('function'); - expect(messagingEvents['worker:shutdown']).toBeDefined(); - expect(typeof messagingEvents['worker:shutdown']).toEqual('function'); expect(messagingEvents['assets:loaded']).toBeDefined(); expect(typeof messagingEvents['assets:loaded']).toEqual('function'); }); @@ -579,7 +577,7 @@ describe('execution engine', () => { const pause = messagingEvents['cluster:execution:pause']; const resume = messagingEvents['cluster:execution:resume']; - const shutdown = engineTestContext._shutdown; + const shutdown = engine.shutdown; engine.initialize() .then(() => { @@ -686,18 +684,15 @@ describe('execution engine', () => { const engine = makeEngine(); const exId = 1234; const engineTextContext = engine.__test_context(executionAnalytics, slicerAnalytics, recovery, exId); - const executionShutdown = engineTextContext._executionShutdown; + const executionShutdown = engine.shutdown; let gotStopEvent = false; - let gotShutdownEvent = false; myEmitter.on('execution:stop', () => gotStopEvent = true); - myEmitter.on('execution:shutdown', () => gotShutdownEvent = true); Promise.all([executionShutdown(), waitFor(50)]) .then(() => { expect(logInfo).toEqual(`slicer for execution: ${exId} has received a shutdown notice`); expect(gotStopEvent).toEqual(true); - expect(gotShutdownEvent).toEqual(true); }) .catch(fail) .finally(done); diff --git a/spec/worker-spec.js b/spec/worker-spec.js index 81d0c495478..aad3b8ac34a 100644 --- a/spec/worker-spec.js +++ b/spec/worker-spec.js @@ -1,22 +1,29 @@ 'use strict'; -const events = require('events'); -const eventEmitter = new events.EventEmitter(); +const eventsModule = require('events'); +const _ = require('lodash'); -fdescribe('Worker', () => { +const eventEmitter = new eventsModule.EventEmitter(); + 
-fdescribe('Worker', () => {
+describe('Worker', () => {
     const workerExecutorModule = require('../lib/cluster/worker/executor');
+    const messagingEvents = {};
+
+    let updatedSlice;
+    let analyticsData;
     let sentMsg;
-    let loggerErrMsg;
+    let errorMsg;
     let debugMsg;
-    let logInfo;
+    let logMsg;
     let warnMsg;
+    let loggerConfig;
 
     const logger = {
         error(err) {
-            loggerErrMsg = err;
+            errorMsg = err;
         },
         info(info) {
-            logInfo = info;
+            logMsg = info;
         },
         warn(msg) {
             warnMsg = msg;
         },
         trace() {},
         debug(msg) {
             debugMsg = msg;
         },
         flush() {}
     };
 
     const context = {
         sysconfig: {
             teraslice: {
-                hostname: 'testHostName'
+                hostname: 'testHostName',
+                shutdown_timeout: 60000
             }
         },
         apis: {
             foundation: {
-                makeLogger: () => logger,
+                makeLogger: (_config) => {
+                    loggerConfig = _config;
+                    return logger;
+                },
                 getSystemEvents: () => eventEmitter
             }
         },
         cluster: {
             worker: {
                 id: 'someID'
             }
         }
     };
     const messaging = {
-        register: () => {},
+        register: (obj) => {
+            messagingEvents[obj.event] = obj.callback;
+        },
         getHostUrl: () => 'someURL',
         send: _sentMsg => sentMsg = _sentMsg,
-        initialize: () => {}
+        listen: () => {}
     };
     const executionContext = {
         queue: [],
         config: {
             max_retries: 3,
             analytics: true,
             recycle_worker: false
         }
     };
-    const stateStore = {};
-    const analyticsStore = {};
+    const stateStore = {
+        updateState: (slice, type, errMsg) => {
+            updatedSlice = { slice, type, errMsg };
+            return Promise.resolve(true);
+        }
+    };
+    const analyticsStore = {
+        log: (_executionContext, slice, specData) => {
+            analyticsData = { executionContext: _executionContext, slice, specData };
+            return Promise.resolve(true);
+        }
+    };
+
+    function makeEmitter() {
+        const newEmitter = new eventsModule.EventEmitter();
+        context.apis.foundation.getSystemEvents = () => newEmitter;
+        return newEmitter;
+    }
 
-    function getWorker() {
-        return workerExecutorModule(context, messaging, executionContext, stateStore, analyticsStore);
+    function waitFor(timeout) {
+        return new Promise((resolve) => {
+            setTimeout(() => {
+                resolve(true);
+            }, timeout);
+        });
     }
 
-    it('can load', () => {
+    function getWorker(_executionContext) {
+        const execution = _executionContext || executionContext;
+        return workerExecutorModule(context, messaging, execution, stateStore, analyticsStore);
+    }
+
+    it('can load without throwing', () => {
         expect(() => getWorker()).not.toThrowError();
+        const workerModule = getWorker();
+        expect(workerModule).toBeDefined();
+        expect(typeof workerModule).toEqual('object');
+        expect(workerModule.shutdown).toBeDefined();
+        expect(typeof workerModule.shutdown).toEqual('function');
+        expect(workerModule.__test_context).toBeDefined();
+        expect(typeof workerModule.__test_context).toEqual('function');
     });
+
+    it('registers messaging events', () => {
+        getWorker();
+        expect(messagingEvents['assets:loaded']).toBeDefined();
+        expect(typeof messagingEvents['assets:loaded']).toEqual('function');
+        expect(messagingEvents['slicer:slice:new']).toBeDefined();
+        expect(typeof messagingEvents['slicer:slice:new']).toEqual('function');
+        expect(messagingEvents['slicer:slice:recorded']).toBeDefined();
+        expect(typeof messagingEvents['slicer:slice:recorded']).toEqual('function');
+        expect(messagingEvents['network:error']).toBeDefined();
+        expect(typeof messagingEvents['network:error']).toEqual('function');
+        expect(messagingEvents['network:disconnect']).toBeDefined();
+        expect(typeof messagingEvents['network:disconnect']).toEqual('function');
+        expect(messagingEvents['network:connect']).toBeDefined();
+        expect(typeof messagingEvents['network:connect']).toEqual('function');
+    });
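+
+    // The tests below drive the executor by invoking the captured messaging
+    // callbacks directly; makeEmitter() swaps in a fresh event bus per test.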
+
+    it('assets loaded event', (done) => {
+        const events = makeEmitter();
+        const data = { some: 'assetMetaData' };
+        let innerEventCalled = false;
+        let respData;
+        getWorker();
+
+        events.on('execution:assets_loaded', (resp) => {
+            innerEventCalled = true;
+            respData = resp;
+        });
+
+        messagingEvents['assets:loaded'](data);
+
+        waitFor(10)
+            .then(() => {
+                expect(innerEventCalled).toEqual(true);
+                expect(respData).toEqual(data);
+                done();
+            });
+    });
+
+    it('transmits network errors', (done) => {
+        const events = makeEmitter();
+        let innerEventCalled = false;
+        getWorker();
+
+        events.on('network:error', () => {
+            innerEventCalled = true;
+        });
+
+        messagingEvents['network:error']('some error');
+
+        waitFor(10)
+            .then(() => {
+                expect(innerEventCalled).toEqual(true);
+                expect(errorMsg).toEqual('Error in worker socket, error: some error');
+                done();
+            });
+    });
+
+    it('can mark slices as failed', (done) => {
+        const events = makeEmitter();
+        let innerEventCalled = false;
+        const worker = getWorker();
+        const sliceId = 'some5Lic8';
+        const error = new Error('some slice error');
+        const sliceFailed = worker.__test_context()._sliceFailed;
+        const lastMessage = worker.__test_context()._lastMessage;
+
+        const slice = { payload: { slice_id: sliceId, some: 'data' } };
+
+        events.on('slice:failure', () => {
+            innerEventCalled = true;
+        });
+
+        sliceFailed(error, slice, logger);
+
+        waitFor(10)
+            .then(() => {
+                expect(innerEventCalled).toEqual(true);
+                expect(errorMsg).toEqual(`failed to process ${JSON.stringify(lastMessage())}, slice state has been marked as error`);
+                expect(updatedSlice).toBeDefined();
+                expect(updatedSlice.slice).toEqual(slice);
+                expect(updatedSlice.type).toEqual('error');
+                expect(updatedSlice.errMsg).toEqual(error.stack);
+            })
+            .finally(done);
+    });
+
+    it('can mark slices as completed', (done) => {
+        const events = makeEmitter();
+        const worker = getWorker();
+        const sliceId = 'some5Lic8';
+        const sliceCompleted = worker.__test_context()._sliceCompleted;
+        let gotSuccess = false;
+        let gotFinalize = false;
+
+        const sliceMetaData = {
+            slice_id: sliceId,
+            request: { some: 'data' }
+        };
+        const slice = sliceMetaData.request;
+        const specData = { time: [12, 24], size: [50, 50], memory: [1234, 5678] };
+
+        const endingMsg = {
+            to: 'execution_controller',
+            message: 'worker:slice:complete',
+            worker_id: 'testHostName__someID',
+            payload: {
+                worker_id: 'testHostName__someID',
+                slice: {
+                    slice_id: 'some5Lic8',
+                    request: { some: 'data' }
+                },
+                analytics: {
+                    time: [12, 24],
+                    size: [50, 50],
+                    memory: [1234, 5678]
+                }
+            },
+        };
+
+        events.on('slice:success', (results) => {
+            gotSuccess = results;
+        });
+
+        events.on('slice:finalize', () => {
+            gotFinalize = true;
+        });
+
+        Promise.resolve()
+            .then(() => Promise.all([
+                sliceCompleted(null, sliceMetaData, slice, specData, logger),
+                waitFor(30)
+            ]))
+            .then(() => {
+                expect(gotSuccess).toEqual(sliceMetaData);
+                expect(gotFinalize).toEqual(true);
+                expect(sentMsg).toEqual(endingMsg);
+            })
+            .catch(fail)
+            .finally(done);
+    });
+
+    it('will emit recycle after so many invocations', (done) => {
+        const events = makeEmitter();
+        const myExecution = _.cloneDeep(executionContext);
+        myExecution.config.recycle_worker = 2;
+        const worker = getWorker(myExecution);
+        const recycle = worker.__test_context()._recycleFn();
+        const sentMessage = {};
+        let gotEvent = false;
+
+        events.on('worker:recycle', () => {
+            gotEvent = true;
+        });
+
+        recycle(sentMessage);
+        expect(sentMessage).toEqual({ isShuttingDown: true });
+        expect(gotEvent).toEqual(false);
+
+        waitFor(120)
+            .then(() => {
+
+                expect(sentMessage).toEqual({ isShuttingDown: true });
+                expect(gotEvent).toEqual(true);
+                done();
+            });
+    });
+
+    it('can shutdown', (done) => {
+        const events = makeEmitter();
+        let innerEventCalled = false;
+        const worker = getWorker();
+
+        events.on('worker:shutdown', () => {
+            innerEventCalled = true;
+        });
+
+        Promise.all([worker.shutdown(), waitFor(10)])
+            .then(() => {
+                expect(innerEventCalled).toEqual(true);
+            })
+            .catch(fail)
+            .finally(done);
+    });
+
+    it('can process slices', (done) => {
+        const myExecution = _.cloneDeep(executionContext);
+
+        function makeData(analyticsObj) {
+            return () => Promise.resolve([{ data: 'someData' }, { data: 'otherData' }]);
+        }
+        function mapData(analyticsObj) {
+            return results => results.map(obj => obj.data);
+        }
+
+        myExecution.queue = [makeData, mapData];
+        const exId = '1234';
+        const jobId = '5678';
+        const events = makeEmitter();
+        const worker = getWorker(myExecution);
+        const lastMessage = worker.__test_context(exId, jobId)._lastMessage;
+
+        const slice = { slice_id: 'as35g' };
+        let sliceSuccess = false;
+
+        events.on('slice:success', () => {
+            sliceSuccess = true;
+        });
+
+        Promise.all([
+            messagingEvents['slicer:slice:new']({ payload: slice }),
+            waitFor(10)
+        ])
+            .then(() => {
+                expect(loggerConfig).toEqual({
+                    ex_id: exId,
+                    job_id: jobId,
+                    module: 'slice',
+                    worker_id: 'testHostName__someID',
+                    slice_id: slice.slice_id
+                });
+
+                expect(updatedSlice.slice).toEqual(slice);
+                expect(updatedSlice.type).toEqual('completed');
+                expect(sliceSuccess).toEqual(true);
+                expect(lastMessage()).toEqual({
+                    worker_id: 'testHostName__someID',
+                    slice: { slice_id: 'as35g' },
+                    analytics: { time: [], size: [], memory: [] }
+                });
+            })
+            .catch(fail)
+            .finally(done);
+    });
+
+    it('can keep track of last sent messages', (done) => {
+        const exId = '1234';
+        const jobId = '5678';
+        const worker = getWorker();
+        const lastMessage = worker.__test_context(exId, jobId)._lastMessage;
+        const slice = { slice_id: 'as35g' };
+
+        Promise.all([
+            messagingEvents['slicer:slice:new']({ payload: slice }),
+            waitFor(10)
+        ])
+            .then(() => {
+                const theLastMessage = lastMessage();
+                expect(theLastMessage).toEqual({
+                    worker_id: 'testHostName__someID',
+                    slice: { slice_id: 'as35g' },
+                    analytics: { time: [], size: [], memory: [] }
+                });
+                // this should add the retry key to the last sent message
+                messagingEvents['network:connect']();
+                const newLastMessage = Object.assign({}, theLastMessage, { retry: true });
+                expect(lastMessage()).toEqual(newLastMessage);
+                expect(sentMsg).toEqual({
+                    to: 'execution_controller',
+                    message: 'worker:slice:complete',
+                    worker_id: 'testHostName__someID',
+                    payload: newLastMessage
+                });
+
+                messagingEvents['slicer:slice:recorded']();
+                expect(lastMessage()).toEqual(false);
+                messagingEvents['network:connect']();
+                expect(sentMsg).toEqual({
+                    to: 'execution_controller',
+                    message: 'worker:ready',
+                    worker_id: 'testHostName__someID',
+                    payload: { worker_id: 'testHostName__someID' }
+                });
+            })
+            .catch(fail)
+            .finally(done);
+    });
+
+    it('can retry slice executions', (done) => {
+        const exId = '1234';
+        const jobId = '5678';
+        const slice = { slice_id: 'as35g' };
+        const events = makeEmitter();
+        const worker = getWorker();
+        const _retrySliceModule = worker.__test_context(exId, jobId)._retrySliceModule;
+        const errEvent = new Error('an error');
+        const retrySlice = _retrySliceModule(slice, [() => Promise.reject('an error'), () => Promise.reject('an error')], logger, {});
+        let sliceRetry = false;
+
+        events.on('slice:retry', (response) => {
+            sliceRetry = response;
+        });
+
+        Promise.all([retrySlice(errEvent), waitFor(10)])
+            .then(() => {
+                expect(sliceRetry).toEqual(slice);
+                return Promise.all([retrySlice(errEvent), retrySlice(errEvent), waitFor(50)]);
+            })
+            .then(() => {
+                expect(updatedSlice.type).toEqual('error');
+                expect(updatedSlice.slice).toEqual(slice);
+            })
+            .catch(fail)
+            .finally(done);
+    });
+
+    it('can get a network disconnect event', (done) => {
+        const exId = '1234';
+        const jobId = '5678';
+        const events = makeEmitter();
+        const worker = getWorker();
+        worker.__test_context(exId, jobId);
+
+        let gotDisconnectEvent = false;
+        const disconnEvent = { some: 'event' };
+        const finalErrorMessage = `worker testHostName__someID has disconnected from slicer ex_id ${exId}`;
+
+        events.on('network:disconnect', () => {
+            gotDisconnectEvent = true;
+        });
+
+        Promise.all([messagingEvents['network:disconnect'](disconnEvent), waitFor(10)])
+            .then(() => {
+                expect(gotDisconnectEvent).toEqual(true);
+                expect(errorMsg).toEqual(finalErrorMessage);
+            })
+            .catch(fail)
+            .finally(done);
+    });
 });