diff --git a/src/source/geojson_source.js b/src/source/geojson_source.js index c4d72eb2bdb..e8e3ef4ce10 100644 --- a/src/source/geojson_source.js +++ b/src/source/geojson_source.js @@ -201,13 +201,22 @@ class GeoJSONSource extends Evented implements Source { // target {this.type}.loadData rather than literally geojson.loadData, // so that other geojson-like source types can easily reuse this // implementation - this.workerID = this.dispatcher.send(`${this.type}.loadData`, options, (err, result) => { - this._loaded = true; - - if (result && result.resourceTiming && result.resourceTiming[this.id]) - this._resourceTiming = result.resourceTiming[this.id].slice(0); - - callback(err); + this.workerID = this.dispatcher.send(`${this.type}.loadData`, options, (err, abandoned) => { + if (!abandoned) { + this._loaded = true; + + if (result && result.resourceTiming && result.resourceTiming[this.id]) + this._resourceTiming = result.resourceTiming[this.id].slice(0); + // Any `loadData` calls that piled up while we were processing + // this one will get coalesced into a single call when this + // 'coalesce' message is processed. + // We would self-send from the worker if we had access to its + // message queue. Waiting instead for the 'coalesce' to round-trip + // through the foreground just means we're throttling the worker + // to run at a little less than full-throttle. + this.dispatcher.send(`${this.type}.coalesce`, this.workerOptions, null, this.workerID); + callback(err); + } }, this.workerID); } diff --git a/src/source/geojson_worker_source.js b/src/source/geojson_worker_source.js index 26a3d634128..123eb3da263 100644 --- a/src/source/geojson_worker_source.js +++ b/src/source/geojson_worker_source.js @@ -7,6 +7,7 @@ const GeoJSONWrapper = require('./geojson_wrapper'); const vtpbf = require('vt-pbf'); const supercluster = require('supercluster'); const geojsonvt = require('geojson-vt'); +const assert = require('assert'); const VectorTileWorkerSource = require('./vector_tile_worker_source'); @@ -32,6 +33,10 @@ export type LoadGeoJSONParameters = { geojsonVtOptions?: Object }; +export type CoalesceParameters = { + source: string +}; + export type LoadGeoJSON = (params: LoadGeoJSONParameters, callback: Callback) => void; export interface GeoJSONIndex { @@ -41,11 +46,11 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC const source = params.source, canonical = params.tileID.canonical; - if (!this._geoJSONIndexes[source]) { + if (!this._sources[source] || !this._sources[source].geoJSONIndex) { return callback(null, null); // we couldn't load the file } - const geoJSONTile = this._geoJSONIndexes[source].getTile(canonical.z, canonical.x, canonical.y); + const geoJSONTile = this._sources[source].geoJSONIndex.getTile(canonical.z, canonical.x, canonical.y); if (!geoJSONTile) { return callback(null, null); // nothing in the given tile } @@ -67,6 +72,11 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC }); } +export type SourceState = + | 'Idle' // Source empty or data loaded + | 'Coalescing' // Data finished loading, but discard 'loadData' messages until receiving 'coalesced' + | 'NeedsLoadData'; // 'loadData' received while coalescing, trigger one more 'loadData' on receiving 'coalesced' + /** * The {@link WorkerSource} implementation that supports {@link GeoJSONSource}. * This class is designed to be easily reused to support custom source types @@ -78,8 +88,13 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC * @private */ class GeoJSONWorkerSource extends VectorTileWorkerSource { - _geoJSONIndexes: { [string]: GeoJSONIndex }; loadGeoJSON: LoadGeoJSON; + _sources: { [string]: { + state?: SourceState, + pendingCallback?: Callback, + pendingLoadDataParams?: LoadGeoJSONParameters, + geoJSONIndex?: GeoJSONIndex // object mapping source ids to geojson-vt-like tile indexes + }}; /** * @param [loadGeoJSON] Optional method for custom loading/parsing of @@ -91,8 +106,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { if (loadGeoJSON) { this.loadGeoJSON = loadGeoJSON; } - // object mapping source ids to geojson-vt-like tile indexes - this._geoJSONIndexes = {}; + this._sources = {}; } /** @@ -103,11 +117,51 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { * Defers to {@link GeoJSONWorkerSource#loadGeoJSON} for the fetching/parsing, * expecting `callback(error, data)` to be called with either an error or a * parsed GeoJSON object. + * + * When `loadData` requests come in faster than they can be processed, + * they are coalesced into a single request using the latest data. + * See {@link GeoJSONWorkerSource#coalesce} + * * @param params * @param params.source The id of the source. * @param callback */ - loadData(params: LoadGeoJSONParameters, callback: Callback<{[string]: {[string]: Array}}>) { + loadData(params: LoadGeoJSONParameters, callback: Callback) { + if (!this._sources[params.source]) { + this._sources[params.source] = {}; + } + const source = this._sources[params.source]; + + if (source.pendingCallback) { + // Tell the foreground the previous call has been abandoned + source.pendingCallback(null, true); + } + source.pendingCallback = callback; + source.pendingLoadDataParams = params; + + if (source.state && + source.state !== 'Idle') { + source.state = 'NeedsLoadData'; + } else { + source.state = 'Coalescing'; + this._loadData(params.source); + } + } + + /** + * Internal implementation: called directly by `loadData` + * or by `coalesce` using stored parameters. + */ + _loadData(sourceId: string) { + const source = this._sources[sourceId]; + if (!source.pendingCallback || !source.pendingLoadDataParams) { + assert(false); + return; + } + const callback = source.pendingCallback; + const params = source.pendingLoadDataParams; + delete source.pendingCallback; + delete source.pendingLoadDataParams; this.loadGeoJSON(params, (err, data) => { if (err || !data) { return callback(err); @@ -117,7 +171,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { rewind(data, true); try { - this._geoJSONIndexes[params.source] = params.cluster ? + source.geoJSONIndex = params.cluster ? supercluster(params.superclusterOptions).load(data.features) : geojsonvt(data, params.geojsonVtOptions); } catch (err) { @@ -141,6 +195,39 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { }); } + /** + * While processing `loadData`, we coalesce all further + * `loadData` messages into a single call to _loadData + * that will happen once we've finished processing the + * first message. {@link GeoJSONSource#_updateWorkerData} + * is responsible for sending us the `coalesce` message + * at the time it receives a response from `loadData` + * + * State: Idle + * ↑ | + * 'coalesce' 'loadData' + * | (triggers load) + * | ↓ + * State: Coalescing + * ↑ | + * (triggers load) | + * 'coalesce' 'loadData' + * | ↓ + * State: NeedsLoadData + */ + coalesce(params: CoalesceParameters) { + const source = this._sources[params.source]; + if (!source) { + return; // coalesce queued after removeSource + } + if (source.state === 'Coalescing') { + source.state = 'Idle'; + } else if (source.state === 'NeedsLoadData') { + source.state = 'Coalescing'; + this._loadData(params.source); + } + } + /** * Implements {@link WorkerSource#reloadTile}. * @@ -192,8 +279,13 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource { } removeSource(params: {source: string}, callback: Callback) { - if (this._geoJSONIndexes[params.source]) { - delete this._geoJSONIndexes[params.source]; + const removedSource = this._sources[params.source]; + if (removedSource) { + if (removedSource.pendingCallback) { + // Don't leak callbacks + removedSource.pendingCallback(null, true); + } + delete this._sources[params.source]; } callback(); } diff --git a/test/unit/source/geojson_source.test.js b/test/unit/source/geojson_source.test.js index fcc28f3bfa2..04641d34a0b 100644 --- a/test/unit/source/geojson_source.test.js +++ b/test/unit/source/geojson_source.test.js @@ -52,7 +52,9 @@ test('GeoJSONSource#setData', (t) => { opts = util.extend(opts, { data: {} }); return new GeoJSONSource('id', opts, { send: function (type, data, callback) { - return setTimeout(callback, 0); + if (callback) { + return setTimeout(callback, 0); + } } }); } @@ -172,7 +174,9 @@ test('GeoJSONSource#update', (t) => { t.test('fires event when metadata loads', (t) => { const mockDispatcher = { send: function(message, args, callback) { - setTimeout(callback, 0); + if (callback) { + setTimeout(callback, 0); + } } }; @@ -188,7 +192,9 @@ test('GeoJSONSource#update', (t) => { t.test('fires "error"', (t) => { const mockDispatcher = { send: function(message, args, callback) { - setTimeout(callback.bind(null, 'error'), 0); + if (callback) { + setTimeout(callback.bind(null, 'error'), 0); + } } }; @@ -209,7 +215,9 @@ test('GeoJSONSource#update', (t) => { if (message === 'geojson.loadData' && --expectedLoadDataCalls <= 0) { t.end(); } - setTimeout(callback, 0); + if (callback) { + setTimeout(callback, 0); + } } }; diff --git a/test/unit/source/geojson_worker_source.test.js b/test/unit/source/geojson_worker_source.test.js index 885ef3868d4..0aa3623ccbb 100644 --- a/test/unit/source/geojson_worker_source.test.js +++ b/test/unit/source/geojson_worker_source.test.js @@ -87,6 +87,7 @@ test('reloadTile', (t) => { function addData(callback) { source.loadData({ source: 'sourceId', data: JSON.stringify(geoJson) }, (err) => { + source.coalesce({ source: 'sourceId' }); t.equal(err, null); callback(); }); @@ -197,3 +198,118 @@ test('resourceTiming', (t) => { t.end(); }); + +test('loadData', (t) => { + const layers = [ + { + id: 'layer1', + source: 'source1', + type: 'symbol', + }, + { + id: 'layer2', + source: 'source2', + type: 'symbol', + } + ]; + + const geoJson = { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [0, 0] + } + }; + + const layerIndex = new StyleLayerIndex(layers); + function createWorker() { + const worker = new GeoJSONWorkerSource(null, layerIndex); + + // Making the call to loadGeoJSON asynchronous + // allows these tests to mimic a message queue building up + // (regardless of timing) + const originalLoadGeoJSON = worker.loadGeoJSON; + worker.loadGeoJSON = function(params, callback) { + setTimeout(() => { + originalLoadGeoJSON(params, callback); + }, 0); + }; + return worker; + } + + t.test('abandons coalesced callbacks', (t) => { + // Expect first call to run, second to be abandoned, + // and third to run in response to coalesce + const worker = createWorker(); + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + worker.coalesce({ source: 'source1' }); + }); + + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.ok(abandoned); + }); + + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + t.end(); + }); + }); + + t.test('does not mix coalesce state between sources', (t) => { + // Expect first and second calls to run independently, + // and third call should run in response to coalesce + // from first call. + const worker = createWorker(); + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + worker.coalesce({ source: 'source1' }); + }); + + worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + }); + + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + t.end(); + }); + }); + + t.test('does not mix stored callbacks between sources', (t) => { + // Two loadData calls per source means no calls should + // be abandoned. + const worker = createWorker(); + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + worker.coalesce({ source: 'source1' }); + }); + + worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + worker.coalesce({ source: 'source2' }); + }); + + worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + // test ends here because source2 has the last coalesce call + t.end(); + }); + + worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => { + t.equal(err, null); + t.notOk(abandoned); + }); + }); + + t.end(); +});