Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow slices to be sent to specific workers resolves #624 #653

Merged
merged 3 commits into from
Feb 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions lib/cluster/execution_controller/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ module.exports = function module(context, messaging, exStore, stateStore, execut
executionAnalytics.increment('workers_joined');
// if there are more clients than the length of the queue, increase the queue size
_adjustSlicerQueueLength();
events.emit('worker:ready', workerId);
workerQueue.enqueue(workerOnlineData);
}
});
Expand Down Expand Up @@ -241,14 +242,23 @@ module.exports = function module(context, messaging, exStore, stateStore, execut
function _engineSetup() {
engineFn = function slicerEngineExecution() {
while (workerQueue.size() && slicerQueue.size()) {
const worker = workerQueue.dequeue();
const sliceData = slicerQueue.dequeue();
messaging.send({
to: 'worker',
address: worker.worker_id,
message: 'slicer:slice:new',
payload: sliceData
});
const messageData = { to: 'worker', message: 'slicer:slice:new', payload: sliceData };

if (sliceData.request.request_worker) {
const worker = workerQueue.extract('worker_id', sliceData.request.request_worker);
if (worker) {
messageData.address = worker.worker_id;
messaging.send(messageData);
} else {
events.emit('slice:invalid', sliceData.request);
stateStore.updateState(sliceData.request, 'invalid');
}
} else {
const worker = workerQueue.dequeue();
messageData.address = worker.worker_id;
messaging.send(messageData);
}
}

const currentWorkers = workerQueue.size();
Expand Down
8 changes: 4 additions & 4 deletions lib/cluster/runners/op.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ module.exports = function module(context) {
getClient
});

function findOp(name, assetsPath, jobAssets) {
function findOp(name, assetsPath, executionAssets) {
let filePath;
let codeName;

Expand Down Expand Up @@ -79,7 +79,7 @@ module.exports = function module(context) {
if (filePath) return filePath;

if (assetsPath && existsSync(assetsPath)) {
jobAssets.forEach((assetID) => {
executionAssets.forEach((assetID) => {
const assetOpPath = `${assetsPath}/${assetID}`;
// if the path is not found yet and the opPath exists
if (!filePath && existsSync(assetOpPath)) {
Expand All @@ -103,10 +103,10 @@ module.exports = function module(context) {
return filePath;
}

function load(opName, assetPath, jobAssets) {
function load(opName, assetPath, executionAssets) {
isString(opName);

const codePath = findOp(opName, assetPath, jobAssets);
const codePath = findOp(opName, assetPath, executionAssets);
try {
return require(codePath);
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion lib/cluster/services/assets.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ module.exports = function (context) {
})
.catch((err) => {
const code = err.code ? err.code : 500;
sendError(res, code, err.message);
sendError(res, code, parseError(err));
});
}
};
5 changes: 2 additions & 3 deletions lib/cluster/storage/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ module.exports = function module(context) {
}

function recoverSlices(exId, slicerId) {
const retryQuery = `ex_id:${exId} AND slicer_id:${slicerId} AND NOT state:completed`;

const retryQuery = `ex_id:${exId} AND slicer_id:${slicerId} AND NOT (state:completed OR state:invalid)`;
// Look for all slices that haven't been completed so they can be retried.
return backend.refresh(indexName)
.then(() => backend.search(retryQuery, 0, 5000))
Expand All @@ -101,7 +100,7 @@ module.exports = function module(context) {
}

function shutdown() {
logger.info('shutting down.');
logger.info('shutting down');
return backend.shutdown();
}

Expand Down
3 changes: 1 addition & 2 deletions lib/processors/elasticsearch_bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ var _ = require('lodash');
var getClient = require('../utils/config').getClient;
var getOpConfig = require('../utils/config').getOpConfig;

function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {
var context = context;
var logger;
var opConfig = opConfig;
var limit = opConfig.size;
var client;

var bulk_contexts = {};
var multisend = opConfig.multisend;
var multisend_index_append = opConfig.multisend_index_append;
Expand Down
2 changes: 1 addition & 1 deletion lib/processors/elasticsearch_index_selector.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ var _ = require('lodash');
var getOpConfig = require('../utils/config').getOpConfig;


function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {

function formattedDate(record) {
var offsets = {
Expand Down
2 changes: 1 addition & 1 deletion lib/processors/noop.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';


function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {
return function(data) {
return data;
};
Expand Down
2 changes: 1 addition & 1 deletion lib/processors/save_file.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var fs = require('fs');

function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {
var path = opConfig.file_path;

return function(data) {
Expand Down
2 changes: 1 addition & 1 deletion lib/processors/stdout.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var _ = require('lodash');

function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {
var opConfig = opConfig;

return function(data) {
Expand Down
84 changes: 81 additions & 3 deletions spec/execution_controller/engine-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ describe('execution engine', () => {
let respondingMsg = null;
let executionOperationsUpdate = null;
let exStatus = null;
const updateState = {};

const messaging = {
send: (msg) => {
Expand Down Expand Up @@ -83,16 +84,18 @@ describe('execution engine', () => {
update: (exId, obj) => { executionOperationsUpdate = obj; }
};
const stateStore = {
executionStartingSlice: () => {
},
executionStartingSlice: () => {},
recoverSlices: () => {
const data = testSlices.slice();
testSlices = [];
return Promise.resolve(data);
},
createState: () => {},
count: () => Promise.resolve(0),
shutdown: () => {}
shutdown: () => {},
updateState: (slice, state, error) => {
updateState[state] = slice;
}
};
const executionContext = {
config: {
Expand Down Expand Up @@ -716,4 +719,79 @@ describe('execution engine', () => {
.catch(fail)
.finally(done);
});

it('can send slices to specific workers', (done) => {
const myEmitter = makeEmitter();
const engine = makeEngine();
const exId = 1234;
const engineTextContext = engine.__test_context(executionAnalytics, slicerAnalytics, recovery, exId);
const workerQueue = engineTextContext.workerQueue;
const slicerQueue = engineTextContext.slicerQueue;
const slice1 = { request: { some: 'slice' } };
const slice2 = Object.assign({}, slice1, { request: { request_worker: 3 } });
const slice3 = Object.assign({}, slice1, { request: { request_worker: 99 } });

const worker1 = { worker_id: 1 };
const worker2 = { worker_id: 2 };
const worker3 = { worker_id: 3 };

let invalidSlice = null;

function workerQueueList() {
const results = [];
workerQueue.each(worker => results.push(worker));
return results;
}

engineTextContext._engineSetup();

myEmitter.on('slice:invalid', data => invalidSlice = data);

workerQueue.enqueue(worker1);
workerQueue.enqueue(worker2);
workerQueue.enqueue(worker3);

waitFor(10)
.then(() => {
// We expect that no workers have been allocated yet
expect(workerQueue.size()).toEqual(3);
slicerQueue.enqueue(slice1);
return waitFor(10);
})
.then(() => {
expect(workerQueue.size()).toEqual(2);
expect(sentMsg).toEqual({
to: 'worker',
message: 'slicer:slice:new',
payload: slice1,
address: 1
});
expect(workerQueueList()).toEqual([worker2, worker3]);
slicerQueue.enqueue(slice2);
return waitFor(10);
})
.then(() => {
expect(workerQueue.size()).toEqual(1);
expect(sentMsg).toEqual({
to: 'worker',
message: 'slicer:slice:new',
payload: slice2,
address: 3
});
expect(workerQueueList()).toEqual([worker2]);
slicerQueue.enqueue(slice3);
// check that there was no invalid state records so far
expect(invalidSlice).toEqual(null);
expect(updateState).toEqual({});
return waitFor(10);
})
.then(() => {
expect(workerQueue.size()).toEqual(1);
expect(invalidSlice).toEqual(slice3.request);
expect(updateState).toEqual({ invalid: slice3.request });
expect(workerQueueList()).toEqual([worker2]);
})
.catch(fail)
.finally(done);
});
});