From 9af914fd284fb89cc9662c2be468d31229a87c0f Mon Sep 17 00:00:00 2001 From: Chris Loer Date: Thu, 18 Jan 2018 10:33:39 -0800 Subject: [PATCH] Coalesce GeoJSON "loadData" requests. Re-implement PR #5902 including fix for issue #5970. --- src/source/geojson_source.js | 16 ++- src/source/geojson_worker_source.js | 110 ++++++++++++++-- test/unit/source/geojson_source.test.js | 16 ++- .../unit/source/geojson_worker_source.test.js | 117 ++++++++++++++++++ 4 files changed, 243 insertions(+), 16 deletions(-) diff --git a/src/source/geojson_source.js b/src/source/geojson_source.js index abd840e761c..e708d5f9e6c 100644 --- a/src/source/geojson_source.js +++ b/src/source/geojson_source.js @@ -181,9 +181,19 @@ class GeoJSONSource extends Evented implements Source { // target {this.type}.loadData rather than literally geojson.loadData, // so that other geojson-like source types can easily reuse this // implementation - this.workerID = this.dispatcher.send(`${this.type}.loadData`, options, (err) => { - this._loaded = true; - callback(err); + this.workerID = this.dispatcher.send(`${this.type}.loadData`, options, (err, abandoned) => { + if (!abandoned) { + this._loaded = true; + // Any `loadData` calls that piled up while we were processing + // this one will get coalesced into a single call when this + // 'coalesce' message is processed. + // We would self-send from the worker if we had access to its + // message queue. Waiting instead for the 'coalesce' to round-trip + // through the foreground just means we're throttling the worker + // to run at a little less than full-throttle. + this.dispatcher.send(`${this.type}.coalesce`, this.workerOptions, null, this.workerID); + callback(err); + } }, this.workerID); } diff --git a/src/source/geojson_worker_source.js b/src/source/geojson_worker_source.js index 36646e9e891..1bab42bc21a 100644 --- a/src/source/geojson_worker_source.js +++ b/src/source/geojson_worker_source.js @@ -6,6 +6,7 @@ const GeoJSONWrapper = require('./geojson_wrapper'); const vtpbf = require('vt-pbf'); const supercluster = require('supercluster'); const geojsonvt = require('geojson-vt'); +const assert = require('assert'); const VectorTileWorkerSource = require('./vector_tile_worker_source'); @@ -31,6 +32,10 @@ export type LoadGeoJSONParameters = { geojsonVtOptions?: Object }; +export type CoalesceParameters = { + source: string +}; + export type LoadGeoJSON = (params: LoadGeoJSONParameters, callback: Callback) => void; export interface GeoJSONIndex { @@ -40,11 +45,11 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC const source = params.source, canonical = params.tileID.canonical; - if (!this._geoJSONIndexes[source]) { + if (!this._sources[source] || !this._sources[source].geoJSONIndex) { return callback(null, null); // we couldn't load the file } - const geoJSONTile = this._geoJSONIndexes[source].getTile(canonical.z, canonical.x, canonical.y); + const geoJSONTile = this._sources[source].geoJSONIndex.getTile(canonical.z, canonical.x, canonical.y); if (!geoJSONTile) { return callback(null, null); // nothing in the given tile } @@ -66,6 +71,11 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC }); } +export type SourceState = + | 'Idle' // Source empty or data loaded + | 'Coalescing' // Data finished loading, but discard 'loadData' messages until receiving 'coalesced' + | 'NeedsLoadData'; // 'loadData' received while coalescing, trigger one more 'loadData' on receiving 'coalesced' + /** * The {@link WorkerSource} implementation that supports {@link GeoJSONSource}. * This class is designed to be easily reused to support custom source types @@ -77,8 +87,13 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC * @private */ class GeoJSONWorkerSource extends VectorTileWorkerSource { - _geoJSONIndexes: { [string]: GeoJSONIndex }; loadGeoJSON: LoadGeoJSON; + _sources: { [string]: { + state?: SourceState, + pendingCallback?: Callback, + pendingLoadDataParams?: LoadGeoJSONParameters, + geoJSONIndex?: GeoJSONIndex // object mapping source ids to geojson-vt-like tile indexes + }}; /** * @param [loadGeoJSON] Optional method for custom loading/parsing of @@ -90,8 +105,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { if (loadGeoJSON) { this.loadGeoJSON = loadGeoJSON; } - // object mapping source ids to geojson-vt-like tile indexes - this._geoJSONIndexes = {}; + this._sources = {}; } /** @@ -102,11 +116,51 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { * Defers to {@link GeoJSONWorkerSource#loadGeoJSON} for the fetching/parsing, * expecting `callback(error, data)` to be called with either an error or a * parsed GeoJSON object. + * + * When `loadData` requests come in faster than they can be processed, + * they are coalesced into a single request using the latest data. + * See {@link GeoJSONWorkerSource#coalesce} + * * @param params * @param params.source The id of the source. * @param callback */ - loadData(params: LoadGeoJSONParameters, callback: Callback) { + loadData(params: LoadGeoJSONParameters, callback: Callback) { + if (!this._sources[params.source]) { + this._sources[params.source] = {}; + } + const source = this._sources[params.source]; + + if (source.pendingCallback) { + // Tell the foreground the previous call has been abandoned + source.pendingCallback(null, true); + } + source.pendingCallback = callback; + source.pendingLoadDataParams = params; + + if (source.state && + source.state !== 'Idle') { + source.state = 'NeedsLoadData'; + } else { + source.state = 'Coalescing'; + this._loadData(params.source); + } + } + + /** + * Internal implementation: called directly by `loadData` + * or by `coalesce` using stored parameters. + */ + _loadData(sourceId: string) { + const source = this._sources[sourceId]; + if (!source.pendingCallback || !source.pendingLoadDataParams) { + assert(false); + return; + } + const callback = source.pendingCallback; + const params = source.pendingLoadDataParams; + delete source.pendingCallback; + delete source.pendingLoadDataParams; this.loadGeoJSON(params, (err, data) => { if (err || !data) { return callback(err); @@ -116,7 +170,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { rewind(data, true); try { - this._geoJSONIndexes[params.source] = params.cluster ? + source.geoJSONIndex = params.cluster ? supercluster(params.superclusterOptions).load(data.features) : geojsonvt(data, params.geojsonVtOptions); } catch (err) { @@ -129,6 +183,39 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { }); } + /** + * While processing `loadData`, we coalesce all further + * `loadData` messages into a single call to _loadData + * that will happen once we've finished processing the + * first message. {@link GeoJSONSource#_updateWorkerData} + * is responsible for sending us the `coalesce` message + * at the time it receives a response from `loadData` + * + * State: Idle + * ↑ | + * 'coalesce' 'loadData' + * | (triggers load) + * | ↓ + * State: Coalescing + * ↑ | + * (triggers load) | + * 'coalesce' 'loadData' + * | ↓ + * State: NeedsLoadData + */ + coalesce(params: CoalesceParameters) { + const source = this._sources[params.source]; + if (!source) { + return; // coalesce queued after removeSource + } + if (source.state === 'Coalescing') { + source.state = 'Idle'; + } else if (source.state === 'NeedsLoadData') { + source.state = 'Coalescing'; + this._loadData(params.source); + } + } + /** * Implements {@link WorkerSource#reloadTile}. * @@ -180,8 +267,13 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { } removeSource(params: {source: string}, callback: Callback) { - if (this._geoJSONIndexes[params.source]) { - delete this._geoJSONIndexes[params.source]; + const removedSource = this._sources[params.source]; + if (removedSource) { + if (removedSource.pendingCallback) { + // Don't leak callbacks + removedSource.pendingCallback(null, true); + } + delete this._sources[params.source]; } callback(); } diff --git a/test/unit/source/geojson_source.test.js b/test/unit/source/geojson_source.test.js index a0c75863451..4bfe0039220 100644 --- a/test/unit/source/geojson_source.test.js +++ b/test/unit/source/geojson_source.test.js @@ -49,7 +49,9 @@ test('GeoJSONSource#setData', (t) => { function createSource() { return new GeoJSONSource('id', {data: {}}, { send: function (type, data, callback) { - return setTimeout(callback, 0); + if (callback) { + return setTimeout(callback, 0); + } } }); } @@ -153,7 +155,9 @@ test('GeoJSONSource#update', (t) => { t.test('fires event when metadata loads', (t) => { const mockDispatcher = { send: function(message, args, callback) { - setTimeout(callback, 0); + if (callback) { + setTimeout(callback, 0); + } } }; @@ -169,7 +173,9 @@ test('GeoJSONSource#update', (t) => { t.test('fires "error"', (t) => { const mockDispatcher = { send: function(message, args, callback) { - setTimeout(callback.bind(null, 'error'), 0); + if (callback) { + setTimeout(callback.bind(null, 'error'), 0); + } } }; @@ -190,7 +196,9 @@ test('GeoJSONSource#update', (t) => { if (message === 'geojson.loadData' && --expectedLoadDataCalls <= 0) { t.end(); } - setTimeout(callback, 0); + if (callback) { + setTimeout(callback, 0); + } } }; diff --git a/test/unit/source/geojson_worker_source.test.js b/test/unit/source/geojson_worker_source.test.js index dce1669b9f0..a645b24f2fd 100644 --- a/test/unit/source/geojson_worker_source.test.js +++ b/test/unit/source/geojson_worker_source.test.js @@ -86,6 +86,7 @@ test('reloadTile', (t) => { function addData(callback) { source.loadData({ source: 'sourceId', data: JSON.stringify(geoJson) }, (err) => { + source.coalesce({ source: 'sourceId' }); t.equal(err, null); callback(); }); @@ -131,3 +132,119 @@ test('reloadTile', (t) => { t.end(); }); + + +test('loadData', (t) => { + const layers = [ + { + id: 'layer1', + source: 'source1', + type: 'symbol', + }, + { + id: 'layer2', + source: 'source2', + type: 'symbol', + } + ]; + + const geoJson = { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [0, 0] + } + }; + + const layerIndex = new StyleLayerIndex(layers); + function createWorker() { + const worker = new GeoJSONWorkerSource(null, layerIndex); + + // Making the call to loadGeoJSON to asynchronous + // allows these tests to mimic a message queue building up + // (regardless of timing) + const originalLoadGeoJSON = worker.loadGeoJSON; + worker.loadGeoJSON = function(params, callback) { + setTimeout(() => { + originalLoadGeoJSON(params, callback); + }, 0); + }; + return worker; + } + + t.test('abandons coalesced callbacks', (t) => { + // Expect first call to run, second to be abandoned, + // and third to run in response to coalesce + const worker = createWorker(); + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + worker.coalesce({ source: 'source1' }); + }); + + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.ok(abandoned); + }); + + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + t.end(); + }); + }); + + t.test('does not mix coalesce state between sources', (t) => { + // Expect first and second calls to run independently, + // and third call should run in response to coalesce + // from first call. + const worker = createWorker(); + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + worker.coalesce({ source: 'source1' }); + }); + + worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + }); + + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + t.end(); + }); + }); + + t.test('does not mix stored callbacks between sources', (t) => { + // Two loadData calls per source means no calls should + // be abandoned. + const worker = createWorker(); + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + worker.coalesce({ source: 'source1' }); + }); + + worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + worker.coalesce({ source: 'source2' }); + }); + + worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + // test ends here because source2 has the last coalesce call + t.end(); + }); + + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + }); + }); + + t.end(); +});