Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search] Search batching using bfetch #83418

Merged
merged 36 commits into from
Nov 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
20bb351
Use bfetch for search (no abort behavior)
Nov 10, 2020
8b4eb1c
fix merge
Nov 10, 2020
9eebded
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 15, 2020
812a06b
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 16, 2020
aaa3122
Handle request abortion + unit tests
Nov 16, 2020
b1a2cf2
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 16, 2020
2cdfc47
fix jest
Nov 16, 2020
e8fc98f
shim totals in oss
Nov 16, 2020
993e385
proper formatting for errors
Nov 17, 2020
e89a46f
jest, types and docs
Nov 17, 2020
b727e49
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 17, 2020
3de838c
Fix doc
Nov 17, 2020
17de9fa
Remove old search code and rename UI Setting
Nov 17, 2020
56e7a46
jest mocks
Nov 17, 2020
5aaeefa
jest
Nov 17, 2020
c393416
Solve unhanled error
Nov 17, 2020
a1b2979
Use AbortSignal
Nov 17, 2020
811da0a
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 18, 2020
162473d
ts
Nov 18, 2020
8c83657
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 18, 2020
662b810
code review - use abort controller instead of observable
Nov 18, 2020
41f6406
Revert "Remove old search code and rename UI Setting"
Nov 18, 2020
cbd8106
Remove old search code and rename UI Setting
Nov 17, 2020
1ec890a
revert search route
Nov 18, 2020
3e334a7
fix event unsubscribe
Nov 18, 2020
c7aebdf
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 19, 2020
8abff48
code review 2
Nov 19, 2020
b4193e2
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 19, 2020
9d16439
revert filter
Nov 19, 2020
4a1e69c
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 19, 2020
bcd894f
Merge branch 'master' into enhance/use-bfetch-in-search
kibanamachine Nov 21, 2020
d57b8dc
simplify batch done logic
Nov 22, 2020
6b9a4f8
code review
Nov 22, 2020
4bd2a96
Merge branch 'enhance/use-bfetch-in-search' of github.com:lizozom/kib…
Nov 22, 2020
e9103b7
filter items in the beginning
Nov 22, 2020
3fcac6e
jest
Nov 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
```

## Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataStartDependencies, DataPublicPluginStart&gt;</code> | |
| { expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
| { bfetch, expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [kibana-plugin-plugins-data-public](./kibana-plugin-plugins-data-public.md) &gt; [SearchInterceptorDeps](./kibana-plugin-plugins-data-public.searchinterceptordeps.md) &gt; [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md)

## SearchInterceptorDeps.bfetch property

<b>Signature:</b>

```typescript
bfetch: BfetchPublicSetup;
```
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface SearchInterceptorDeps

| Property | Type | Description |
| --- | --- | --- |
| [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) | <code>BfetchPublicSetup</code> | |
| [http](./kibana-plugin-plugins-data-public.searchinterceptordeps.http.md) | <code>CoreSetup['http']</code> | |
| [session](./kibana-plugin-plugins-data-public.searchinterceptordeps.session.md) | <code>ISessionService</code> | |
| [startServices](./kibana-plugin-plugins-data-public.searchinterceptordeps.startservices.md) | <code>Promise&lt;[CoreStart, any, unknown]&gt;</code> | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressions, usageCollection }: DataPluginSetupDependencies): {
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): {
__enhance: (enhancements: DataEnhancements) => void;
search: ISearchSetup;
fieldFormats: {
Expand All @@ -21,7 +21,7 @@ setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressio
| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataPluginStartDependencies, DataPluginStart&gt;</code> | |
| { expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
| { bfetch, expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { defer, of } from '../../../kibana_utils/public';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';

const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Expand Down Expand Up @@ -168,6 +168,28 @@ describe('createStreamingBatchedFunction()', () => {
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});

test('ignores a request with an aborted signal', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
abortController.abort();

of(fn({ foo: 'bar' }, abortController.signal));
fn({ baz: 'quix' });

await new Promise((r) => setTimeout(r, 6));
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ baz: 'quix' }],
});
});

test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
Expand Down Expand Up @@ -423,6 +445,73 @@ describe('createStreamingBatchedFunction()', () => {
expect(result3).toEqual({ b: '3' });
});

describe('when requests are aborted', () => {
lizozom marked this conversation as resolved.
Show resolved Hide resolved
test('aborts stream when all are aborted', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' }, abortController.signal);
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);
expect(await isPending(promise2)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
expect(await isPending(promise2)).toBe(false);
const [, error] = await of(promise);
const [, error2] = await of(promise2);
expect(error).toBeInstanceOf(AbortError);
expect(error2).toBeInstanceOf(AbortError);
expect(fetchStreaming.mock.calls[0][0].signal.aborted).toBeTruthy();
});

test('rejects promise on abort and lets others continue', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' });
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toBeInstanceOf(AbortError);

stream.next(
JSON.stringify({
id: 1,
result: { b: '2' },
}) + '\n'
);

await new Promise((r) => setTimeout(r, 1));

const [result2] = await of(promise2);
expect(result2).toEqual({ b: '2' });
});
});

describe('when stream closes prematurely', () => {
test('rejects pending promises with CONNECTION error code', async () => {
const { fetchStreaming, stream } = setup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { defer, Defer } from '../../../kibana_utils/public';
import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
Expand All @@ -27,13 +27,7 @@ import {
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';

export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
}

export type BatchedFunc<Payload, Result> = (payload: Payload) => Promise<Result>;
import { BatchedFunc, BatchItem } from './types';

export interface BatchedFunctionProtocolError extends ErrorLike {
code: string;
Expand Down Expand Up @@ -82,32 +76,67 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
flushOnMaxItems = 25,
maxItemAge = 10,
} = params;
const [fn] = createBatchedFunction<BatchedFunc<Payload, Result>, BatchItem<Payload, Result>>({
onCall: (payload: Payload) => {
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
const future = defer<Result>();
const entry: BatchItem<Payload, Result> = {
payload,
future,
signal,
};
return [future.promise, entry];
},
onBatch: async (items) => {
try {
let responsesReceived = 0;
const batch = items.map(({ payload }) => payload);
// Filter out any items whose signal is already aborted
items = items.filter((item) => {
if (item.signal?.aborted) item.future.reject(new AbortError());
return !item.signal?.aborted;
});

const donePromises: Array<Promise<any>> = items.map((item) => {
return new Promise<void>((resolve) => {
const { promise: abortPromise, cleanup } = item.signal
? abortSignalToPromise(item.signal)
: {
promise: undefined,
cleanup: () => {},
};

const onDone = () => {
resolve();
cleanup();
};
if (abortPromise)
abortPromise.catch(() => {
item.future.reject(new AbortError());
onDone();
});
item.future.promise.then(onDone, onDone);
});
});

// abort when all items were either resolved, rejected or aborted
const abortController = new AbortController();
let isBatchDone = false;
Promise.all(donePromises).then(() => {
isBatchDone = true;
abortController.abort();
});
const batch = items.map((item) => item.payload);

const { stream } = fetchStreamingInjected({
url,
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
});
stream.pipe(split('\n')).subscribe({
next: (json: string) => {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
if (response.error) {
responsesReceived++;
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
responsesReceived++;
items[response.id].future.resolve(response.result);
}
},
Expand All @@ -117,8 +146,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
for (const { future } of items) future.reject(normalizedError);
},
complete: () => {
const streamTerminatedPrematurely = responsesReceived !== items.length;
if (streamTerminatedPrematurely) {
if (!isBatchDone) {
const error: BatchedFunctionProtocolError = {
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
Expand Down
31 changes: 31 additions & 0 deletions src/plugins/bfetch/public/batching/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { Defer } from '../../../kibana_utils/public';

export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
signal?: AbortSignal;
}

export type BatchedFunc<Payload, Result> = (
payload: Payload,
signal?: AbortSignal
) => Promise<Result>;
2 changes: 2 additions & 0 deletions src/plugins/bfetch/public/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import { BfetchPublicPlugin } from './plugin';
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin';
export { split } from './streaming';

export { BatchedFunc } from './batching/types';

export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchPublicPlugin(initializerContext);
}
2 changes: 1 addition & 1 deletion src/plugins/bfetch/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './
import { removeLeadingSlash } from '../common';
import {
createStreamingBatchedFunction,
BatchedFunc,
StreamingBatchedFunctionParams,
} from './batching/create_streaming_batched_function';
import { BatchedFunc } from './batching/types';

// eslint-disable-next-line
export interface BfetchPublicSetupDependencies {}
Expand Down
27 changes: 27 additions & 0 deletions src/plugins/bfetch/public/streaming/fetch_streaming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,33 @@ test('completes stream observable when request finishes', async () => {
expect(spy).toHaveBeenCalledTimes(1);
});

test('completes stream observable when aborted', async () => {
const env = setup();
const abort = new AbortController();
const { stream } = fetchStreaming({
url: 'http://example.com',
signal: abort.signal,
});

const spy = jest.fn();
stream.subscribe({
complete: spy,
});

expect(spy).toHaveBeenCalledTimes(0);

(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);

abort.abort();

(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);

expect(spy).toHaveBeenCalledTimes(1);
});

test('promise throws when request errors', async () => {
const env = setup();
const { stream } = fetchStreaming({
Expand Down
4 changes: 3 additions & 1 deletion src/plugins/bfetch/public/streaming/fetch_streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export interface FetchStreamingParams {
headers?: Record<string, string>;
method?: 'GET' | 'POST';
body?: string;
signal?: AbortSignal;
}

/**
Expand All @@ -35,6 +36,7 @@ export function fetchStreaming({
headers = {},
method = 'POST',
body = '',
signal,
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();

Expand All @@ -45,7 +47,7 @@ export function fetchStreaming({
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));

const stream = fromStreamingXhr(xhr);
const stream = fromStreamingXhr(xhr, signal);

// Send the payload to the server
xhr.send(body);
Expand Down
Loading