Skip to content

Commit

Permalink
Merge pull request #522 from jsnoble/stop_job_timeouts
Browse files Browse the repository at this point in the history
fixed stop job timeout issues resolves #434
  • Loading branch information
godber authored Sep 5, 2017
2 parents 469afd7 + 9614f6a commit 138e17a
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 5 deletions.
58 changes: 58 additions & 0 deletions integration-tests/spec/cases/cluster/job-state.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
'use strict';

var _ = require('lodash');
var Promise = require('bluebird');
var misc = require('../../misc')();
var wait = require('../../wait')();

module.exports = function() {
var teraslice = misc.teraslice();

describe('worker allocation', function() {

it('should cycle through after state changes with other jobs running', function(done) {
var job_spec1 = misc.newJob('generator');
var job_spec2 = misc.newJob('generator');
var job1_id;
var job2_id;
job_spec2.operations[1].name = 'second_generator';

Promise.all([teraslice.jobs.submit(job_spec1), teraslice.jobs.submit(job_spec2)])
.spread(function(job1, job2) {
job1_id = job1.id();
job2_id = job2.id();
expect(job1_id).toBeDefined();
expect(job2_id).toBeDefined();

return job1.waitForStatus('running')
.then(function() {
return job1.pause();
})
.then(function() {
return job1.waitForStatus('paused');
})
.then(function() {
return job1.resume();
})
.then(function() {
return job1.waitForStatus('running');
})
.then(function() {
return job1.stop();
})
.then(function() {
return job1.waitForStatus('stopped');
})
.then(function() {
return job2.stop();
})
.then(function() {
return job2.waitForStatus('stopped');
})
})
.catch(fail)
.finally(done);
});
});

};
2 changes: 1 addition & 1 deletion integration-tests/spec/cases/data/reindex.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ module.exports = function() {
fail()
})
.finally(done)
})
});

it('should complete after lifecycle changes', function(done) {
var job_spec = misc.newJob('reindex');
Expand Down
16 changes: 16 additions & 0 deletions integration-tests/spec/fixtures/jobs/generator.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "generator",
"slicers": 1,
"lifecycle": "persistent",
"workers": 3,
"operations": [
{
"_op": "elasticsearch_data_generator",
"size": 1000,
"stress_test": false
},
{
"_op": "noop"
}
]
}
3 changes: 2 additions & 1 deletion integration-tests/spec/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ describe('teraslice', function() {
process.exit(2)
})
});


require('./cases/cluster/job-state')();
require('./cases/data/id-reader')();
require('./cases/data/elasticsearch-bulk')();
require('./cases/data/reindex')();
Expand Down
4 changes: 2 additions & 2 deletions lib/cluster/node_master.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ module.exports = function(context) {
var intervalTime = 200;

messageWorkers(context, data, {message: 'worker:shutdown'}, function(worker) {
if (worker.slicer_port) {
if (worker.ex_id === data.ex_id && worker.slicer_port) {
slicerFound = true;
}
return worker.ex_id === data.ex_id
Expand Down Expand Up @@ -364,7 +364,7 @@ module.exports = function(context) {
assignment: clusterWorkers[childID].assignment,
pid: clusterWorkers[childID].process.pid
};

if (clusterWorkers[childID].ex_id) {
child.ex_id = clusterWorkers[childID].ex_id
}
Expand Down
2 changes: 1 addition & 1 deletion lib/cluster/services/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ module.exports = function(context, server) {

function findNodesForJob(ex_id, slicer_only) {
var nodes = [];

//TODO but what about disconnected nodes?
_.forOwn(cluster_state, function(node) {
var hasJob = node.active.filter(function(worker) {
if (slicer_only) {
Expand Down

0 comments on commit 138e17a

Please sign in to comment.