Skip to content

Commit

Permalink
Merge pull request #523 from jsnoble/job_duplication
Browse files Browse the repository at this point in the history
prevent cleanUpNode fn from duplicating jobs resolves #454
  • Loading branch information
godber authored Sep 5, 2017
2 parents 138e17a + cc34a42 commit e4d94b8
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
30 changes: 22 additions & 8 deletions lib/cluster/services/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ module.exports = function(context, server) {
});

messaging.register('network:error', function(err) {
var errMsg = parseError(err)
var errMsg = parseError(err);
logger.error(`Error : cluster_master had an error with one of its connections`, errMsg)
});

Expand Down Expand Up @@ -254,17 +254,17 @@ module.exports = function(context, server) {
});
}

_.forIn(node.workerJobID, function(value) {
_.forIn(node.workerJobID, function(numOfWorkers, ex_id) {
//looking for unique ex_id's not in slicerJobID
if (!node.slicerJobID[value]) {
if (!node.slicerJobID[ex_id]) {
//emitting to job service, since cluster is instantiated before job service is ready
var numOfWorkers = cluster_state[node_id].active.filter(function(worker) {
return worker.ex_id === value;
return worker.ex_id === ex_id;
}).length;

events.emit('cluster_service:cleanup_job', {
ex_id: value,
id: value,
ex_id: ex_id,
id: ex_id,
node_id: node_id,
workers: numOfWorkers,
assignment: 'worker'
Expand Down Expand Up @@ -306,7 +306,13 @@ module.exports = function(context, server) {

if (curr.assignment === 'worker') {
prev.numOfWorkers++;
prev.workerJobID[curr.ex_id] = curr.ex_id;
//if not resgistered, set it to one, if so then increment it
if (!prev.workerJobID[curr.ex_id]) {
prev.workerJobID[curr.ex_id] = 1;
}
else {
prev.workerJobID[curr.ex_id] += 1;
}
}

return prev;
Expand Down Expand Up @@ -500,7 +506,15 @@ module.exports = function(context, server) {
}
}
//if left over worker requests, enqueue them, queue works based off of id, so it redundantly references ex_id
var workerData = {job: jobStr, id: ex_id, ex_id: ex_id, job_id: job_id, workers: 1, assignment: 'worker'};
var workerData = {
job: jobStr,
id: ex_id,
ex_id: ex_id,
job_id: job_id,
workers: 1,
assignment: 'worker',
needsAssets: needsAssets
};

while (numOfWorkersRequested > 0) {
logger.trace(`adding worker to pending queue for ex: ${ex_id}`);
Expand Down
43 changes: 34 additions & 9 deletions lib/cluster/services/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,36 @@ module.exports = function(context, cluster_service) {

events.on('cluster_service:cleanup_job', function(data) {
var options = {running: true, failing: true, paused: true};
getExecutionContext(data.ex_id).then(function(ex) {
if (options[ex._status]) {
logger.warn(`node ${data.node_id} has disconnected with active workers for job: ${data.ex_id} , enqueuing the workers`);
restartExecution(ex.ex_id)
}
}).catch(function(err) {
logger.error(`could not cleanup job for ex: ${data.ex_id}`, err)
});
getExecutionContext(data.ex_id)
.then(function(ex) {
if (options[ex._status]) {
logger.warn(`node ${data.node_id} has disconnected with active workers for job: ${data.ex_id} , enqueuing the workers`);
var needsAssets = ex.assets && ex.assets.length > 0;
var numOfWorkers = data.workers;
var jobStr = JSON.stringify(ex);

var requestedWorkersData = {
job: jobStr,
id: ex.ex_id,
ex_id: ex.ex_id,
job_id: ex.job_id,
node_id: node_id,
workers: 1,
assignment: 'worker',
needsAssets: needsAssets
};

while (numOfWorkers > 0) {
logger.trace(`adding worker to pending queue for ex: ${ex.ex_id}`);
//this will add the workers to the pendingWorkerRequests queue
cluster_service.addToQueue(requestedWorkersData);
numOfWorkers -= 1;
}
}
})
.catch(function(err) {
logger.error(`could not cleanup job for ex: ${data.ex_id}`, err)
});
});

events.on('moderate_jobs:pause', function(connectionList) {
Expand Down Expand Up @@ -502,7 +524,10 @@ module.exports = function(context, cluster_service) {
.then(function() {
cluster_service.allocateSlicer(executionContext, recover_execution)
.then(function() {
return _setStatus(executionContext, 'initializing');
return _setStatus(executionContext, 'initializing', {
slicer_port: executionContext.slicer_port,
slicer_hostname: executionContext.slicer_hostname
});
})
.then(function() {
return cluster_service.allocateWorkers(executionContext, executionContext.workers)
Expand Down
10 changes: 9 additions & 1 deletion lib/cluster/storage/backends/mappings/job.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"type": "string",
"index": "not_analyzed"
},
"_context":{
"_context": {
"type": "string",
"index": "not_analyzed"
},
Expand All @@ -30,6 +30,14 @@
"type": "string",
"index": "not_analyzed"
},
"slicer_hostname": {
"type": "string",
"index": "not_analyzed"
},
"slicer_port": {
"type": "string",
"index": "not_analyzed"
},
"_slicer_stats": {
"type": "object"
},
Expand Down

0 comments on commit e4d94b8

Please sign in to comment.