Skip to content

Commit

Permalink
Add a configurable delay before closing websockets.
Browse files Browse the repository at this point in the history
Add a new juttled config option delayed_endpoint_close that controls
how long to wait before wanting to close an endpoint and actually
closing the endpoint. It defaults to 10 seconds.

Modify the websocket unit test to close websockets after 2 seconds and
ensure that there is at least 1 second between the job_end message and
the websocket being closed.
  • Loading branch information
Mark Stemm committed Feb 4, 2016
1 parent e7f876d commit 04b61a3
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 7 deletions.
1 change: 1 addition & 0 deletions lib/job-handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var root_dir;
function init(options) {
job_mgr = new JobManager({max_saved_messages: options.max_saved_messages,
delayed_job_cleanup: options.delayed_job_cleanup,
delayed_endpoint_close: options.delayed_endpoint_close,
config_path: options.config_path});
observer_mgr = new ObserverManager({job_mgr: job_mgr});

Expand Down
4 changes: 3 additions & 1 deletion lib/job-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var JobManager = Base.extend({

self._max_saved_messages = options.max_saved_messages;
self._delayed_job_cleanup = options.delayed_job_cleanup;
self._delayed_endpoint_close = options.delayed_endpoint_close;

// This object emits job_start/job_end events when jobs are
// started and deleted.
Expand Down Expand Up @@ -99,7 +100,8 @@ var JobManager = Base.extend({
} else {
_.extend(job_options, {
endpoints: [],
max_saved_messages: self._max_saved_messages
max_saved_messages: self._max_saved_messages,
delayed_endpoint_close: self._delayed_endpoint_close
});
job = new WebsocketJuttleJob(job_options);
}
Expand Down
8 changes: 7 additions & 1 deletion lib/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ function add_routes(app, options) {

let max_saved_messages = 1024;
let delayed_job_cleanup = 10000;
let delayed_endpoint_close = options.delayed_endpoint_close || 10000;

if (_.has(options.config, 'juttled')) {
if (_.has(options.config.juttled, 'max_saved_messages')) {
Expand All @@ -41,10 +42,15 @@ function add_routes(app, options) {
if (_.has(options.config.juttled, 'delayed_job_cleanup')) {
delayed_job_cleanup = options.config.juttled.delayed_job_cleanup;
}

if (_.has(options.config.juttled, 'delayed_endpoint_close')) {
delayed_endpoint_close = options.config.juttled.delayed_endpoint_close;
}
}

jobs.init({max_saved_messages: max_saved_messages,
delayed_job_cleanup: delayed_job_cleanup,
delayed_endpoint_close: delayed_endpoint_close,
config_path: options.config_path,
root_directory: options.root_directory});
paths.init({root_directory: options.root_directory});
Expand Down Expand Up @@ -89,7 +95,7 @@ function add_routes(app, options) {
jobs.subscribe_job);
app.ws(API_PREFIX + '/observers/:observer_id',
jobs.subscribe_observer);
app.ws(API_PREFIX + '/rendezvous/:topic',
app.ws('/rendezvous/:topic',
paths.rendezvous_topic);

return router;
Expand Down
11 changes: 8 additions & 3 deletions lib/ws-juttle-job.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ var WebsocketJuttleJob = JuttleJob.extend({
self._job_start_msg = undefined;
self._job_end_msg = undefined;

self._delayed_endpoint_close = options.delayed_endpoint_close;

// When this job emits an 'end' event, the program has
// completed, and we should close all websocket connections.
self.events.on('end', function() {
self._endpoints.forEach(function(endpoint) {
endpoint.close();
setTimeout(function() {
self._endpoints.forEach(function(endpoint) {
endpoint.close();
}, self._delayed_endpoint_close);
});
});
},
Expand Down Expand Up @@ -79,7 +83,8 @@ var WebsocketJuttleJob = JuttleJob.extend({
if (self._job_stopped) {
logger.debug(self._log_prefix + 'Received job stopped, sending this endpoint a job_stopped message and closing');
endpoint.send(self._job_end_msg);
endpoint.close();
setTimeout(endpoint.close.bind(endpoint),
self._delayed_endpoint_close);
return;
}
}
Expand Down
18 changes: 16 additions & 2 deletions test/juttle-engine.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ describe('Juttle Engine Tests', function() {
findFreePort(10000, 20000)
.then((freePort) => {
juttleBaseUrl = 'http://localhost:' + freePort + '/api/v0';
juttled = new JuttleEngine({port: freePort, root_directory: juttleRoot});
juttled = new JuttleEngine({port: freePort, root_directory: juttleRoot, delayed_endpoint_close: 2000});
});
});

Expand Down Expand Up @@ -1153,6 +1153,7 @@ describe('Juttle Engine Tests', function() {
var num_ticks = 0;
var num_marks = 0;
var num_sink_ends = 0;
var got_job_end_time = undefined;
ws_client = new WebSocket(juttleBaseUrl + '/jobs/' + job_id);
ws_client.on('message', function(data) {
//console.log("Got Websocket:", data);
Expand Down Expand Up @@ -1186,6 +1187,7 @@ describe('Juttle Engine Tests', function() {
}
]);
} else if (data.type === 'job_end') {
got_job_end_time = Date.now();
expect(data.job_id === job_id);

// Now check that we received all the ticks/marks/etc we expected.
Expand All @@ -1196,7 +1198,6 @@ describe('Juttle Engine Tests', function() {

expect(num_marks).to.be.equal(6);
expect(num_sink_ends).to.equal(2);
done();
} else if (data.type === 'tick') {
num_ticks++;
expect(data.sink_id).to.match(/view\d+/);
Expand Down Expand Up @@ -1226,6 +1227,19 @@ describe('Juttle Engine Tests', function() {
}
}
});

ws_client.on('close', function(data) {

// There should be at least 1 second between
// the job_end message and the websocket
// closing. This shows that
// delayed_websocket_close is actually
// working.

var got_ws_close_time = Date.now();
expect(got_ws_close_time-got_job_end_time).to.be.at.least(1000);
done();
});
});
};

Expand Down

0 comments on commit 04b61a3

Please sign in to comment.