diff --git a/src/util/actor.js b/src/util/actor.js index 7cee0d58dea..52e866427b0 100644 --- a/src/util/actor.js +++ b/src/util/actor.js @@ -26,9 +26,12 @@ class Actor { callbacks: { number: any }; name: string; tasks: { number: any }; - taskQueue: Array; + taskQueue: Array>; cancelCallbacks: { number: Cancelable }; invoker: ThrottledInvoker; + isBatching: boolean; + messages: Array; + buffers: Array; constructor(target: any, parent: any, mapId: ?number) { this.target = target; @@ -38,12 +41,24 @@ class Actor { this.callbacks = {}; this.tasks = {}; this.taskQueue = []; + this.isBatching = false; + this.messages = []; + this.buffers = []; this.cancelCallbacks = {}; bindAll(['receive', 'process'], this); this.invoker = new ThrottledInvoker(this.process); this.target.addEventListener('message', this.receive, false); } + postMessage(message: Object, buffers: Array = []) { + if (this.isBatching) { + this.messages.push(message); + this.buffers = this.buffers.concat(buffers); + } else { + this.target.postMessage([message], buffers); + } + } + /** * Sends a message from a main-thread map to a Worker or from a Worker back to * a main-thread map instance. @@ -62,7 +77,7 @@ class Actor { this.callbacks[id] = callback; } const buffers: Array = []; - this.target.postMessage({ + this.postMessage({ id, type, hasCallback: !!callback, @@ -76,7 +91,7 @@ class Actor { // Set the callback to null so that it never fires after the request is aborted. delete this.callbacks[id]; } - this.target.postMessage({ + this.postMessage({ id, type: '', targetMapId, @@ -87,36 +102,41 @@ class Actor { } receive(message: Object) { - const data = message.data, - id = data.id; + const group: Array = []; + for (const data of message.data) { + const id = data.id; - if (!id) { - return; - } + if (!id) { + continue; + } - if (data.targetMapId && this.mapId !== data.targetMapId) { - return; - } + if (data.targetMapId && this.mapId !== data.targetMapId) { + continue; + } - if (data.type === '') { - // Remove the original request from the queue. This is only possible if it - // hasn't been kicked off yet. The id will remain in the queue, but because - // there is no associated task, it will be dropped once it's time to execute it. - delete this.tasks[id]; - const cancel = this.cancelCallbacks[id]; - delete this.cancelCallbacks[id]; - if (cancel) { - cancel(); + if (data.type === '') { + // Remove the original request from the queue. This is only possible if it + // hasn't been kicked off yet. The id will remain in the queue, but because + // there is no associated task, it will be dropped once it's time to execute it. + delete this.tasks[id]; + const cancel = this.cancelCallbacks[id]; + delete this.cancelCallbacks[id]; + if (cancel) { + cancel(); + } + } else { + // In workers, store the tasks that we need to process before actually processing them. This + // is necessary because we want to keep receiving messages, and in particular, + // messages. Some tasks may take a while in the worker thread, so before + // executing the next task in our queue, postMessage preempts this and + // messages can be processed. We're using a MessageChannel object to get throttle the + // process() flow to one at a time. + this.tasks[id] = data; + group.push(id); } - } else { - // In workers, store the tasks that we need to process before actually processing them. This - // is necessary because we want to keep receiving messages, and in particular, - // messages. Some tasks may take a while in the worker thread, so before - // executing the next task in our queue, postMessage preempts this and - // messages can be processed. We're using a MessageChannel object to get throttle the - // process() flow to one at a time. - this.tasks[id] = data; - this.taskQueue.push(id); + } + if (group.length) { + this.taskQueue.push(group); if (this.isWorker) { this.invoker.trigger(); } else { @@ -127,74 +147,92 @@ class Actor { } } + batched(cb: Function) { + try { + this.isBatching = true; + cb(); + } finally { + this.isBatching = false; + if (this.messages.length) { + this.target.postMessage(this.messages, this.buffers); + this.messages = []; + this.buffers = []; + } + } + } + process() { if (!this.taskQueue.length) { return; } - const id = this.taskQueue.shift(); - const task = this.tasks[id]; - delete this.tasks[id]; + const ids = this.taskQueue.shift(); // Schedule another process call if we know there's more to process _before_ invoking the // current task. This is necessary so that processing continues even if the current task // doesn't execute successfully. if (this.taskQueue.length) { this.invoker.trigger(); } - if (!task) { - // If the task ID doesn't have associated task data anymore, it was canceled. - return; - } + this.batched(() => { + for (const id of ids) { + const task = this.tasks[id]; + delete this.tasks[id]; + if (!task) { + // If the task ID doesn't have associated task data anymore, it was canceled. + continue; + } - if (task.type === '') { - // The done() function in the counterpart has been called, and we are now - // firing the callback in the originating actor, if there is one. - const callback = this.callbacks[id]; - delete this.callbacks[id]; - if (callback) { - // If we get a response, but don't have a callback, the request was canceled. - if (task.error) { - callback(deserialize(task.error)); + if (task.type === '') { + // The done() function in the counterpart has been called, and we are now + // firing the callback in the originating actor, if there is one. + const callback = this.callbacks[id]; + delete this.callbacks[id]; + if (callback) { + // If we get a response, but don't have a callback, the request was canceled. + if (task.error) { + callback(deserialize(task.error)); + } else { + callback(null, deserialize(task.data)); + } + } } else { - callback(null, deserialize(task.data)); - } - } - } else { - let completed = false; - const done = task.hasCallback ? (err, data) => { - completed = true; - delete this.cancelCallbacks[id]; - const buffers: Array = []; - this.target.postMessage({ - id, - type: '', - sourceMapId: this.mapId, - error: err ? serialize(err) : null, - data: serialize(data, buffers) - }, buffers); - } : (_) => { - completed = true; - }; - - let callback = null; - const params = (deserialize(task.data): any); - if (this.parent[task.type]) { - // task.type == 'loadTile', 'removeTile', etc. - callback = this.parent[task.type](task.sourceMapId, params, done); - } else if (this.parent.getWorkerSource) { - // task.type == sourcetype.method - const keys = task.type.split('.'); - const scope = (this.parent: any).getWorkerSource(task.sourceMapId, keys[0], params.source); - callback = scope[keys[1]](params, done); - } else { - // No function was found. - done(new Error(`Could not find function ${task.type}`)); - } + let completed = false; + const done = task.hasCallback ? (err, data) => { + completed = true; + delete this.cancelCallbacks[id]; + const buffers: Array = []; + this.postMessage({ + id, + type: '', + sourceMapId: this.mapId, + error: err ? serialize(err) : null, + data: serialize(data, buffers) + }, buffers); + } : (_) => { + completed = true; + }; + + let callback = null; + const params = (deserialize(task.data): any); + if (this.parent[task.type]) { + // task.type == 'loadTile', 'removeTile', etc. + callback = this.parent[task.type](task.sourceMapId, params, done); + } else if (this.parent.getWorkerSource) { + // task.type == sourcetype.method + const keys = task.type.split('.'); + const scope = (this.parent: any).getWorkerSource(task.sourceMapId, keys[0], params.source); + callback = scope[keys[1]](params, done); + } else { + // No function was found. + done(new Error(`Could not find function ${task.type}`)); + } - if (!completed && callback && callback.cancel) { - // Allows canceling the task as long as it hasn't been completed yet. - this.cancelCallbacks[id] = callback.cancel; + if (!completed && callback && callback.cancel) { + // Allows canceling the task as long as it hasn't been completed yet. + this.cancelCallbacks[id] = callback.cancel; + } + } } - } + }); } remove() {