Skip to content

Commit

Permalink
[Search] Search batching using bfetch (elastic#83418) (elastic#84018)
Browse files Browse the repository at this point in the history
* Use bfetch for search (no abort behavior)

* fix merge

* Handle request abortion + unit tests

* fix jest

* shim totals in oss

* proper formatting for errors

* jest, types and docs

* Fix doc

* Remove old search code and rename UI Setting

* jest mocks

* jest

* Solve unhanled error

* Use AbortSignal

* ts

* code review - use abort controller instead of observable

* Revert "Remove old search code and rename UI Setting"

This reverts commit 17de9fa.

* Remove old search code and rename UI Setting

* revert search route

* fix event unsubscribe

* code review 2

* revert filter

* simplify batch done logic

* code review

* filter items in the beginning

* jest

Co-authored-by: Kibana Machine <[email protected]>

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
lizozom and kibanamachine authored Nov 22, 2020
1 parent b2b41e9 commit 1385323
Show file tree
Hide file tree
Showing 31 changed files with 414 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
```

## Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataStartDependencies, DataPublicPluginStart&gt;</code> | |
| { expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
| { bfetch, expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [kibana-plugin-plugins-data-public](./kibana-plugin-plugins-data-public.md) &gt; [SearchInterceptorDeps](./kibana-plugin-plugins-data-public.searchinterceptordeps.md) &gt; [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md)

## SearchInterceptorDeps.bfetch property

<b>Signature:</b>

```typescript
bfetch: BfetchPublicSetup;
```
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface SearchInterceptorDeps

| Property | Type | Description |
| --- | --- | --- |
| [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) | <code>BfetchPublicSetup</code> | |
| [http](./kibana-plugin-plugins-data-public.searchinterceptordeps.http.md) | <code>CoreSetup['http']</code> | |
| [session](./kibana-plugin-plugins-data-public.searchinterceptordeps.session.md) | <code>ISessionService</code> | |
| [startServices](./kibana-plugin-plugins-data-public.searchinterceptordeps.startservices.md) | <code>Promise&lt;[CoreStart, any, unknown]&gt;</code> | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressions, usageCollection }: DataPluginSetupDependencies): {
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): {
__enhance: (enhancements: DataEnhancements) => void;
search: ISearchSetup;
fieldFormats: {
Expand All @@ -21,7 +21,7 @@ setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressio
| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataPluginStartDependencies, DataPluginStart&gt;</code> | |
| { expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
| { bfetch, expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { defer, of } from '../../../kibana_utils/public';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';

const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Expand Down Expand Up @@ -168,6 +168,28 @@ describe('createStreamingBatchedFunction()', () => {
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});

test('ignores a request with an aborted signal', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
abortController.abort();

of(fn({ foo: 'bar' }, abortController.signal));
fn({ baz: 'quix' });

await new Promise((r) => setTimeout(r, 6));
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ baz: 'quix' }],
});
});

test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
Expand Down Expand Up @@ -423,6 +445,73 @@ describe('createStreamingBatchedFunction()', () => {
expect(result3).toEqual({ b: '3' });
});

describe('when requests are aborted', () => {
test('aborts stream when all are aborted', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' }, abortController.signal);
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);
expect(await isPending(promise2)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
expect(await isPending(promise2)).toBe(false);
const [, error] = await of(promise);
const [, error2] = await of(promise2);
expect(error).toBeInstanceOf(AbortError);
expect(error2).toBeInstanceOf(AbortError);
expect(fetchStreaming.mock.calls[0][0].signal.aborted).toBeTruthy();
});

test('rejects promise on abort and lets others continue', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' });
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toBeInstanceOf(AbortError);

stream.next(
JSON.stringify({
id: 1,
result: { b: '2' },
}) + '\n'
);

await new Promise((r) => setTimeout(r, 1));

const [result2] = await of(promise2);
expect(result2).toEqual({ b: '2' });
});
});

describe('when stream closes prematurely', () => {
test('rejects pending promises with CONNECTION error code', async () => {
const { fetchStreaming, stream } = setup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { defer, Defer } from '../../../kibana_utils/public';
import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
Expand All @@ -27,13 +27,7 @@ import {
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';

export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
}

export type BatchedFunc<Payload, Result> = (payload: Payload) => Promise<Result>;
import { BatchedFunc, BatchItem } from './types';

export interface BatchedFunctionProtocolError extends ErrorLike {
code: string;
Expand Down Expand Up @@ -82,32 +76,67 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
flushOnMaxItems = 25,
maxItemAge = 10,
} = params;
const [fn] = createBatchedFunction<BatchedFunc<Payload, Result>, BatchItem<Payload, Result>>({
onCall: (payload: Payload) => {
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
const future = defer<Result>();
const entry: BatchItem<Payload, Result> = {
payload,
future,
signal,
};
return [future.promise, entry];
},
onBatch: async (items) => {
try {
let responsesReceived = 0;
const batch = items.map(({ payload }) => payload);
// Filter out any items whose signal is already aborted
items = items.filter((item) => {
if (item.signal?.aborted) item.future.reject(new AbortError());
return !item.signal?.aborted;
});

const donePromises: Array<Promise<any>> = items.map((item) => {
return new Promise<void>((resolve) => {
const { promise: abortPromise, cleanup } = item.signal
? abortSignalToPromise(item.signal)
: {
promise: undefined,
cleanup: () => {},
};

const onDone = () => {
resolve();
cleanup();
};
if (abortPromise)
abortPromise.catch(() => {
item.future.reject(new AbortError());
onDone();
});
item.future.promise.then(onDone, onDone);
});
});

// abort when all items were either resolved, rejected or aborted
const abortController = new AbortController();
let isBatchDone = false;
Promise.all(donePromises).then(() => {
isBatchDone = true;
abortController.abort();
});
const batch = items.map((item) => item.payload);

const { stream } = fetchStreamingInjected({
url,
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
});
stream.pipe(split('\n')).subscribe({
next: (json: string) => {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
if (response.error) {
responsesReceived++;
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
responsesReceived++;
items[response.id].future.resolve(response.result);
}
},
Expand All @@ -117,8 +146,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
for (const { future } of items) future.reject(normalizedError);
},
complete: () => {
const streamTerminatedPrematurely = responsesReceived !== items.length;
if (streamTerminatedPrematurely) {
if (!isBatchDone) {
const error: BatchedFunctionProtocolError = {
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
Expand Down
31 changes: 31 additions & 0 deletions src/plugins/bfetch/public/batching/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { Defer } from '../../../kibana_utils/public';

export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
signal?: AbortSignal;
}

export type BatchedFunc<Payload, Result> = (
payload: Payload,
signal?: AbortSignal
) => Promise<Result>;
2 changes: 2 additions & 0 deletions src/plugins/bfetch/public/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import { BfetchPublicPlugin } from './plugin';
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin';
export { split } from './streaming';

export { BatchedFunc } from './batching/types';

export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchPublicPlugin(initializerContext);
}
2 changes: 1 addition & 1 deletion src/plugins/bfetch/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './
import { removeLeadingSlash } from '../common';
import {
createStreamingBatchedFunction,
BatchedFunc,
StreamingBatchedFunctionParams,
} from './batching/create_streaming_batched_function';
import { BatchedFunc } from './batching/types';

// eslint-disable-next-line
export interface BfetchPublicSetupDependencies {}
Expand Down
27 changes: 27 additions & 0 deletions src/plugins/bfetch/public/streaming/fetch_streaming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,33 @@ test('completes stream observable when request finishes', async () => {
expect(spy).toHaveBeenCalledTimes(1);
});

test('completes stream observable when aborted', async () => {
const env = setup();
const abort = new AbortController();
const { stream } = fetchStreaming({
url: 'http://example.com',
signal: abort.signal,
});

const spy = jest.fn();
stream.subscribe({
complete: spy,
});

expect(spy).toHaveBeenCalledTimes(0);

(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);

abort.abort();

(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);

expect(spy).toHaveBeenCalledTimes(1);
});

test('promise throws when request errors', async () => {
const env = setup();
const { stream } = fetchStreaming({
Expand Down
4 changes: 3 additions & 1 deletion src/plugins/bfetch/public/streaming/fetch_streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export interface FetchStreamingParams {
headers?: Record<string, string>;
method?: 'GET' | 'POST';
body?: string;
signal?: AbortSignal;
}

/**
Expand All @@ -35,6 +36,7 @@ export function fetchStreaming({
headers = {},
method = 'POST',
body = '',
signal,
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();

Expand All @@ -45,7 +47,7 @@ export function fetchStreaming({
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));

const stream = fromStreamingXhr(xhr);
const stream = fromStreamingXhr(xhr, signal);

// Send the payload to the server
xhr.send(body);
Expand Down
Loading

0 comments on commit 1385323

Please sign in to comment.