Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[Search] Search batching using bfetch (#83418)" #84037

Merged
merged 1 commit into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
```

## Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataStartDependencies, DataPublicPluginStart&gt;</code> | |
| { bfetch, expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
| { expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |

<b>Returns:</b>

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ export interface SearchInterceptorDeps

| Property | Type | Description |
| --- | --- | --- |
| [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) | <code>BfetchPublicSetup</code> | |
| [http](./kibana-plugin-plugins-data-public.searchinterceptordeps.http.md) | <code>CoreSetup['http']</code> | |
| [session](./kibana-plugin-plugins-data-public.searchinterceptordeps.session.md) | <code>ISessionService</code> | |
| [startServices](./kibana-plugin-plugins-data-public.searchinterceptordeps.startservices.md) | <code>Promise&lt;[CoreStart, any, unknown]&gt;</code> | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): {
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressions, usageCollection }: DataPluginSetupDependencies): {
__enhance: (enhancements: DataEnhancements) => void;
search: ISearchSetup;
fieldFormats: {
Expand All @@ -21,7 +21,7 @@ setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, e
| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataPluginStartDependencies, DataPluginStart&gt;</code> | |
| { bfetch, expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
| { expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';

const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Expand Down Expand Up @@ -168,28 +168,6 @@ describe('createStreamingBatchedFunction()', () => {
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});

test('ignores a request with an aborted signal', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
abortController.abort();

of(fn({ foo: 'bar' }, abortController.signal));
fn({ baz: 'quix' });

await new Promise((r) => setTimeout(r, 6));
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ baz: 'quix' }],
});
});

test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
Expand Down Expand Up @@ -445,73 +423,6 @@ describe('createStreamingBatchedFunction()', () => {
expect(result3).toEqual({ b: '3' });
});

describe('when requests are aborted', () => {
test('aborts stream when all are aborted', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' }, abortController.signal);
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);
expect(await isPending(promise2)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
expect(await isPending(promise2)).toBe(false);
const [, error] = await of(promise);
const [, error2] = await of(promise2);
expect(error).toBeInstanceOf(AbortError);
expect(error2).toBeInstanceOf(AbortError);
expect(fetchStreaming.mock.calls[0][0].signal.aborted).toBeTruthy();
});

test('rejects promise on abort and lets others continue', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' });
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toBeInstanceOf(AbortError);

stream.next(
JSON.stringify({
id: 1,
result: { b: '2' },
}) + '\n'
);

await new Promise((r) => setTimeout(r, 1));

const [result2] = await of(promise2);
expect(result2).toEqual({ b: '2' });
});
});

describe('when stream closes prematurely', () => {
test('rejects pending promises with CONNECTION error code', async () => {
const { fetchStreaming, stream } = setup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import { defer, Defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
Expand All @@ -27,7 +27,13 @@ import {
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';
import { BatchedFunc, BatchItem } from './types';

export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
}

export type BatchedFunc<Payload, Result> = (payload: Payload) => Promise<Result>;

export interface BatchedFunctionProtocolError extends ErrorLike {
code: string;
Expand Down Expand Up @@ -76,67 +82,32 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
flushOnMaxItems = 25,
maxItemAge = 10,
} = params;
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
const [fn] = createBatchedFunction<BatchedFunc<Payload, Result>, BatchItem<Payload, Result>>({
onCall: (payload: Payload) => {
const future = defer<Result>();
const entry: BatchItem<Payload, Result> = {
payload,
future,
signal,
};
return [future.promise, entry];
},
onBatch: async (items) => {
try {
// Filter out any items whose signal is already aborted
items = items.filter((item) => {
if (item.signal?.aborted) item.future.reject(new AbortError());
return !item.signal?.aborted;
});

const donePromises: Array<Promise<any>> = items.map((item) => {
return new Promise<void>((resolve) => {
const { promise: abortPromise, cleanup } = item.signal
? abortSignalToPromise(item.signal)
: {
promise: undefined,
cleanup: () => {},
};

const onDone = () => {
resolve();
cleanup();
};
if (abortPromise)
abortPromise.catch(() => {
item.future.reject(new AbortError());
onDone();
});
item.future.promise.then(onDone, onDone);
});
});

// abort when all items were either resolved, rejected or aborted
const abortController = new AbortController();
let isBatchDone = false;
Promise.all(donePromises).then(() => {
isBatchDone = true;
abortController.abort();
});
const batch = items.map((item) => item.payload);

let responsesReceived = 0;
const batch = items.map(({ payload }) => payload);
const { stream } = fetchStreamingInjected({
url,
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
});
stream.pipe(split('\n')).subscribe({
next: (json: string) => {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
if (response.error) {
responsesReceived++;
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
responsesReceived++;
items[response.id].future.resolve(response.result);
}
},
Expand All @@ -146,7 +117,8 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
for (const { future } of items) future.reject(normalizedError);
},
complete: () => {
if (!isBatchDone) {
const streamTerminatedPrematurely = responsesReceived !== items.length;
if (streamTerminatedPrematurely) {
const error: BatchedFunctionProtocolError = {
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
Expand Down
31 changes: 0 additions & 31 deletions src/plugins/bfetch/public/batching/types.ts

This file was deleted.

2 changes: 0 additions & 2 deletions src/plugins/bfetch/public/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import { BfetchPublicPlugin } from './plugin';
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin';
export { split } from './streaming';

export { BatchedFunc } from './batching/types';

export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchPublicPlugin(initializerContext);
}
2 changes: 1 addition & 1 deletion src/plugins/bfetch/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './
import { removeLeadingSlash } from '../common';
import {
createStreamingBatchedFunction,
BatchedFunc,
StreamingBatchedFunctionParams,
} from './batching/create_streaming_batched_function';
import { BatchedFunc } from './batching/types';

// eslint-disable-next-line
export interface BfetchPublicSetupDependencies {}
Expand Down
27 changes: 0 additions & 27 deletions src/plugins/bfetch/public/streaming/fetch_streaming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,33 +132,6 @@ test('completes stream observable when request finishes', async () => {
expect(spy).toHaveBeenCalledTimes(1);
});

test('completes stream observable when aborted', async () => {
const env = setup();
const abort = new AbortController();
const { stream } = fetchStreaming({
url: 'http://example.com',
signal: abort.signal,
});

const spy = jest.fn();
stream.subscribe({
complete: spy,
});

expect(spy).toHaveBeenCalledTimes(0);

(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);

abort.abort();

(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);

expect(spy).toHaveBeenCalledTimes(1);
});

test('promise throws when request errors', async () => {
const env = setup();
const { stream } = fetchStreaming({
Expand Down
4 changes: 1 addition & 3 deletions src/plugins/bfetch/public/streaming/fetch_streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export interface FetchStreamingParams {
headers?: Record<string, string>;
method?: 'GET' | 'POST';
body?: string;
signal?: AbortSignal;
}

/**
Expand All @@ -36,7 +35,6 @@ export function fetchStreaming({
headers = {},
method = 'POST',
body = '',
signal,
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();

Expand All @@ -47,7 +45,7 @@ export function fetchStreaming({
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));

const stream = fromStreamingXhr(xhr, signal);
const stream = fromStreamingXhr(xhr);

// Send the payload to the server
xhr.send(body);
Expand Down
Loading