Skip to content

Commit

Permalink
batch responses
Browse files Browse the repository at this point in the history
  • Loading branch information
msbarry committed Oct 29, 2019
1 parent 0deed6d commit 87688f7
Showing 1 changed file with 121 additions and 83 deletions.
204 changes: 121 additions & 83 deletions src/util/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ class Actor {
callbacks: { number: any };
name: string;
tasks: { number: any };
taskQueue: Array<number>;
taskQueue: Array<Array<number>>;
cancelCallbacks: { number: Cancelable };
invoker: ThrottledInvoker;
isBatching: boolean;
messages: Array<Object>;
buffers: Array<Object>;

constructor(target: any, parent: any, mapId: ?number) {
this.target = target;
Expand All @@ -38,12 +41,24 @@ class Actor {
this.callbacks = {};
this.tasks = {};
this.taskQueue = [];
this.isBatching = false;
this.messages = [];
this.buffers = [];
this.cancelCallbacks = {};
bindAll(['receive', 'process'], this);
this.invoker = new ThrottledInvoker(this.process);
this.target.addEventListener('message', this.receive, false);
}

postMessage(message: Object, buffers: Array<Object> = []) {
if (this.isBatching) {
this.messages.push(message);
this.buffers = this.buffers.concat(buffers);
} else {
this.target.postMessage([message], buffers);
}
}

/**
* Sends a message from a main-thread map to a Worker or from a Worker back to
* a main-thread map instance.
Expand All @@ -62,7 +77,7 @@ class Actor {
this.callbacks[id] = callback;
}
const buffers: Array<Transferable> = [];
this.target.postMessage({
this.postMessage({
id,
type,
hasCallback: !!callback,
Expand All @@ -76,7 +91,7 @@ class Actor {
// Set the callback to null so that it never fires after the request is aborted.
delete this.callbacks[id];
}
this.target.postMessage({
this.postMessage({
id,
type: '<cancel>',
targetMapId,
Expand All @@ -87,36 +102,41 @@ class Actor {
}

receive(message: Object) {
const data = message.data,
id = data.id;
const group: Array<number> = [];
for (const data of message.data) {
const id = data.id;

if (!id) {
return;
}
if (!id) {
continue;
}

if (data.targetMapId && this.mapId !== data.targetMapId) {
return;
}
if (data.targetMapId && this.mapId !== data.targetMapId) {
continue;
}

if (data.type === '<cancel>') {
// Remove the original request from the queue. This is only possible if it
// hasn't been kicked off yet. The id will remain in the queue, but because
// there is no associated task, it will be dropped once it's time to execute it.
delete this.tasks[id];
const cancel = this.cancelCallbacks[id];
delete this.cancelCallbacks[id];
if (cancel) {
cancel();
if (data.type === '<cancel>') {
// Remove the original request from the queue. This is only possible if it
// hasn't been kicked off yet. The id will remain in the queue, but because
// there is no associated task, it will be dropped once it's time to execute it.
delete this.tasks[id];
const cancel = this.cancelCallbacks[id];
delete this.cancelCallbacks[id];
if (cancel) {
cancel();
}
} else {
// In workers, store the tasks that we need to process before actually processing them. This
// is necessary because we want to keep receiving messages, and in particular,
// <cancel> messages. Some tasks may take a while in the worker thread, so before
// executing the next task in our queue, postMessage preempts this and <cancel>
// messages can be processed. We're using a MessageChannel object to get throttle the
// process() flow to one at a time.
this.tasks[id] = data;
group.push(id);
}
} else {
// In workers, store the tasks that we need to process before actually processing them. This
// is necessary because we want to keep receiving messages, and in particular,
// <cancel> messages. Some tasks may take a while in the worker thread, so before
// executing the next task in our queue, postMessage preempts this and <cancel>
// messages can be processed. We're using a MessageChannel object to get throttle the
// process() flow to one at a time.
this.tasks[id] = data;
this.taskQueue.push(id);
}
if (group.length) {
this.taskQueue.push(group);
if (this.isWorker) {
this.invoker.trigger();
} else {
Expand All @@ -127,74 +147,92 @@ class Actor {
}
}

batched(cb: Function) {
try {
this.isBatching = true;
cb();
} finally {
this.isBatching = false;
if (this.messages.length) {
this.target.postMessage(this.messages, this.buffers);
this.messages = [];
this.buffers = [];
}
}
}

process() {
if (!this.taskQueue.length) {
return;
}
const id = this.taskQueue.shift();
const task = this.tasks[id];
delete this.tasks[id];
const ids = this.taskQueue.shift();
// Schedule another process call if we know there's more to process _before_ invoking the
// current task. This is necessary so that processing continues even if the current task
// doesn't execute successfully.
if (this.taskQueue.length) {
this.invoker.trigger();
}
if (!task) {
// If the task ID doesn't have associated task data anymore, it was canceled.
return;
}
this.batched(() => {
for (const id of ids) {
const task = this.tasks[id];
delete this.tasks[id];
if (!task) {
// If the task ID doesn't have associated task data anymore, it was canceled.
continue;
}

if (task.type === '<response>') {
// The done() function in the counterpart has been called, and we are now
// firing the callback in the originating actor, if there is one.
const callback = this.callbacks[id];
delete this.callbacks[id];
if (callback) {
// If we get a response, but don't have a callback, the request was canceled.
if (task.error) {
callback(deserialize(task.error));
if (task.type === '<response>') {
// The done() function in the counterpart has been called, and we are now
// firing the callback in the originating actor, if there is one.
const callback = this.callbacks[id];
delete this.callbacks[id];
if (callback) {
// If we get a response, but don't have a callback, the request was canceled.
if (task.error) {
callback(deserialize(task.error));
} else {
callback(null, deserialize(task.data));
}
}
} else {
callback(null, deserialize(task.data));
}
}
} else {
let completed = false;
const done = task.hasCallback ? (err, data) => {
completed = true;
delete this.cancelCallbacks[id];
const buffers: Array<Transferable> = [];
this.target.postMessage({
id,
type: '<response>',
sourceMapId: this.mapId,
error: err ? serialize(err) : null,
data: serialize(data, buffers)
}, buffers);
} : (_) => {
completed = true;
};

let callback = null;
const params = (deserialize(task.data): any);
if (this.parent[task.type]) {
// task.type == 'loadTile', 'removeTile', etc.
callback = this.parent[task.type](task.sourceMapId, params, done);
} else if (this.parent.getWorkerSource) {
// task.type == sourcetype.method
const keys = task.type.split('.');
const scope = (this.parent: any).getWorkerSource(task.sourceMapId, keys[0], params.source);
callback = scope[keys[1]](params, done);
} else {
// No function was found.
done(new Error(`Could not find function ${task.type}`));
}
let completed = false;
const done = task.hasCallback ? (err, data) => {
completed = true;
delete this.cancelCallbacks[id];
const buffers: Array<Transferable> = [];
this.postMessage({
id,
type: '<response>',
sourceMapId: this.mapId,
error: err ? serialize(err) : null,
data: serialize(data, buffers)
}, buffers);
} : (_) => {
completed = true;
};

let callback = null;
const params = (deserialize(task.data): any);
if (this.parent[task.type]) {
// task.type == 'loadTile', 'removeTile', etc.
callback = this.parent[task.type](task.sourceMapId, params, done);
} else if (this.parent.getWorkerSource) {
// task.type == sourcetype.method
const keys = task.type.split('.');
const scope = (this.parent: any).getWorkerSource(task.sourceMapId, keys[0], params.source);
callback = scope[keys[1]](params, done);
} else {
// No function was found.
done(new Error(`Could not find function ${task.type}`));
}

if (!completed && callback && callback.cancel) {
// Allows canceling the task as long as it hasn't been completed yet.
this.cancelCallbacks[id] = callback.cancel;
if (!completed && callback && callback.cancel) {
// Allows canceling the task as long as it hasn't been completed yet.
this.cancelCallbacks[id] = callback.cancel;
}
}
}
}
});
}

remove() {
Expand Down

0 comments on commit 87688f7

Please sign in to comment.