Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search] Search batching using bfetch #83418

Merged
merged 36 commits into from
Nov 22, 2020
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
20bb351
Use bfetch for search (no abort behavior)
Nov 10, 2020
8b4eb1c
fix merge
Nov 10, 2020
9eebded
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 15, 2020
812a06b
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 16, 2020
aaa3122
Handle request abortion + unit tests
Nov 16, 2020
b1a2cf2
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 16, 2020
2cdfc47
fix jest
Nov 16, 2020
e8fc98f
shim totals in oss
Nov 16, 2020
993e385
proper formatting for errors
Nov 17, 2020
e89a46f
jest, types and docs
Nov 17, 2020
b727e49
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 17, 2020
3de838c
Fix doc
Nov 17, 2020
17de9fa
Remove old search code and rename UI Setting
Nov 17, 2020
56e7a46
jest mocks
Nov 17, 2020
5aaeefa
jest
Nov 17, 2020
c393416
Solve unhanled error
Nov 17, 2020
a1b2979
Use AbortSignal
Nov 17, 2020
811da0a
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 18, 2020
162473d
ts
Nov 18, 2020
8c83657
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 18, 2020
662b810
code review - use abort controller instead of observable
Nov 18, 2020
41f6406
Revert "Remove old search code and rename UI Setting"
Nov 18, 2020
cbd8106
Remove old search code and rename UI Setting
Nov 17, 2020
1ec890a
revert search route
Nov 18, 2020
3e334a7
fix event unsubscribe
Nov 18, 2020
c7aebdf
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 19, 2020
8abff48
code review 2
Nov 19, 2020
b4193e2
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 19, 2020
9d16439
revert filter
Nov 19, 2020
4a1e69c
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 19, 2020
bcd894f
Merge branch 'master' into enhance/use-bfetch-in-search
kibanamachine Nov 21, 2020
d57b8dc
simplify batch done logic
Nov 22, 2020
6b9a4f8
code review
Nov 22, 2020
4bd2a96
Merge branch 'enhance/use-bfetch-in-search' of github.com:lizozom/kib…
Nov 22, 2020
e9103b7
filter items in the beginning
Nov 22, 2020
3fcac6e
jest
Nov 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
```

## Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataStartDependencies, DataPublicPluginStart&gt;</code> | |
| { expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
| { bfetch, expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [kibana-plugin-plugins-data-public](./kibana-plugin-plugins-data-public.md) &gt; [SearchInterceptorDeps](./kibana-plugin-plugins-data-public.searchinterceptordeps.md) &gt; [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md)

## SearchInterceptorDeps.bfetch property

<b>Signature:</b>

```typescript
bfetch: BfetchPublicSetup;
```
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface SearchInterceptorDeps

| Property | Type | Description |
| --- | --- | --- |
| [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) | <code>BfetchPublicSetup</code> | |
| [http](./kibana-plugin-plugins-data-public.searchinterceptordeps.http.md) | <code>CoreSetup['http']</code> | |
| [session](./kibana-plugin-plugins-data-public.searchinterceptordeps.session.md) | <code>ISessionService</code> | |
| [startServices](./kibana-plugin-plugins-data-public.searchinterceptordeps.startservices.md) | <code>Promise&lt;[CoreStart, any, unknown]&gt;</code> | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressions, usageCollection }: DataPluginSetupDependencies): {
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): {
__enhance: (enhancements: DataEnhancements) => void;
search: ISearchSetup;
fieldFormats: {
Expand All @@ -21,7 +21,7 @@ setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressio
| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataPluginStartDependencies, DataPluginStart&gt;</code> | |
| { expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
| { bfetch, expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
86 changes: 86 additions & 0 deletions src/plugins/bfetch/public/batching/batch_utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { BatchItem } from './types';
import { getDonePromise } from './batch_utils';
import { defer } from 'src/plugins/kibana_utils/common';

const tick = () => new Promise((resolve) => setTimeout(resolve, 1));

describe('getDonePromise()', () => {
test('Triggers when aborted', async () => {
const abortController = new AbortController();
const item: BatchItem<any, any> = {
future: defer<any>(),
payload: null,
done: false,
signal: abortController.signal,
};
const b = getDonePromise(item);

const spy = jest.fn();

b.then(spy);

abortController.abort();
await tick();
expect(spy).toHaveBeenCalledTimes(1);
expect(item.done).toBeTruthy();
});

test('Triggers when resolved', async () => {
const abortController = new AbortController();
const item: BatchItem<any, any> = {
future: defer<any>(),
payload: null,
done: false,
signal: abortController.signal,
};
const b = getDonePromise(item);

const spy = jest.fn();

b.then(spy);

item.future.resolve(null);
await tick();
expect(spy).toHaveBeenCalledTimes(1);
expect(item.done).toBeTruthy();
});

test('Triggers when rejected', async () => {
const abortController = new AbortController();
const item: BatchItem<any, any> = {
future: defer<any>(),
payload: null,
done: false,
signal: abortController.signal,
};
const b = getDonePromise(item);

const spy = jest.fn();

b.then(spy);

item.future.reject(null);
await tick();
expect(spy).toHaveBeenCalledTimes(1);
expect(item.done).toBeTruthy();
});
});
50 changes: 50 additions & 0 deletions src/plugins/bfetch/public/batching/batch_utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { BatchItem } from './types';
import { AbortError } from '../../../kibana_utils/public';

export function isBatchDone(items: Array<BatchItem<any, any>>): boolean {
return items.every((item) => item.done);
}

export function getDonePromise(item: BatchItem<any, any>) {
return new Promise<void>((resolve) => {
const onDone = () => {
item.done = true;
if (item.signal) item.signal.removeEventListener('abort', onDone);
resolve();
};
item.future.promise.then(onDone, onDone);
if (item.signal) item.signal.addEventListener('abort', onDone);
});
}

export function rejectOnAbort(item: BatchItem<any, any>) {
const cleanup = () => {
if (item.signal) item.signal.removeEventListener('abort', rejectAborted);
};
const rejectAborted = () => {
item.future.reject(new AbortError());
cleanup();
};

if (item.signal) item.signal.addEventListener('abort', rejectAborted);
item.future.promise.then(cleanup, cleanup);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { defer, of } from '../../../kibana_utils/public';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';

const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Expand Down Expand Up @@ -168,6 +168,28 @@ describe('createStreamingBatchedFunction()', () => {
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});

test('ignores a request with an aborted signal', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
abortController.abort();

of(fn({ foo: 'bar' }, abortController.signal));
fn({ baz: 'quix' });

await new Promise((r) => setTimeout(r, 6));
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ baz: 'quix' }],
});
});

test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
Expand Down Expand Up @@ -423,6 +445,44 @@ describe('createStreamingBatchedFunction()', () => {
expect(result3).toEqual({ b: '3' });
});

describe('when requests are aborted', () => {
lizozom marked this conversation as resolved.
Show resolved Hide resolved
test('rejects promise on abort and lets others continue', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' });
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toBeInstanceOf(AbortError);

stream.next(
JSON.stringify({
id: 1,
result: { b: '2' },
}) + '\n'
);

await new Promise((r) => setTimeout(r, 1));

const [result2] = await of(promise2);
expect(result2).toEqual({ b: '2' });
});
});

describe('when stream closes prematurely', () => {
test('rejects pending promises with CONNECTION error code', async () => {
const { fetchStreaming, stream } = setup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { defer, Defer } from '../../../kibana_utils/public';
import { AbortError, defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
Expand All @@ -27,13 +27,8 @@ import {
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';

export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
}

export type BatchedFunc<Payload, Result> = (payload: Payload) => Promise<Result>;
import { BatchedFunc, BatchItem } from './types';
import { isBatchDone, getDonePromise, rejectOnAbort } from './batch_utils';

export interface BatchedFunctionProtocolError extends ErrorLike {
code: string;
Expand Down Expand Up @@ -83,31 +78,63 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
maxItemAge = 10,
} = params;
const [fn] = createBatchedFunction<BatchedFunc<Payload, Result>, BatchItem<Payload, Result>>({
onCall: (payload: Payload) => {
onCall: (payload: Payload, signal?: AbortSignal) => {
const future = defer<Result>();
const entry: BatchItem<Payload, Result> = {
payload,
future,
signal,
done: false,
};
return [future.promise, entry];
},
onBatch: async (items) => {
try {
let responsesReceived = 0;
const batch = items.map(({ payload }) => payload);
const promises: Array<Promise<void>> = [];
const abortController = new AbortController();

// Filter out and reject any items who's signal is already aborted
items = items.filter((item) => {
if (item.signal?.aborted) {
item.future.reject(new AbortError());
}
return !item.signal?.aborted;
});

// Prepare batch
lizozom marked this conversation as resolved.
Show resolved Hide resolved
const batch = items
.filter((item) => {
if (item.signal?.aborted) {
item.future.reject(new AbortError());
}
return !item.signal?.aborted;
lizozom marked this conversation as resolved.
Show resolved Hide resolved
})
.map((item) => {
// Subscribe to reject promise on abort
rejectOnAbort(item);

// Track batch progress
lizozom marked this conversation as resolved.
Show resolved Hide resolved
promises.push(getDonePromise(item));
lizozom marked this conversation as resolved.
Show resolved Hide resolved

// Return payload to be sent
lizozom marked this conversation as resolved.
Show resolved Hide resolved
return item.payload;
});

// abort when all items were either resolved, rejected or aborted
Promise.all(promises).then(() => abortController.abort());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand it correctly that:

  1. Each item in a batch will come with optional signal, which can abort that specific item?
  2. If all items are aborted than the whole batch should be aborted, too?

If that is the case, I think it is better to implement it as it was before with the counters. Putting back the responsesReceived counter and possibly introducing a counter for aborted items, like numberOfAbortedItems. And then use those counts to abort the whole batch when needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You understood correctly and I thought so too at first, but the counters are problematic, because the same request can be received and then aborted, you'd need to be very careful not to count the same request twice towards both counters.

I think it's simpler and more straight forward with the implementation I did here.
Do you see any problems with my implementation, other than stlistically?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a small problem, which practically might not be an issue. The abortController.abort() is called whenever all items in the batch have somehow completed—either resolved or rejected, or aborted; in any combination.

But my understanding is that we don't need to call abortController.abort() when, for example, all items successfully resolved. We need to call abortController.abort() only when all items have completed and at least one of them completed due to an abort signal.

Do you think we should be that specific? Or aborting the request every time, even when it already finished successfully is a noop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it matters, as fromStreamingXhr makes sure not to apply abort to an already complete xhr request.
We could track each resolution separately, but I don't think it's worth it.


const { stream } = fetchStreamingInjected({
url,
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
});
stream.pipe(split('\n')).subscribe({
next: (json: string) => {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
if (response.error) {
responsesReceived++;
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
responsesReceived++;
items[response.id].future.resolve(response.result);
}
},
Expand All @@ -117,7 +144,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
for (const { future } of items) future.reject(normalizedError);
},
complete: () => {
const streamTerminatedPrematurely = responsesReceived !== items.length;
const streamTerminatedPrematurely = !isBatchDone(items);
lizozom marked this conversation as resolved.
Show resolved Hide resolved
if (streamTerminatedPrematurely) {
const error: BatchedFunctionProtocolError = {
message: 'Connection terminated prematurely.',
Expand Down
Loading