Skip to content

Commit

Permalink
Coalesce GeoJSON "loadData" requests.
Browse files Browse the repository at this point in the history
Re-implement PR #5902 including fix for issue #5970.
  • Loading branch information
ChrisLoer committed Jan 19, 2018
1 parent 94f775a commit 9af914f
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 16 deletions.
16 changes: 13 additions & 3 deletions src/source/geojson_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,19 @@ class GeoJSONSource extends Evented implements Source {
// target {this.type}.loadData rather than literally geojson.loadData,
// so that other geojson-like source types can easily reuse this
// implementation
this.workerID = this.dispatcher.send(`${this.type}.loadData`, options, (err) => {
this._loaded = true;
callback(err);
this.workerID = this.dispatcher.send(`${this.type}.loadData`, options, (err, abandoned) => {
if (!abandoned) {
this._loaded = true;
// Any `loadData` calls that piled up while we were processing
// this one will get coalesced into a single call when this
// 'coalesce' message is processed.
// We would self-send from the worker if we had access to its
// message queue. Waiting instead for the 'coalesce' to round-trip
// through the foreground just means we're throttling the worker
// to run at a little less than full-throttle.
this.dispatcher.send(`${this.type}.coalesce`, this.workerOptions, null, this.workerID);
callback(err);
}
}, this.workerID);
}

Expand Down
110 changes: 101 additions & 9 deletions src/source/geojson_worker_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const GeoJSONWrapper = require('./geojson_wrapper');
const vtpbf = require('vt-pbf');
const supercluster = require('supercluster');
const geojsonvt = require('geojson-vt');
const assert = require('assert');

const VectorTileWorkerSource = require('./vector_tile_worker_source');

Expand All @@ -31,6 +32,10 @@ export type LoadGeoJSONParameters = {
geojsonVtOptions?: Object
};

export type CoalesceParameters = {
source: string
};

export type LoadGeoJSON = (params: LoadGeoJSONParameters, callback: Callback<mixed>) => void;

export interface GeoJSONIndex {
Expand All @@ -40,11 +45,11 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC
const source = params.source,
canonical = params.tileID.canonical;

if (!this._geoJSONIndexes[source]) {
if (!this._sources[source] || !this._sources[source].geoJSONIndex) {
return callback(null, null); // we couldn't load the file
}

const geoJSONTile = this._geoJSONIndexes[source].getTile(canonical.z, canonical.x, canonical.y);
const geoJSONTile = this._sources[source].geoJSONIndex.getTile(canonical.z, canonical.x, canonical.y);
if (!geoJSONTile) {
return callback(null, null); // nothing in the given tile
}
Expand All @@ -66,6 +71,11 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC
});
}

export type SourceState =
| 'Idle' // Source empty or data loaded
| 'Coalescing' // Data finished loading, but discard 'loadData' messages until receiving 'coalesced'
| 'NeedsLoadData'; // 'loadData' received while coalescing, trigger one more 'loadData' on receiving 'coalesced'

/**
* The {@link WorkerSource} implementation that supports {@link GeoJSONSource}.
* This class is designed to be easily reused to support custom source types
Expand All @@ -77,8 +87,13 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC
* @private
*/
class GeoJSONWorkerSource extends VectorTileWorkerSource {
_geoJSONIndexes: { [string]: GeoJSONIndex };
loadGeoJSON: LoadGeoJSON;
_sources: { [string]: {
state?: SourceState,
pendingCallback?: Callback<boolean>,
pendingLoadDataParams?: LoadGeoJSONParameters,
geoJSONIndex?: GeoJSONIndex // object mapping source ids to geojson-vt-like tile indexes
}};

/**
* @param [loadGeoJSON] Optional method for custom loading/parsing of
Expand All @@ -90,8 +105,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
if (loadGeoJSON) {
this.loadGeoJSON = loadGeoJSON;
}
// object mapping source ids to geojson-vt-like tile indexes
this._geoJSONIndexes = {};
this._sources = {};
}

/**
Expand All @@ -102,11 +116,51 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
* Defers to {@link GeoJSONWorkerSource#loadGeoJSON} for the fetching/parsing,
* expecting `callback(error, data)` to be called with either an error or a
* parsed GeoJSON object.
*
* When `loadData` requests come in faster than they can be processed,
* they are coalesced into a single request using the latest data.
* See {@link GeoJSONWorkerSource#coalesce}
*
* @param params
* @param params.source The id of the source.
* @param callback
*/
loadData(params: LoadGeoJSONParameters, callback: Callback<void>) {
loadData(params: LoadGeoJSONParameters, callback: Callback<boolean>) {
if (!this._sources[params.source]) {
this._sources[params.source] = {};
}
const source = this._sources[params.source];

if (source.pendingCallback) {
// Tell the foreground the previous call has been abandoned
source.pendingCallback(null, true);
}
source.pendingCallback = callback;
source.pendingLoadDataParams = params;

if (source.state &&
source.state !== 'Idle') {
source.state = 'NeedsLoadData';
} else {
source.state = 'Coalescing';
this._loadData(params.source);
}
}

/**
* Internal implementation: called directly by `loadData`
* or by `coalesce` using stored parameters.
*/
_loadData(sourceId: string) {
const source = this._sources[sourceId];
if (!source.pendingCallback || !source.pendingLoadDataParams) {
assert(false);
return;
}
const callback = source.pendingCallback;
const params = source.pendingLoadDataParams;
delete source.pendingCallback;
delete source.pendingLoadDataParams;
this.loadGeoJSON(params, (err, data) => {
if (err || !data) {
return callback(err);
Expand All @@ -116,7 +170,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
rewind(data, true);

try {
this._geoJSONIndexes[params.source] = params.cluster ?
source.geoJSONIndex = params.cluster ?
supercluster(params.superclusterOptions).load(data.features) :
geojsonvt(data, params.geojsonVtOptions);
} catch (err) {
Expand All @@ -129,6 +183,39 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
});
}

/**
* While processing `loadData`, we coalesce all further
* `loadData` messages into a single call to _loadData
* that will happen once we've finished processing the
* first message. {@link GeoJSONSource#_updateWorkerData}
* is responsible for sending us the `coalesce` message
* at the time it receives a response from `loadData`
*
* State: Idle
* ↑ |
* 'coalesce' 'loadData'
* | (triggers load)
* | ↓
* State: Coalescing
* ↑ |
* (triggers load) |
* 'coalesce' 'loadData'
* | ↓
* State: NeedsLoadData
*/
coalesce(params: CoalesceParameters) {
const source = this._sources[params.source];
if (!source) {
return; // coalesce queued after removeSource
}
if (source.state === 'Coalescing') {
source.state = 'Idle';
} else if (source.state === 'NeedsLoadData') {
source.state = 'Coalescing';
this._loadData(params.source);
}
}

/**
* Implements {@link WorkerSource#reloadTile}.
*
Expand Down Expand Up @@ -180,8 +267,13 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
}

removeSource(params: {source: string}, callback: Callback<mixed>) {
if (this._geoJSONIndexes[params.source]) {
delete this._geoJSONIndexes[params.source];
const removedSource = this._sources[params.source];
if (removedSource) {
if (removedSource.pendingCallback) {
// Don't leak callbacks
removedSource.pendingCallback(null, true);
}
delete this._sources[params.source];
}
callback();
}
Expand Down
16 changes: 12 additions & 4 deletions test/unit/source/geojson_source.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ test('GeoJSONSource#setData', (t) => {
function createSource() {
return new GeoJSONSource('id', {data: {}}, {
send: function (type, data, callback) {
return setTimeout(callback, 0);
if (callback) {
return setTimeout(callback, 0);
}
}
});
}
Expand Down Expand Up @@ -153,7 +155,9 @@ test('GeoJSONSource#update', (t) => {
t.test('fires event when metadata loads', (t) => {
const mockDispatcher = {
send: function(message, args, callback) {
setTimeout(callback, 0);
if (callback) {
setTimeout(callback, 0);
}
}
};

Expand All @@ -169,7 +173,9 @@ test('GeoJSONSource#update', (t) => {
t.test('fires "error"', (t) => {
const mockDispatcher = {
send: function(message, args, callback) {
setTimeout(callback.bind(null, 'error'), 0);
if (callback) {
setTimeout(callback.bind(null, 'error'), 0);
}
}
};

Expand All @@ -190,7 +196,9 @@ test('GeoJSONSource#update', (t) => {
if (message === 'geojson.loadData' && --expectedLoadDataCalls <= 0) {
t.end();
}
setTimeout(callback, 0);
if (callback) {
setTimeout(callback, 0);
}
}
};

Expand Down
117 changes: 117 additions & 0 deletions test/unit/source/geojson_worker_source.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ test('reloadTile', (t) => {

function addData(callback) {
source.loadData({ source: 'sourceId', data: JSON.stringify(geoJson) }, (err) => {
source.coalesce({ source: 'sourceId' });
t.equal(err, null);
callback();
});
Expand Down Expand Up @@ -131,3 +132,119 @@ test('reloadTile', (t) => {

t.end();
});


test('loadData', (t) => {
const layers = [
{
id: 'layer1',
source: 'source1',
type: 'symbol',
},
{
id: 'layer2',
source: 'source2',
type: 'symbol',
}
];

const geoJson = {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [0, 0]
}
};

const layerIndex = new StyleLayerIndex(layers);
function createWorker() {
const worker = new GeoJSONWorkerSource(null, layerIndex);

// Making the call to loadGeoJSON to asynchronous
// allows these tests to mimic a message queue building up
// (regardless of timing)
const originalLoadGeoJSON = worker.loadGeoJSON;
worker.loadGeoJSON = function(params, callback) {
setTimeout(() => {
originalLoadGeoJSON(params, callback);
}, 0);
};
return worker;
}

t.test('abandons coalesced callbacks', (t) => {
// Expect first call to run, second to be abandoned,
// and third to run in response to coalesce
const worker = createWorker();
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.notOk(abandoned);
worker.coalesce({ source: 'source1' });
});

worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.ok(abandoned);
});

worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.notOk(abandoned);
t.end();
});
});

t.test('does not mix coalesce state between sources', (t) => {
// Expect first and second calls to run independently,
// and third call should run in response to coalesce
// from first call.
const worker = createWorker();
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.notOk(abandoned);
worker.coalesce({ source: 'source1' });
});

worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.notOk(abandoned);
});

worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.notOk(abandoned);
t.end();
});
});

t.test('does not mix stored callbacks between sources', (t) => {
// Two loadData calls per source means no calls should
// be abandoned.
const worker = createWorker();
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.notOk(abandoned);
worker.coalesce({ source: 'source1' });
});

worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.notOk(abandoned);
worker.coalesce({ source: 'source2' });
});

worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.notOk(abandoned);
// test ends here because source2 has the last coalesce call
t.end();
});

worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
t.equal(err, null);
t.notOk(abandoned);
});
});

t.end();
});

0 comments on commit 9af914f

Please sign in to comment.