Skip to content

Commit

Permalink
Revert "[Search] Search batching using bfetch (elastic#83418)" (elast…
Browse files Browse the repository at this point in the history
…ic#84037)

This reverts commit 5708c5d.
  • Loading branch information
mistic committed Nov 23, 2020
1 parent 1385323 commit 3fb45c7
Show file tree
Hide file tree
Showing 31 changed files with 81 additions and 414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
```

## Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataStartDependencies, DataPublicPluginStart&gt;</code> | |
| { bfetch, expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
| { expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |

<b>Returns:</b>

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ export interface SearchInterceptorDeps

| Property | Type | Description |
| --- | --- | --- |
| [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) | <code>BfetchPublicSetup</code> | |
| [http](./kibana-plugin-plugins-data-public.searchinterceptordeps.http.md) | <code>CoreSetup['http']</code> | |
| [session](./kibana-plugin-plugins-data-public.searchinterceptordeps.session.md) | <code>ISessionService</code> | |
| [startServices](./kibana-plugin-plugins-data-public.searchinterceptordeps.startservices.md) | <code>Promise&lt;[CoreStart, any, unknown]&gt;</code> | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): {
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressions, usageCollection }: DataPluginSetupDependencies): {
__enhance: (enhancements: DataEnhancements) => void;
search: ISearchSetup;
fieldFormats: {
Expand All @@ -21,7 +21,7 @@ setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, e
| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataPluginStartDependencies, DataPluginStart&gt;</code> | |
| { bfetch, expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
| { expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';

const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Expand Down Expand Up @@ -168,28 +168,6 @@ describe('createStreamingBatchedFunction()', () => {
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});

test('ignores a request with an aborted signal', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
abortController.abort();

of(fn({ foo: 'bar' }, abortController.signal));
fn({ baz: 'quix' });

await new Promise((r) => setTimeout(r, 6));
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ baz: 'quix' }],
});
});

test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
Expand Down Expand Up @@ -445,73 +423,6 @@ describe('createStreamingBatchedFunction()', () => {
expect(result3).toEqual({ b: '3' });
});

describe('when requests are aborted', () => {
test('aborts stream when all are aborted', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' }, abortController.signal);
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);
expect(await isPending(promise2)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
expect(await isPending(promise2)).toBe(false);
const [, error] = await of(promise);
const [, error2] = await of(promise2);
expect(error).toBeInstanceOf(AbortError);
expect(error2).toBeInstanceOf(AbortError);
expect(fetchStreaming.mock.calls[0][0].signal.aborted).toBeTruthy();
});

test('rejects promise on abort and lets others continue', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' });
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toBeInstanceOf(AbortError);

stream.next(
JSON.stringify({
id: 1,
result: { b: '2' },
}) + '\n'
);

await new Promise((r) => setTimeout(r, 1));

const [result2] = await of(promise2);
expect(result2).toEqual({ b: '2' });
});
});

describe('when stream closes prematurely', () => {
test('rejects pending promises with CONNECTION error code', async () => {
const { fetchStreaming, stream } = setup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import { defer, Defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
Expand All @@ -27,7 +27,13 @@ import {
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';
import { BatchedFunc, BatchItem } from './types';

export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
}

export type BatchedFunc<Payload, Result> = (payload: Payload) => Promise<Result>;

export interface BatchedFunctionProtocolError extends ErrorLike {
code: string;
Expand Down Expand Up @@ -76,67 +82,32 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
flushOnMaxItems = 25,
maxItemAge = 10,
} = params;
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
const [fn] = createBatchedFunction<BatchedFunc<Payload, Result>, BatchItem<Payload, Result>>({
onCall: (payload: Payload) => {
const future = defer<Result>();
const entry: BatchItem<Payload, Result> = {
payload,
future,
signal,
};
return [future.promise, entry];
},
onBatch: async (items) => {
try {
// Filter out any items whose signal is already aborted
items = items.filter((item) => {
if (item.signal?.aborted) item.future.reject(new AbortError());
return !item.signal?.aborted;
});

const donePromises: Array<Promise<any>> = items.map((item) => {
return new Promise<void>((resolve) => {
const { promise: abortPromise, cleanup } = item.signal
? abortSignalToPromise(item.signal)
: {
promise: undefined,
cleanup: () => {},
};

const onDone = () => {
resolve();
cleanup();
};
if (abortPromise)
abortPromise.catch(() => {
item.future.reject(new AbortError());
onDone();
});
item.future.promise.then(onDone, onDone);
});
});

// abort when all items were either resolved, rejected or aborted
const abortController = new AbortController();
let isBatchDone = false;
Promise.all(donePromises).then(() => {
isBatchDone = true;
abortController.abort();
});
const batch = items.map((item) => item.payload);

let responsesReceived = 0;
const batch = items.map(({ payload }) => payload);
const { stream } = fetchStreamingInjected({
url,
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
});
stream.pipe(split('\n')).subscribe({
next: (json: string) => {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
if (response.error) {
responsesReceived++;
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
responsesReceived++;
items[response.id].future.resolve(response.result);
}
},
Expand All @@ -146,7 +117,8 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
for (const { future } of items) future.reject(normalizedError);
},
complete: () => {
if (!isBatchDone) {
const streamTerminatedPrematurely = responsesReceived !== items.length;
if (streamTerminatedPrematurely) {
const error: BatchedFunctionProtocolError = {
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
Expand Down
31 changes: 0 additions & 31 deletions src/plugins/bfetch/public/batching/types.ts

This file was deleted.

2 changes: 0 additions & 2 deletions src/plugins/bfetch/public/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import { BfetchPublicPlugin } from './plugin';
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin';
export { split } from './streaming';

export { BatchedFunc } from './batching/types';

export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchPublicPlugin(initializerContext);
}
2 changes: 1 addition & 1 deletion src/plugins/bfetch/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './
import { removeLeadingSlash } from '../common';
import {
createStreamingBatchedFunction,
BatchedFunc,
StreamingBatchedFunctionParams,
} from './batching/create_streaming_batched_function';
import { BatchedFunc } from './batching/types';

// eslint-disable-next-line
export interface BfetchPublicSetupDependencies {}
Expand Down
27 changes: 0 additions & 27 deletions src/plugins/bfetch/public/streaming/fetch_streaming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,33 +132,6 @@ test('completes stream observable when request finishes', async () => {
expect(spy).toHaveBeenCalledTimes(1);
});

test('completes stream observable when aborted', async () => {
const env = setup();
const abort = new AbortController();
const { stream } = fetchStreaming({
url: 'http://example.com',
signal: abort.signal,
});

const spy = jest.fn();
stream.subscribe({
complete: spy,
});

expect(spy).toHaveBeenCalledTimes(0);

(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);

abort.abort();

(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);

expect(spy).toHaveBeenCalledTimes(1);
});

test('promise throws when request errors', async () => {
const env = setup();
const { stream } = fetchStreaming({
Expand Down
4 changes: 1 addition & 3 deletions src/plugins/bfetch/public/streaming/fetch_streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export interface FetchStreamingParams {
headers?: Record<string, string>;
method?: 'GET' | 'POST';
body?: string;
signal?: AbortSignal;
}

/**
Expand All @@ -36,7 +35,6 @@ export function fetchStreaming({
headers = {},
method = 'POST',
body = '',
signal,
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();

Expand All @@ -47,7 +45,7 @@ export function fetchStreaming({
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));

const stream = fromStreamingXhr(xhr, signal);
const stream = fromStreamingXhr(xhr);

// Send the payload to the server
xhr.send(body);
Expand Down
Loading

0 comments on commit 3fb45c7

Please sign in to comment.