diff --git a/lib/job-handlers.js b/lib/job-handlers.js index eb5be3e..3846cb5 100644 --- a/lib/job-handlers.js +++ b/lib/job-handlers.js @@ -16,6 +16,7 @@ var root_dir; function init(options) { job_mgr = new JobManager({max_saved_messages: options.max_saved_messages, delayed_job_cleanup: options.delayed_job_cleanup, + delayed_endpoint_close: options.delayed_endpoint_close, config_path: options.config_path}); observer_mgr = new ObserverManager({job_mgr: job_mgr}); diff --git a/lib/job-manager.js b/lib/job-manager.js index 470876d..6aa9d7d 100644 --- a/lib/job-manager.js +++ b/lib/job-manager.js @@ -22,6 +22,7 @@ var JobManager = Base.extend({ self._max_saved_messages = options.max_saved_messages; self._delayed_job_cleanup = options.delayed_job_cleanup; + self._delayed_endpoint_close = options.delayed_endpoint_close; // This object emits job_start/job_end events when jobs are // started and deleted. @@ -99,7 +100,8 @@ var JobManager = Base.extend({ } else { _.extend(job_options, { endpoints: [], - max_saved_messages: self._max_saved_messages + max_saved_messages: self._max_saved_messages, + delayed_endpoint_close: self._delayed_endpoint_close }); job = new WebsocketJuttleJob(job_options); } diff --git a/lib/routes.js b/lib/routes.js index 9df7302..160bbf8 100644 --- a/lib/routes.js +++ b/lib/routes.js @@ -32,6 +32,7 @@ function add_routes(app, options) { let max_saved_messages = 1024; let delayed_job_cleanup = 10000; + let delayed_endpoint_close = options.delayed_endpoint_close || 10000; if (_.has(options.config, 'juttled')) { if (_.has(options.config.juttled, 'max_saved_messages')) { @@ -41,10 +42,15 @@ function add_routes(app, options) { if (_.has(options.config.juttled, 'delayed_job_cleanup')) { delayed_job_cleanup = options.config.juttled.delayed_job_cleanup; } + + if (_.has(options.config.juttled, 'delayed_endpoint_close')) { + delayed_endpoint_close = options.config.juttled.delayed_endpoint_close; + } } jobs.init({max_saved_messages: max_saved_messages, delayed_job_cleanup: delayed_job_cleanup, + delayed_endpoint_close: delayed_endpoint_close, config_path: options.config_path, root_directory: options.root_directory}); paths.init({root_directory: options.root_directory}); @@ -89,7 +95,7 @@ function add_routes(app, options) { jobs.subscribe_job); app.ws(API_PREFIX + '/observers/:observer_id', jobs.subscribe_observer); - app.ws(API_PREFIX + '/rendezvous/:topic', + app.ws('/rendezvous/:topic', paths.rendezvous_topic); return router; diff --git a/lib/ws-juttle-job.js b/lib/ws-juttle-job.js index 066ac25..f96b993 100644 --- a/lib/ws-juttle-job.js +++ b/lib/ws-juttle-job.js @@ -24,11 +24,15 @@ var WebsocketJuttleJob = JuttleJob.extend({ self._job_start_msg = undefined; self._job_end_msg = undefined; + self._delayed_endpoint_close = options.delayed_endpoint_close; + // When this job emits an 'end' event, the program has // completed, and we should close all websocket connections. self.events.on('end', function() { - self._endpoints.forEach(function(endpoint) { - endpoint.close(); + setTimeout(function() { + self._endpoints.forEach(function(endpoint) { + endpoint.close(); + }, self._delayed_endpoint_close); }); }); }, @@ -79,7 +83,8 @@ var WebsocketJuttleJob = JuttleJob.extend({ if (self._job_stopped) { logger.debug(self._log_prefix + 'Received job stopped, sending this endpoint a job_stopped message and closing'); endpoint.send(self._job_end_msg); - endpoint.close(); + setTimeout(endpoint.close.bind(endpoint), + self._delayed_endpoint_close); return; } } diff --git a/test/juttle-engine.spec.js b/test/juttle-engine.spec.js index be01f21..62d1258 100644 --- a/test/juttle-engine.spec.js +++ b/test/juttle-engine.spec.js @@ -94,7 +94,7 @@ describe('Juttle Engine Tests', function() { findFreePort(10000, 20000) .then((freePort) => { juttleBaseUrl = 'http://localhost:' + freePort + '/api/v0'; - juttled = new JuttleEngine({port: freePort, root_directory: juttleRoot}); + juttled = new JuttleEngine({port: freePort, root_directory: juttleRoot, delayed_endpoint_close: 2000}); }); }); @@ -1153,6 +1153,7 @@ describe('Juttle Engine Tests', function() { var num_ticks = 0; var num_marks = 0; var num_sink_ends = 0; + var got_job_end_time = undefined; ws_client = new WebSocket(juttleBaseUrl + '/jobs/' + job_id); ws_client.on('message', function(data) { //console.log("Got Websocket:", data); @@ -1186,6 +1187,7 @@ describe('Juttle Engine Tests', function() { } ]); } else if (data.type === 'job_end') { + got_job_end_time = Date.now(); expect(data.job_id === job_id); // Now check that we received all the ticks/marks/etc we expected. @@ -1196,7 +1198,6 @@ describe('Juttle Engine Tests', function() { expect(num_marks).to.be.equal(6); expect(num_sink_ends).to.equal(2); - done(); } else if (data.type === 'tick') { num_ticks++; expect(data.sink_id).to.match(/view\d+/); @@ -1226,6 +1227,19 @@ describe('Juttle Engine Tests', function() { } } }); + + ws_client.on('close', function(data) { + + // There should be at least 1 second between + // the job_end message and the websocket + // closing. This shows that + // delayed_websocket_close is actually + // working. + + var got_ws_close_time = Date.now(); + expect(got_ws_close_time-got_job_end_time).to.be.at.least(1000); + done(); + }); }); };