Skip to content

Commit

Permalink
improve client-side SO client get pooling (elastic#82603)
Browse files Browse the repository at this point in the history
  • Loading branch information
pgayvallet authored Nov 5, 2020
1 parent 706be6b commit ed47da8
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 28 deletions.
51 changes: 51 additions & 0 deletions src/core/public/saved_objects/saved_objects_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,57 @@ describe('SavedObjectsClient', () => {
`);
});

test('removes duplicates when calling `_bulk_get`', async () => {
// Await #get call to ensure batchQueue is empty and throttle has reset
await savedObjectsClient.get('type2', doc.id);
http.fetch.mockClear();

savedObjectsClient.get(doc.type, doc.id);
savedObjectsClient.get('some-type', 'some-id');
await savedObjectsClient.get(doc.type, doc.id);

expect(http.fetch).toHaveBeenCalledTimes(1);
expect(http.fetch.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"/api/saved_objects/_bulk_get",
Object {
"body": "[{\\"id\\":\\"AVwSwFxtcMV38qjDZoQg\\",\\"type\\":\\"config\\"},{\\"id\\":\\"some-id\\",\\"type\\":\\"some-type\\"}]",
"method": "POST",
"query": undefined,
},
]
`);
});

test('resolves with correct object when there are duplicates present', async () => {
// Await #get call to ensure batchQueue is empty and throttle has reset
await savedObjectsClient.get('type2', doc.id);
http.fetch.mockClear();

const call1 = savedObjectsClient.get(doc.type, doc.id);
const objFromCall2 = await savedObjectsClient.get(doc.type, doc.id);
const objFromCall1 = await call1;

expect(objFromCall1.type).toBe(doc.type);
expect(objFromCall1.id).toBe(doc.id);

expect(objFromCall2.type).toBe(doc.type);
expect(objFromCall2.id).toBe(doc.id);
});

test('do not share instances or references between duplicate callers', async () => {
// Await #get call to ensure batchQueue is empty and throttle has reset
await savedObjectsClient.get('type2', doc.id);
http.fetch.mockClear();

const call1 = savedObjectsClient.get(doc.type, doc.id);
const objFromCall2 = await savedObjectsClient.get(doc.type, doc.id);
const objFromCall1 = await call1;

objFromCall1.set('title', 'new title');
expect(objFromCall2.get('title')).toEqual('Example title');
});

test('resolves with SimpleSavedObject instance', async () => {
const response = savedObjectsClient.get(doc.type, doc.id);
await expect(response).resolves.toBeInstanceOf(SimpleSavedObject);
Expand Down
79 changes: 51 additions & 28 deletions src/core/public/saved_objects/saved_objects_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { cloneDeep, pick, throttle } from 'lodash';
import { pick, throttle, cloneDeep } from 'lodash';
import { resolve as resolveUrl } from 'url';
import type { PublicMethodsOf } from '@kbn/utility-types';

Expand Down Expand Up @@ -144,6 +144,23 @@ const API_BASE_URL = '/api/saved_objects/';
*/
export type SavedObjectsClientContract = PublicMethodsOf<SavedObjectsClient>;

interface ObjectTypeAndId {
id: string;
type: string;
}

const getObjectsToFetch = (queue: BatchQueueEntry[]): ObjectTypeAndId[] => {
const objects: ObjectTypeAndId[] = [];
const inserted = new Set<string>();
queue.forEach(({ id, type }) => {
if (!inserted.has(`${type}|${id}`)) {
objects.push({ id, type });
inserted.add(`${type}|${id}`);
}
});
return objects;
};

/**
* Saved Objects is Kibana's data persisentence mechanism allowing plugins to
* use Elasticsearch for storing plugin state. The client-side
Expand All @@ -160,31 +177,34 @@ export class SavedObjectsClient {
* Throttled processing of get requests into bulk requests at 100ms interval
*/
private processBatchQueue = throttle(
() => {
const queue = cloneDeep(this.batchQueue);
async () => {
const queue = [...this.batchQueue];
this.batchQueue = [];

this.bulkGet(queue)
.then(({ savedObjects }) => {
queue.forEach((queueItem) => {
const foundObject = savedObjects.find((savedObject) => {
return savedObject.id === queueItem.id && savedObject.type === queueItem.type;
});
try {
const objectsToFetch = getObjectsToFetch(queue);
const { saved_objects: savedObjects } = await this.performBulkGet(objectsToFetch);

if (!foundObject) {
return queueItem.resolve(
this.createSavedObject(pick(queueItem, ['id', 'type']) as SavedObject)
);
}

queueItem.resolve(foundObject);
});
})
.catch((err) => {
queue.forEach((queueItem) => {
queueItem.reject(err);
queue.forEach((queueItem) => {
const foundObject = savedObjects.find((savedObject) => {
return savedObject.id === queueItem.id && savedObject.type === queueItem.type;
});

if (foundObject) {
// multiple calls may have been requested the same object.
// we need to clone to avoid sharing references between the instances
queueItem.resolve(this.createSavedObject(cloneDeep(foundObject)));
} else {
queueItem.resolve(
this.createSavedObject(pick(queueItem, ['id', 'type']) as SavedObject)
);
}
});
} catch (err) {
queue.forEach((queueItem) => {
queueItem.reject(err);
});
}
},
BATCH_INTERVAL,
{ leading: false }
Expand Down Expand Up @@ -383,14 +403,8 @@ export class SavedObjectsClient {
* ])
*/
public bulkGet = (objects: Array<{ id: string; type: string }> = []) => {
const path = this.getPath(['_bulk_get']);
const filteredObjects = objects.map((obj) => pick(obj, ['id', 'type']));

const request: ReturnType<SavedObjectsApi['bulkGet']> = this.savedObjectsFetch(path, {
method: 'POST',
body: JSON.stringify(filteredObjects),
});
return request.then((resp) => {
return this.performBulkGet(filteredObjects).then((resp) => {
resp.saved_objects = resp.saved_objects.map((d) => this.createSavedObject(d));
return renameKeys<
PromiseType<ReturnType<SavedObjectsApi['bulkGet']>>,
Expand All @@ -399,6 +413,15 @@ export class SavedObjectsClient {
});
};

private async performBulkGet(objects: ObjectTypeAndId[]) {
const path = this.getPath(['_bulk_get']);
const request: ReturnType<SavedObjectsApi['bulkGet']> = this.savedObjectsFetch(path, {
method: 'POST',
body: JSON.stringify(objects),
});
return request;
}

/**
* Updates an object
*
Expand Down

0 comments on commit ed47da8

Please sign in to comment.