From 20bb3518d0d71949504d571601e671b42c7c7bce Mon Sep 17 00:00:00 2001 From: Liza K Date: Tue, 10 Nov 2020 14:45:21 +0200 Subject: [PATCH 01/25] Use bfetch for search (no abort behavior) --- src/plugins/bfetch/public/index.ts | 2 ++ src/plugins/data/kibana.json | 1 + src/plugins/data/public/plugin.ts | 3 +- .../public/search/search_interceptor.test.ts | 2 ++ .../data/public/search/search_interceptor.ts | 34 +++++++++++++------ .../data/public/search/search_service.ts | 5 ++- src/plugins/data/public/types.ts | 2 ++ src/plugins/data/server/plugin.ts | 5 ++- .../data/server/search/search_service.ts | 30 ++++++++++++++-- x-pack/plugins/data_enhanced/kibana.json | 1 + x-pack/plugins/data_enhanced/public/plugin.ts | 5 ++- .../public/search/search_interceptor.test.ts | 4 +++ 12 files changed, 77 insertions(+), 17 deletions(-) diff --git a/src/plugins/bfetch/public/index.ts b/src/plugins/bfetch/public/index.ts index 8707e5a438159..9aaa7804fe844 100644 --- a/src/plugins/bfetch/public/index.ts +++ b/src/plugins/bfetch/public/index.ts @@ -23,6 +23,8 @@ import { BfetchPublicPlugin } from './plugin'; export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin'; export { split } from './streaming'; +export { BatchedFunc } from './batching/create_streaming_batched_function'; + export function plugin(initializerContext: PluginInitializerContext) { return new BfetchPublicPlugin(initializerContext); } diff --git a/src/plugins/data/kibana.json b/src/plugins/data/kibana.json index 9cb9b1745373a..0aa05deaa28c1 100644 --- a/src/plugins/data/kibana.json +++ b/src/plugins/data/kibana.json @@ -4,6 +4,7 @@ "server": true, "ui": true, "requiredPlugins": [ + "bfetch", "expressions", "uiActions" ], diff --git a/src/plugins/data/public/plugin.ts b/src/plugins/data/public/plugin.ts index 5abf4d3648af7..45fa271c96ca7 100644 --- a/src/plugins/data/public/plugin.ts +++ b/src/plugins/data/public/plugin.ts @@ -105,7 +105,7 @@ export class DataPublicPlugin public setup( core: CoreSetup, - { expressions, uiActions, usageCollection }: DataSetupDependencies + { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies ): DataPublicPluginSetup { const startServices = createStartServicesGetter(core.getStartServices); @@ -136,6 +136,7 @@ export class DataPublicPlugin ); const searchService = this.searchService.setup(core, { + bfetch, usageCollection, expressions, }); diff --git a/src/plugins/data/public/search/search_interceptor.test.ts b/src/plugins/data/public/search/search_interceptor.test.ts index 472caa5e4f45f..997d929f7b168 100644 --- a/src/plugins/data/public/search/search_interceptor.test.ts +++ b/src/plugins/data/public/search/search_interceptor.test.ts @@ -25,6 +25,7 @@ import { AbortError } from '../../common'; import { SearchTimeoutError, PainlessError, TimeoutErrorMode } from './errors'; import { searchServiceMock } from './mocks'; import { ISearchStart } from '.'; +import { bfetchPluginMock } from '../../../bfetch/public/mocks'; let searchInterceptor: SearchInterceptor; let mockCoreSetup: MockedKeys; @@ -40,6 +41,7 @@ describe('SearchInterceptor', () => { mockCoreStart = coreMock.createStart(); searchMock = searchServiceMock.createStartContract(); searchInterceptor = new SearchInterceptor({ + bfetch: bfetchPluginMock.createSetupContract(), toasts: mockCoreSetup.notifications.toasts, startServices: new Promise((resolve) => { resolve([mockCoreStart, {}, {}]); diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts index 3584d75ab86bb..1f25151f4896b 100644 --- a/src/plugins/data/public/search/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -23,14 +23,15 @@ import { catchError, finalize } from 'rxjs/operators'; import { PublicMethodsOf } from '@kbn/utility-types'; import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public'; import { i18n } from '@kbn/i18n'; +import { BatchedFunc, BfetchPublicSetup } from 'src/plugins/bfetch/public'; import { AbortError, IKibanaSearchRequest, IKibanaSearchResponse, ISearchOptions, - ES_SEARCH_STRATEGY, ISessionService, getCombinedSignal, + ES_SEARCH_STRATEGY, } from '../../common'; import { SearchUsageCollector } from './collectors'; import { @@ -45,6 +46,7 @@ import { import { toMountPoint } from '../../../kibana_react/public'; export interface SearchInterceptorDeps { + bfetch: BfetchPublicSetup; http: CoreSetup['http']; uiSettings: CoreSetup['uiSettings']; startServices: Promise<[CoreStart, any, unknown]>; @@ -70,6 +72,10 @@ export class SearchInterceptor { * @internal */ protected application!: CoreStart['application']; + protected batchedFetch?: BatchedFunc< + { request: IKibanaSearchRequest; strategy?: string }, + IKibanaSearchResponse + >; /* * @internal @@ -80,6 +86,10 @@ export class SearchInterceptor { this.deps.startServices.then(([coreStart]) => { this.application = coreStart.application; }); + + this.batchedFetch = deps.bfetch.batchedFunction({ + url: '/internal/bsearch', + }); } /* @@ -130,16 +140,20 @@ export class SearchInterceptor { signal: AbortSignal, strategy?: string ): Promise { - const { id, ...searchRequest } = request; - const path = trimEnd(`/internal/search/${strategy || ES_SEARCH_STRATEGY}/${id || ''}`, '/'); - const body = JSON.stringify(searchRequest); + if (this.batchedFetch) { + return this.batchedFetch({ request, strategy }); + } else { + const { id, ...searchRequest } = request; + const path = trimEnd(`/internal/search/${strategy || ES_SEARCH_STRATEGY}/${id || ''}`, '/'); + const body = JSON.stringify(searchRequest); - return this.deps.http.fetch({ - method: 'POST', - path, - body, - signal, - }); + return this.deps.http.fetch({ + method: 'POST', + path, + body, + signal, + }); + } } /** diff --git a/src/plugins/data/public/search/search_service.ts b/src/plugins/data/public/search/search_service.ts index e5a50077518af..e6a3b8c458308 100644 --- a/src/plugins/data/public/search/search_service.ts +++ b/src/plugins/data/public/search/search_service.ts @@ -19,6 +19,7 @@ import { Plugin, CoreSetup, CoreStart, PluginInitializerContext } from 'src/core/public'; import { BehaviorSubject } from 'rxjs'; +import { BfetchPublicSetup } from 'src/plugins/bfetch/public'; import { ISearchSetup, ISearchStart, SearchEnhancements } from './types'; import { handleResponse } from './fetch'; @@ -46,6 +47,7 @@ import { aggShardDelay } from '../../common/search/aggs/buckets/shard_delay_fn'; /** @internal */ export interface SearchServiceSetupDependencies { + bfetch: BfetchPublicSetup; expressions: ExpressionsSetup; usageCollection?: UsageCollectionSetup; } @@ -67,7 +69,7 @@ export class SearchService implements Plugin { public setup( { http, getStartServices, notifications, uiSettings }: CoreSetup, - { expressions, usageCollection }: SearchServiceSetupDependencies + { bfetch, expressions, usageCollection }: SearchServiceSetupDependencies ): ISearchSetup { this.usageCollector = createUsageCollector(getStartServices, usageCollection); @@ -77,6 +79,7 @@ export class SearchService implements Plugin { * all pending search requests, as well as getting the number of pending search requests. */ this.searchInterceptor = new SearchInterceptor({ + bfetch, toasts: notifications.toasts, http, uiSettings, diff --git a/src/plugins/data/public/types.ts b/src/plugins/data/public/types.ts index 21a03a49fe058..4082fbe55094c 100644 --- a/src/plugins/data/public/types.ts +++ b/src/plugins/data/public/types.ts @@ -19,6 +19,7 @@ import React from 'react'; import { CoreStart } from 'src/core/public'; +import { BfetchPublicSetup } from 'src/plugins/bfetch/public'; import { IStorageWrapper } from 'src/plugins/kibana_utils/public'; import { ExpressionsSetup } from 'src/plugins/expressions/public'; import { UiActionsSetup, UiActionsStart } from 'src/plugins/ui_actions/public'; @@ -36,6 +37,7 @@ export interface DataPublicPluginEnhancements { } export interface DataSetupDependencies { + bfetch: BfetchPublicSetup; expressions: ExpressionsSetup; uiActions: UiActionsSetup; usageCollection?: UsageCollectionSetup; diff --git a/src/plugins/data/server/plugin.ts b/src/plugins/data/server/plugin.ts index 88f24b7ca5a70..018606f4c0fdc 100644 --- a/src/plugins/data/server/plugin.ts +++ b/src/plugins/data/server/plugin.ts @@ -19,6 +19,7 @@ import { PluginInitializerContext, CoreSetup, CoreStart, Plugin, Logger } from 'src/core/server'; import { ExpressionsServerSetup } from 'src/plugins/expressions/server'; +import { BfetchServerSetup } from 'src/plugins/bfetch/server'; import { ConfigSchema } from '../config'; import { IndexPatternsService, IndexPatternsServiceStart } from './index_patterns'; import { ISearchSetup, ISearchStart, SearchEnhancements } from './search'; @@ -51,6 +52,7 @@ export interface DataPluginStart { } export interface DataPluginSetupDependencies { + bfetch: BfetchServerSetup; expressions: ExpressionsServerSetup; usageCollection?: UsageCollectionSetup; } @@ -85,7 +87,7 @@ export class DataServerPlugin public setup( core: CoreSetup, - { expressions, usageCollection }: DataPluginSetupDependencies + { bfetch, expressions, usageCollection }: DataPluginSetupDependencies ) { this.indexPatterns.setup(core); this.scriptsService.setup(core); @@ -96,6 +98,7 @@ export class DataServerPlugin core.uiSettings.register(getUiSettings()); const searchSetup = this.searchService.setup(core, { + bfetch, registerFunction: expressions.registerFunction, usageCollection, }); diff --git a/src/plugins/data/server/search/search_service.ts b/src/plugins/data/server/search/search_service.ts index c500c62914c0b..1fcac41c55c8f 100644 --- a/src/plugins/data/server/search/search_service.ts +++ b/src/plugins/data/server/search/search_service.ts @@ -30,6 +30,7 @@ import { StartServicesAccessor, } from 'src/core/server'; import { first } from 'rxjs/operators'; +import { BfetchServerSetup } from 'src/plugins/bfetch/server'; import { ISearchSetup, ISearchStart, @@ -77,6 +78,7 @@ type StrategyMap = Record>; /** @internal */ export interface SearchServiceSetupDependencies { + bfetch: BfetchServerSetup; registerFunction: AggsSetupDependencies['registerFunction']; usageCollection?: UsageCollectionSetup; } @@ -98,6 +100,7 @@ export class SearchService implements Plugin { private readonly searchSourceService = new SearchSourceService(); private defaultSearchStrategyName: string = ES_SEARCH_STRATEGY; private searchStrategies: StrategyMap = {}; + private coreStart?: CoreStart; constructor( private initializerContext: PluginInitializerContext, @@ -106,7 +109,7 @@ export class SearchService implements Plugin { public setup( core: CoreSetup<{}, DataPluginStart>, - { registerFunction, usageCollection }: SearchServiceSetupDependencies + { bfetch, registerFunction, usageCollection }: SearchServiceSetupDependencies ): ISearchSetup { const usage = usageCollection ? usageProvider(core) : undefined; @@ -118,9 +121,12 @@ export class SearchService implements Plugin { registerSearchRoute(router); registerMsearchRoute(router, routeDependencies); + core.getStartServices().then(([coreStart]) => { + this.coreStart = coreStart; + }); + core.http.registerRouteHandlerContext('search', async (context, request) => { - const [coreStart] = await core.getStartServices(); - return this.asScopedProvider(coreStart)(request); + return this.asScopedProvider(this.coreStart!)(request); }); this.registerSearchStrategy( @@ -132,6 +138,24 @@ export class SearchService implements Plugin { ) ); + bfetch.addBatchProcessingRoute<{ request: any; strategy?: string }, any>( + '/internal/bsearch', + (request) => { + const search = this.asScopedProvider(this.coreStart!)(request); + + return { + onBatchItem: async ({ request: requestData, strategy }) => { + return search + .search(requestData, { + strategy, + }) + .pipe(first()) + .toPromise(); + }, + }; + } + ); + core.savedObjects.registerType(searchTelemetry); if (usageCollection) { registerUsageCollector(usageCollection, this.initializerContext); diff --git a/x-pack/plugins/data_enhanced/kibana.json b/x-pack/plugins/data_enhanced/kibana.json index 5ded0f8f0dec3..884121f2f0a0d 100644 --- a/x-pack/plugins/data_enhanced/kibana.json +++ b/x-pack/plugins/data_enhanced/kibana.json @@ -6,6 +6,7 @@ "xpack", "data_enhanced" ], "requiredPlugins": [ + "bfetch", "data", "features" ], diff --git a/x-pack/plugins/data_enhanced/public/plugin.ts b/x-pack/plugins/data_enhanced/public/plugin.ts index 43ad4a9ed9b8b..bd4a45d27f9cf 100644 --- a/x-pack/plugins/data_enhanced/public/plugin.ts +++ b/x-pack/plugins/data_enhanced/public/plugin.ts @@ -6,12 +6,14 @@ import { CoreSetup, CoreStart, Plugin } from 'src/core/public'; import { DataPublicPluginSetup, DataPublicPluginStart } from '../../../../src/plugins/data/public'; +import { BfetchPublicSetup } from '../../../../src/plugins/bfetch/public'; import { setAutocompleteService } from './services'; import { setupKqlQuerySuggestionProvider, KUERY_LANGUAGE_NAME } from './autocomplete'; import { EnhancedSearchInterceptor } from './search/search_interceptor'; export interface DataEnhancedSetupDependencies { + bfetch: BfetchPublicSetup; data: DataPublicPluginSetup; } export interface DataEnhancedStartDependencies { @@ -27,7 +29,7 @@ export class DataEnhancedPlugin public setup( core: CoreSetup, - { data }: DataEnhancedSetupDependencies + { bfetch, data }: DataEnhancedSetupDependencies ) { data.autocomplete.addQuerySuggestionProvider( KUERY_LANGUAGE_NAME, @@ -35,6 +37,7 @@ export class DataEnhancedPlugin ); this.enhancedSearchInterceptor = new EnhancedSearchInterceptor({ + bfetch, toasts: core.notifications.toasts, http: core.http, uiSettings: core.uiSettings, diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts index f47d2b39a89a9..fbbbbf92ca1ea 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts @@ -10,6 +10,7 @@ import { CoreSetup, CoreStart } from 'kibana/public'; import { AbortError, UI_SETTINGS } from '../../../../../src/plugins/data/common'; import { SearchTimeoutError } from 'src/plugins/data/public'; import { dataPluginMock } from '../../../../../src/plugins/data/public/mocks'; +import { bfetchPluginMock } from '../../../../../src/plugins/bfetch/public/mocks'; const timeTravel = (msToRun = 0) => { jest.advanceTimersByTime(msToRun); @@ -73,7 +74,10 @@ describe('EnhancedSearchInterceptor', () => { ]); }); + const bfetchMock = bfetchPluginMock.createSetupContract(); + searchInterceptor = new EnhancedSearchInterceptor({ + bfetch: bfetchMock, toasts: mockCoreSetup.notifications.toasts, startServices: mockPromise as any, http: mockCoreSetup.http, From 8b4eb1c8d5dad6f9b99c052a95d5c8ee07470d12 Mon Sep 17 00:00:00 2001 From: Liza K Date: Tue, 10 Nov 2020 20:21:22 +0200 Subject: [PATCH 02/25] fix merge --- .../plugins/data_enhanced/public/search/search_interceptor.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts index 4cafcdb29ae8d..d36741ae6576d 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts @@ -13,7 +13,7 @@ import { SearchInterceptorDeps, UI_SETTINGS, } from '../../../../../src/plugins/data/public'; -import { AbortError, toPromise } from '../../../../../src/plugins/data/common'; +import { AbortError, abortSignalToPromise } from '../../../../../src/plugins/kibana_utils/public'; import { IAsyncSearchRequest, @@ -70,7 +70,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { abortSignal: options.abortSignal, timeout: this.searchTimeout, }); - const abortedPromise = toPromise(combinedSignal); + const abortedPromise = abortSignalToPromise(combinedSignal); const strategy = options?.strategy ?? ENHANCED_ES_SEARCH_STRATEGY; this.pendingCount$.next(this.pendingCount$.getValue() + 1); From aaa3122277ccd563175466174abd8e0848cdb8ec Mon Sep 17 00:00:00 2001 From: Liza K Date: Mon, 16 Nov 2020 14:22:02 +0200 Subject: [PATCH 03/25] Handle request abortion + unit tests --- .../public/batching/batch_utils.test.ts | 199 ++++++++++++++++++ .../bfetch/public/batching/batch_utils.ts | 47 +++++ .../create_streaming_batched_function.test.ts | 62 +++++- .../create_streaming_batched_function.ts | 40 ++-- src/plugins/bfetch/public/batching/types.ts | 32 +++ src/plugins/bfetch/public/index.ts | 2 +- src/plugins/bfetch/public/plugin.ts | 2 +- .../public/streaming/fetch_streaming.test.ts | 28 +++ .../public/streaming/fetch_streaming.ts | 5 +- .../streaming/from_streaming_xhr.test.ts | 35 +++ .../public/streaming/from_streaming_xhr.ts | 19 +- .../data/public/search/search_interceptor.ts | 2 +- 12 files changed, 451 insertions(+), 22 deletions(-) create mode 100644 src/plugins/bfetch/public/batching/batch_utils.test.ts create mode 100644 src/plugins/bfetch/public/batching/batch_utils.ts create mode 100644 src/plugins/bfetch/public/batching/types.ts diff --git a/src/plugins/bfetch/public/batching/batch_utils.test.ts b/src/plugins/bfetch/public/batching/batch_utils.test.ts new file mode 100644 index 0000000000000..b57ec16801fbd --- /dev/null +++ b/src/plugins/bfetch/public/batching/batch_utils.test.ts @@ -0,0 +1,199 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { BatchItem } from './types'; +import { getBatchDone$ } from './batch_utils'; +import { defer } from 'src/plugins/kibana_utils/common'; + +const tick = () => new Promise((resolve) => setTimeout(resolve, 1)); + +describe('getBatchDone$()', () => { + test('Triggers when all are aborted', async () => { + const abortControllers = [new AbortController(), new AbortController()]; + const items: Array> = [ + { + future: defer(), + payload: null, + done: false, + signal: abortControllers[0].signal, + }, + { + future: defer(), + payload: null, + done: false, + signal: abortControllers[1].signal, + }, + ]; + const b = getBatchDone$(items); + + const spy = { + next: jest.fn(), + complete: jest.fn(), + }; + + b.subscribe(spy); + + abortControllers[0].abort(); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(0); + + abortControllers[1].abort(); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(1); + expect(spy.complete).toHaveBeenCalledTimes(1); + }); + + test('Triggers when all are resolved', async () => { + const items: Array> = [ + { + future: defer(), + payload: null, + done: false, + }, + { + future: defer(), + payload: null, + done: false, + }, + ]; + const b = getBatchDone$(items); + + const spy = { + next: jest.fn(), + complete: jest.fn(), + }; + + b.subscribe(spy); + + items[0].future.resolve(null); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(0); + + items[1].future.resolve(null); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(1); + expect(spy.complete).toHaveBeenCalledTimes(1); + }); + + test('Triggers when its a mix', async () => { + const abortController = new AbortController(); + const items: Array> = [ + { + future: defer(), + payload: null, + done: false, + }, + { + future: defer(), + payload: null, + done: false, + signal: abortController.signal, + }, + ]; + const b = getBatchDone$(items); + + const spy = { + next: jest.fn(), + complete: jest.fn(), + }; + + b.subscribe(spy); + + items[0].future.resolve(null); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(0); + + abortController.abort(); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(1); + expect(spy.complete).toHaveBeenCalledTimes(1); + }); + + test('Triggers correctly if an item is resolved then aborted', async () => { + const abortController = new AbortController(); + const items: Array> = [ + { + future: defer(), + payload: null, + done: false, + signal: abortController.signal, + }, + { + future: defer(), + payload: null, + done: false, + }, + ]; + const b = getBatchDone$(items); + + const spy = { + next: jest.fn(), + complete: jest.fn(), + }; + + b.subscribe(spy); + + items[0].future.resolve(null); + await tick(); + abortController.abort(); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(0); + + items[1].future.resolve(null); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(1); + expect(spy.complete).toHaveBeenCalledTimes(1); + }); + + test('Triggers correctly if an item is aborted then resolved', async () => { + const abortController = new AbortController(); + const items: Array> = [ + { + future: defer(), + payload: null, + done: false, + }, + { + future: defer(), + payload: null, + done: false, + signal: abortController.signal, + }, + ]; + const b = getBatchDone$(items); + + const spy = { + next: jest.fn(), + complete: jest.fn(), + }; + + b.subscribe(spy); + + items[0].future.resolve(null); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(0); + + abortController.abort(); + await tick(); + items[1].future.resolve(null); + await tick(); + expect(spy.next).toHaveBeenCalledTimes(1); + expect(spy.complete).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/plugins/bfetch/public/batching/batch_utils.ts b/src/plugins/bfetch/public/batching/batch_utils.ts new file mode 100644 index 0000000000000..64455e6259f2d --- /dev/null +++ b/src/plugins/bfetch/public/batching/batch_utils.ts @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { filter, mergeMap } from 'rxjs/operators'; +import { from } from 'rxjs'; +import { BatchItem } from './types'; + +export function isBatchDone(items: Array>): boolean { + return items.every((item) => item.done); +} + +export function getBatchDone$(items: Array>) { + // Triggers when all requests were resolved, rejected or aborted + return from(items).pipe( + mergeMap((item) => { + return new Promise((resolve) => { + const onDone = () => { + if (item.done) return; + + item.done = true; + item.signal?.removeEventListener('abort', onDone); + resolve(isBatchDone(items)); + }; + + item.signal?.addEventListener('abort', onDone); + item.future.promise.finally(onDone); + }); + }), + filter((allDone) => allDone) + ); +} diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts index da6c940c48d0a..7638966966ff0 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts @@ -19,7 +19,7 @@ import { createStreamingBatchedFunction } from './create_streaming_batched_function'; import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming'; -import { defer, of } from '../../../kibana_utils/public'; +import { AbortError, defer, of } from '../../../kibana_utils/public'; import { Subject } from 'rxjs'; const getPromiseState = (promise: Promise): Promise<'resolved' | 'rejected' | 'pending'> => @@ -168,6 +168,28 @@ describe('createStreamingBatchedFunction()', () => { expect(fetchStreaming).toHaveBeenCalledTimes(1); }); + test('ignores a request with an aborted signal', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const abortController = new AbortController(); + abortController.abort(); + + of(fn({ foo: 'bar' }, abortController.signal)); + fn({ baz: 'quix' }); + + await new Promise((r) => setTimeout(r, 6)); + const { body } = fetchStreaming.mock.calls[0][0]; + expect(JSON.parse(body)).toEqual({ + batch: [{ baz: 'quix' }], + }); + }); + test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => { const { fetchStreaming } = setup(); const fn = createStreamingBatchedFunction({ @@ -423,6 +445,44 @@ describe('createStreamingBatchedFunction()', () => { expect(result3).toEqual({ b: '3' }); }); + describe('when requests are aborted', () => { + test('rejects promise on abort and lets others continue', async () => { + const { fetchStreaming, stream } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const abortController = new AbortController(); + const promise = fn({ a: '1' }, abortController.signal); + const promise2 = fn({ a: '2' }); + await new Promise((r) => setTimeout(r, 6)); + + expect(await isPending(promise)).toBe(true); + + abortController.abort(); + await new Promise((r) => setTimeout(r, 6)); + + expect(await isPending(promise)).toBe(false); + const [, error] = await of(promise); + expect(error).toBeInstanceOf(AbortError); + + stream.next( + JSON.stringify({ + id: 1, + result: { b: '2' }, + }) + '\n' + ); + + await new Promise((r) => setTimeout(r, 1)); + + const [result2] = await of(promise2); + expect(result2).toEqual({ b: '2' }); + }); + }); + describe('when stream closes prematurely', () => { test('rejects pending promises with CONNECTION error code', async () => { const { fetchStreaming, stream } = setup(); diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index 89793fff6b325..1c62388a01a09 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -17,7 +17,7 @@ * under the License. */ -import { defer, Defer } from '../../../kibana_utils/public'; +import { AbortError, defer } from '../../../kibana_utils/public'; import { ItemBufferParams, TimedItemBufferParams, @@ -27,13 +27,8 @@ import { } from '../../common'; import { fetchStreaming, split } from '../streaming'; import { normalizeError } from '../../common'; - -export interface BatchItem { - payload: Payload; - future: Defer; -} - -export type BatchedFunc = (payload: Payload) => Promise; +import { BatchedFunc, BatchItem } from './types'; +import { isBatchDone, getBatchDone$ } from './batch_utils'; export interface BatchedFunctionProtocolError extends ErrorLike { code: string; @@ -83,31 +78,48 @@ export const createStreamingBatchedFunction = ( maxItemAge = 10, } = params; const [fn] = createBatchedFunction, BatchItem>({ - onCall: (payload: Payload) => { + onCall: (payload: Payload, signal?: AbortSignal) => { const future = defer(); const entry: BatchItem = { payload, future, + signal, + done: false, }; return [future.promise, entry]; }, onBatch: async (items) => { try { - let responsesReceived = 0; - const batch = items.map(({ payload }) => payload); + // Filter out and reject any items who's signal is already aborted + items = items.filter((item) => { + if (item.signal?.aborted) { + item.future.reject(new AbortError()); + } + return !item.signal?.aborted; + }); + + // Prepare batch + const batch = items.map((item) => { + const rejectAborted = () => { + item.future.reject(new AbortError()); + item.signal?.removeEventListener('abort', rejectAborted); + }; + item.signal?.addEventListener('abort', rejectAborted); + return item.payload; + }); + const { stream } = fetchStreamingInjected({ url, body: JSON.stringify({ batch }), method: 'POST', + abort$: getBatchDone$(items), }); stream.pipe(split('\n')).subscribe({ next: (json: string) => { const response = JSON.parse(json) as BatchResponseItem; if (response.error) { - responsesReceived++; items[response.id].future.reject(response.error); } else if (response.result !== undefined) { - responsesReceived++; items[response.id].future.resolve(response.result); } }, @@ -117,7 +129,7 @@ export const createStreamingBatchedFunction = ( for (const { future } of items) future.reject(normalizedError); }, complete: () => { - const streamTerminatedPrematurely = responsesReceived !== items.length; + const streamTerminatedPrematurely = !isBatchDone(items); if (streamTerminatedPrematurely) { const error: BatchedFunctionProtocolError = { message: 'Connection terminated prematurely.', diff --git a/src/plugins/bfetch/public/batching/types.ts b/src/plugins/bfetch/public/batching/types.ts new file mode 100644 index 0000000000000..d59d6b3d5d641 --- /dev/null +++ b/src/plugins/bfetch/public/batching/types.ts @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Defer } from '../../../kibana_utils/public'; + +export interface BatchItem { + payload: Payload; + future: Defer; + signal?: AbortSignal; + done: boolean; +} + +export type BatchedFunc = ( + payload: Payload, + signal?: AbortSignal +) => Promise; diff --git a/src/plugins/bfetch/public/index.ts b/src/plugins/bfetch/public/index.ts index 9aaa7804fe844..7ff110105faa0 100644 --- a/src/plugins/bfetch/public/index.ts +++ b/src/plugins/bfetch/public/index.ts @@ -23,7 +23,7 @@ import { BfetchPublicPlugin } from './plugin'; export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin'; export { split } from './streaming'; -export { BatchedFunc } from './batching/create_streaming_batched_function'; +export { BatchedFunc } from './batching/types'; export function plugin(initializerContext: PluginInitializerContext) { return new BfetchPublicPlugin(initializerContext); diff --git a/src/plugins/bfetch/public/plugin.ts b/src/plugins/bfetch/public/plugin.ts index 5f01957c0908e..72aaa862b0ad2 100644 --- a/src/plugins/bfetch/public/plugin.ts +++ b/src/plugins/bfetch/public/plugin.ts @@ -22,9 +22,9 @@ import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './ import { removeLeadingSlash } from '../common'; import { createStreamingBatchedFunction, - BatchedFunc, StreamingBatchedFunctionParams, } from './batching/create_streaming_batched_function'; +import { BatchedFunc } from './batching/types'; // eslint-disable-next-line export interface BfetchPublicSetupDependencies {} diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts index 27adc6dc8b549..48f29c2827acf 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts @@ -19,6 +19,7 @@ import { fetchStreaming } from './fetch_streaming'; import { mockXMLHttpRequest } from '../test_helpers/xhr'; +import { Subject } from 'rxjs'; const tick = () => new Promise((resolve) => setTimeout(resolve, 1)); @@ -132,6 +133,33 @@ test('completes stream observable when request finishes', async () => { expect(spy).toHaveBeenCalledTimes(1); }); +test('completes stream observable when aborted', async () => { + const env = setup(); + const abort$ = new Subject(); + const { stream } = fetchStreaming({ + url: 'http://example.com', + abort$, + }); + + const spy = jest.fn(); + stream.subscribe({ + complete: spy, + }); + + expect(spy).toHaveBeenCalledTimes(0); + + (env.xhr as any).responseText = 'foo'; + env.xhr.onprogress!({} as any); + + abort$.next(true); + + (env.xhr as any).readyState = 4; + (env.xhr as any).status = 200; + env.xhr.onreadystatechange!({} as any); + + expect(spy).toHaveBeenCalledTimes(1); +}); + test('promise throws when request errors', async () => { const env = setup(); const { stream } = fetchStreaming({ diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.ts index 899e8a1824a41..2a9d4ac894b3e 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.ts @@ -17,6 +17,7 @@ * under the License. */ +import { Observable } from 'rxjs'; import { fromStreamingXhr } from './from_streaming_xhr'; export interface FetchStreamingParams { @@ -24,6 +25,7 @@ export interface FetchStreamingParams { headers?: Record; method?: 'GET' | 'POST'; body?: string; + abort$?: Observable; } /** @@ -35,6 +37,7 @@ export function fetchStreaming({ headers = {}, method = 'POST', body = '', + abort$, }: FetchStreamingParams) { const xhr = new window.XMLHttpRequest(); @@ -45,7 +48,7 @@ export function fetchStreaming({ // Set the HTTP headers Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v)); - const stream = fromStreamingXhr(xhr); + const stream = fromStreamingXhr(xhr, abort$); // Send the payload to the server xhr.send(body); diff --git a/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts b/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts index 40eb3d5e2556b..3e51c73815804 100644 --- a/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts +++ b/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts @@ -17,10 +17,12 @@ * under the License. */ +import { Subject } from 'rxjs'; import { fromStreamingXhr } from './from_streaming_xhr'; const createXhr = (): XMLHttpRequest => (({ + abort: () => {}, onprogress: () => {}, onreadystatechange: () => {}, readyState: 0, @@ -100,6 +102,39 @@ test('completes observable when request reaches end state', () => { expect(complete).toHaveBeenCalledTimes(1); }); +test('completes observable when aborted', () => { + const xhr = createXhr(); + const abort$ = new Subject(); + const observable = fromStreamingXhr(xhr, abort$); + + const next = jest.fn(); + const complete = jest.fn(); + observable.subscribe({ + next, + complete, + }); + + (xhr as any).responseText = '1'; + xhr.onprogress!({} as any); + + (xhr as any).responseText = '2'; + xhr.onprogress!({} as any); + + expect(complete).toHaveBeenCalledTimes(0); + + (xhr as any).readyState = 2; + abort$.next(true); + + expect(complete).toHaveBeenCalledTimes(1); + + // Shouldn't trigger additional events + (xhr as any).readyState = 4; + (xhr as any).status = 200; + xhr.onreadystatechange!({} as any); + + expect(complete).toHaveBeenCalledTimes(1); +}); + test('errors observable if request returns with error', () => { const xhr = createXhr(); const observable = fromStreamingXhr(xhr); diff --git a/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts index bba8151958492..819ddc62ab72b 100644 --- a/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts +++ b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts @@ -26,13 +26,17 @@ import { Observable, Subject } from 'rxjs'; export const fromStreamingXhr = ( xhr: Pick< XMLHttpRequest, - 'onprogress' | 'onreadystatechange' | 'readyState' | 'status' | 'responseText' - > + 'onprogress' | 'onreadystatechange' | 'readyState' | 'status' | 'responseText' | 'abort' + >, + abort$?: Observable ): Observable => { const subject = new Subject(); let index = 0; + let aborted = false; const processBatch = () => { + if (aborted) return; + const { responseText } = xhr; if (index >= responseText.length) return; subject.next(responseText.substr(index)); @@ -41,6 +45,15 @@ export const fromStreamingXhr = ( xhr.onprogress = processBatch; + const sub = abort$?.subscribe((done) => { + if (done && xhr.readyState !== 4) { + aborted = true; + xhr.abort(); + subject.complete(); + sub?.unsubscribe(); + } + }); + xhr.onreadystatechange = () => { // Older browsers don't support onprogress, so we need // to call this here, too. It's safe to call this multiple @@ -48,7 +61,7 @@ export const fromStreamingXhr = ( processBatch(); // 4 is the magic number that means the request is done - if (xhr.readyState === 4) { + if (!aborted && xhr.readyState === 4) { // 0 indicates a network failure. 400+ messages are considered server errors if (xhr.status === 0 || xhr.status >= 400) { subject.error(new Error(`Batch request failed with status ${xhr.status}`)); diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts index 313b2b4eba878..603882a5952a6 100644 --- a/src/plugins/data/public/search/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -140,7 +140,7 @@ export class SearchInterceptor { strategy?: string ): Promise { if (this.batchedFetch) { - return this.batchedFetch({ request, strategy }); + return this.batchedFetch({ request, strategy }, signal); } else { const { id, ...searchRequest } = request; const path = trimEnd(`/internal/search/${strategy || ES_SEARCH_STRATEGY}/${id || ''}`, '/'); From 2cdfc472b982be515a6e63b8ac2d72b34f7c2169 Mon Sep 17 00:00:00 2001 From: Liza K Date: Mon, 16 Nov 2020 21:29:28 +0200 Subject: [PATCH 04/25] fix jest --- .../data/server/search/search_service.test.ts | 18 +++++++++++++++++- src/plugins/kibana_utils/common/defer.ts | 4 ++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/plugins/data/server/search/search_service.test.ts b/src/plugins/data/server/search/search_service.test.ts index 0700afd8d6c83..8a52d1d415f9b 100644 --- a/src/plugins/data/server/search/search_service.test.ts +++ b/src/plugins/data/server/search/search_service.test.ts @@ -25,6 +25,8 @@ import { createFieldFormatsStartMock } from '../field_formats/mocks'; import { createIndexPatternsStartMock } from '../index_patterns/mocks'; import { SearchService, SearchServiceSetupDependencies } from './search_service'; +import { bfetchPluginMock } from '../../../bfetch/server/mocks'; +import { of } from 'rxjs'; describe('Search service', () => { let plugin: SearchService; @@ -35,15 +37,29 @@ describe('Search service', () => { const mockLogger: any = { debug: () => {}, }; - plugin = new SearchService(coreMock.createPluginInitializerContext({}), mockLogger); + const context = coreMock.createPluginInitializerContext({}); + context.config.create = jest.fn().mockImplementation(() => { + return of({ + search: { + aggs: { + shardDelay: { + enabled: true, + }, + }, + }, + }); + }); + plugin = new SearchService(context, mockLogger); mockCoreSetup = coreMock.createSetup(); mockCoreStart = coreMock.createStart(); }); describe('setup()', () => { it('exposes proper contract', async () => { + const bfetch = bfetchPluginMock.createSetupContract(); const setup = plugin.setup(mockCoreSetup, ({ packageInfo: { version: '8' }, + bfetch, expressions: { registerFunction: jest.fn(), registerType: jest.fn(), diff --git a/src/plugins/kibana_utils/common/defer.ts b/src/plugins/kibana_utils/common/defer.ts index bf8fa836ed172..bf7f09e712ee0 100644 --- a/src/plugins/kibana_utils/common/defer.ts +++ b/src/plugins/kibana_utils/common/defer.ts @@ -32,10 +32,10 @@ export class Defer { public readonly resolve!: (data: T) => void; public readonly reject!: (error: any) => void; - public readonly promise: Promise = new Promise((resolve, reject) => { + public readonly promise: Promise = new Promise((resolve, reject) => { (this as any).resolve = resolve; (this as any).reject = reject; - }); + }).catch(() => {}); } export const defer = () => new Defer(); From e8fc98f797a228ce3d9080b58d598dfedc0768a8 Mon Sep 17 00:00:00 2001 From: Liza K Date: Mon, 16 Nov 2020 21:42:25 +0200 Subject: [PATCH 05/25] shim totals in oss --- .../data/server/search/es_search/es_search_strategy.ts | 7 ++++++- src/plugins/data/server/search/routes/search.ts | 8 +------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/plugins/data/server/search/es_search/es_search_strategy.ts b/src/plugins/data/server/search/es_search/es_search_strategy.ts index 3e2d415eac16f..1671990f77d92 100644 --- a/src/plugins/data/server/search/es_search/es_search_strategy.ts +++ b/src/plugins/data/server/search/es_search/es_search_strategy.ts @@ -17,7 +17,7 @@ * under the License. */ import { Observable } from 'rxjs'; -import { first } from 'rxjs/operators'; +import { first, map } from 'rxjs/operators'; import type { Logger } from 'kibana/server'; import type { ApiResponse } from '@elastic/elasticsearch'; @@ -30,6 +30,7 @@ import { getDefaultSearchParams, getShardTimeout } from '../es_search'; import type { ISearchStrategy } from '../types'; import type { SearchUsage } from '../collectors/usage'; import type { IEsRawSearchResponse } from '../../../common'; +import { shimHitsTotal } from '..'; export const esSearchStrategyProvider = ( config$: Observable, @@ -54,6 +55,10 @@ export const esSearchStrategyProvider = ( return esClient.asCurrentUser.search(params); }, abortSignal).pipe( toKibanaSearchResponse(), + map((response) => ({ + ...response, + rawResponse: shimHitsTotal(response.rawResponse), + })), trackSearchStatus(logger, usage), includeTotalLoaded() ); diff --git a/src/plugins/data/server/search/routes/search.ts b/src/plugins/data/server/search/routes/search.ts index a4161fe47b388..68c897757a62e 100644 --- a/src/plugins/data/server/search/routes/search.ts +++ b/src/plugins/data/server/search/routes/search.ts @@ -21,7 +21,6 @@ import { first } from 'rxjs/operators'; import { schema } from '@kbn/config-schema'; import type { IRouter } from 'src/core/server'; import { getRequestAbortedSignal } from '../../lib'; -import { shimHitsTotal } from './shim_hits_total'; export function registerSearchRoute(router: IRouter): void { router.post( @@ -56,12 +55,7 @@ export function registerSearchRoute(router: IRouter): void { .toPromise(); return res.ok({ - body: { - ...response, - ...{ - rawResponse: shimHitsTotal(response.rawResponse), - }, - }, + body: response, }); } catch (err) { return res.customError({ From 993e385d559ac9265a8d7bb94b2c721789cd9d45 Mon Sep 17 00:00:00 2001 From: Liza K Date: Tue, 17 Nov 2020 12:57:25 +0200 Subject: [PATCH 06/25] proper formatting for errors --- .../data/server/search/search_service.ts | 20 ++++++++++++++++--- src/plugins/kibana_utils/common/defer.ts | 4 ++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/plugins/data/server/search/search_service.ts b/src/plugins/data/server/search/search_service.ts index 46390d3db45b2..6499ab0f59583 100644 --- a/src/plugins/data/server/search/search_service.ts +++ b/src/plugins/data/server/search/search_service.ts @@ -17,7 +17,7 @@ * under the License. */ -import { BehaviorSubject, Observable } from 'rxjs'; +import { BehaviorSubject, Observable, of } from 'rxjs'; import { pick } from 'lodash'; import { CoreSetup, @@ -29,7 +29,7 @@ import { SharedGlobalConfig, StartServicesAccessor, } from 'src/core/server'; -import { first } from 'rxjs/operators'; +import { catchError, first } from 'rxjs/operators'; import { BfetchServerSetup } from 'src/plugins/bfetch/server'; import { ExpressionsServerSetup } from 'src/plugins/expressions/server'; import { @@ -153,7 +153,21 @@ export class SearchService implements Plugin { .search(requestData, { strategy, }) - .pipe(first()) + .pipe( + first(), + catchError((err) => { + // eslint-disable-next-line no-throw-literal + throw { + statusCode: err.statusCode || 500, + body: { + message: err.message, + attributes: { + error: err.body?.error || err.message, + }, + }, + }; + }) + ) .toPromise(); }, }; diff --git a/src/plugins/kibana_utils/common/defer.ts b/src/plugins/kibana_utils/common/defer.ts index bf7f09e712ee0..bf8fa836ed172 100644 --- a/src/plugins/kibana_utils/common/defer.ts +++ b/src/plugins/kibana_utils/common/defer.ts @@ -32,10 +32,10 @@ export class Defer { public readonly resolve!: (data: T) => void; public readonly reject!: (error: any) => void; - public readonly promise: Promise = new Promise((resolve, reject) => { + public readonly promise: Promise = new Promise((resolve, reject) => { (this as any).resolve = resolve; (this as any).reject = reject; - }).catch(() => {}); + }); } export const defer = () => new Defer(); From e89a46f693d4bacb82f44c79689b4c620ef395e6 Mon Sep 17 00:00:00 2001 From: Liza K Date: Tue, 17 Nov 2020 13:17:32 +0200 Subject: [PATCH 07/25] jest, types and docs --- .../kibana-plugin-plugins-data-public.plugin.setup.md | 4 ++-- ...lugins-data-public.searchinterceptordeps.bfetch.md | 11 +++++++++++ ...lugin-plugins-data-public.searchinterceptordeps.md | 1 + .../kibana-plugin-plugins-data-server.plugin.setup.md | 4 ++-- src/plugins/data/public/public.api.md | 5 ++++- src/plugins/data/public/search/search_interceptor.ts | 2 +- src/plugins/data/public/search/search_service.test.ts | 3 +++ src/plugins/data/server/search/search_service.ts | 2 +- src/plugins/data/server/server.api.md | 5 +++-- 9 files changed, 28 insertions(+), 9 deletions(-) create mode 100644 docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md diff --git a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.plugin.setup.md b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.plugin.setup.md index a0c9b38792825..1ed6059c23062 100644 --- a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.plugin.setup.md +++ b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.plugin.setup.md @@ -7,7 +7,7 @@ Signature: ```typescript -setup(core: CoreSetup, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup; +setup(core: CoreSetup, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup; ``` ## Parameters @@ -15,7 +15,7 @@ setup(core: CoreSetup, { expressio | Parameter | Type | Description | | --- | --- | --- | | core | CoreSetup<DataStartDependencies, DataPublicPluginStart> | | -| { expressions, uiActions, usageCollection } | DataSetupDependencies | | +| { bfetch, expressions, uiActions, usageCollection } | DataSetupDependencies | | Returns: diff --git a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md new file mode 100644 index 0000000000000..5b7c635c71529 --- /dev/null +++ b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-plugins-data-public](./kibana-plugin-plugins-data-public.md) > [SearchInterceptorDeps](./kibana-plugin-plugins-data-public.searchinterceptordeps.md) > [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) + +## SearchInterceptorDeps.bfetch property + +Signature: + +```typescript +bfetch: BfetchPublicSetup; +``` diff --git a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.md b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.md index 3653394d28b92..543566b783c23 100644 --- a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.md +++ b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.md @@ -14,6 +14,7 @@ export interface SearchInterceptorDeps | Property | Type | Description | | --- | --- | --- | +| [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) | BfetchPublicSetup | | | [http](./kibana-plugin-plugins-data-public.searchinterceptordeps.http.md) | CoreSetup['http'] | | | [session](./kibana-plugin-plugins-data-public.searchinterceptordeps.session.md) | ISessionService | | | [startServices](./kibana-plugin-plugins-data-public.searchinterceptordeps.startservices.md) | Promise<[CoreStart, any, unknown]> | | diff --git a/docs/development/plugins/data/server/kibana-plugin-plugins-data-server.plugin.setup.md b/docs/development/plugins/data/server/kibana-plugin-plugins-data-server.plugin.setup.md index 43129891c5412..b90018c3d9cdd 100644 --- a/docs/development/plugins/data/server/kibana-plugin-plugins-data-server.plugin.setup.md +++ b/docs/development/plugins/data/server/kibana-plugin-plugins-data-server.plugin.setup.md @@ -7,7 +7,7 @@ Signature: ```typescript -setup(core: CoreSetup, { expressions, usageCollection }: DataPluginSetupDependencies): { +setup(core: CoreSetup, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): { __enhance: (enhancements: DataEnhancements) => void; search: ISearchSetup; fieldFormats: { @@ -21,7 +21,7 @@ setup(core: CoreSetup, { expressio | Parameter | Type | Description | | --- | --- | --- | | core | CoreSetup<DataPluginStartDependencies, DataPluginStart> | | -| { expressions, usageCollection } | DataPluginSetupDependencies | | +| { bfetch, expressions, usageCollection } | DataPluginSetupDependencies | | Returns: diff --git a/src/plugins/data/public/public.api.md b/src/plugins/data/public/public.api.md index 78b974758f8c0..b82e235811650 100644 --- a/src/plugins/data/public/public.api.md +++ b/src/plugins/data/public/public.api.md @@ -12,6 +12,7 @@ import { ApiResponse as ApiResponse_2 } from '@elastic/elasticsearch/lib/Transpo import { ApplicationStart } from 'kibana/public'; import { Assign } from '@kbn/utility-types'; import { BehaviorSubject } from 'rxjs'; +import { BfetchPublicSetup } from 'src/plugins/bfetch/public'; import Boom from '@hapi/boom'; import { CoreSetup } from 'src/core/public'; import { CoreSetup as CoreSetup_2 } from 'kibana/public'; @@ -1690,7 +1691,7 @@ export class Plugin implements Plugin_2); // (undocumented) - setup(core: CoreSetup, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup; + setup(core: CoreSetup, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup; // (undocumented) start(core: CoreStart_2, { uiActions }: DataStartDependencies): DataPublicPluginStart; // (undocumented) @@ -2065,6 +2066,8 @@ export class SearchInterceptor { // // @public (undocumented) export interface SearchInterceptorDeps { + // (undocumented) + bfetch: BfetchPublicSetup; // (undocumented) http: CoreSetup_2['http']; // (undocumented) diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts index 603882a5952a6..2255ac536bd51 100644 --- a/src/plugins/data/public/search/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -71,7 +71,7 @@ export class SearchInterceptor { * @internal */ protected application!: CoreStart['application']; - protected batchedFetch?: BatchedFunc< + private batchedFetch?: BatchedFunc< { request: IKibanaSearchRequest; strategy?: string }, IKibanaSearchResponse >; diff --git a/src/plugins/data/public/search/search_service.test.ts b/src/plugins/data/public/search/search_service.test.ts index 20041a02067d9..3179da4d03a1a 100644 --- a/src/plugins/data/public/search/search_service.test.ts +++ b/src/plugins/data/public/search/search_service.test.ts @@ -21,6 +21,7 @@ import { coreMock } from '../../../../core/public/mocks'; import { CoreSetup, CoreStart } from '../../../../core/public'; import { SearchService, SearchServiceSetupDependencies } from './search_service'; +import { bfetchPluginMock } from '../../../bfetch/public/mocks'; describe('Search service', () => { let searchService: SearchService; @@ -39,8 +40,10 @@ describe('Search service', () => { describe('setup()', () => { it('exposes proper contract', async () => { + const bfetch = bfetchPluginMock.createSetupContract(); const setup = searchService.setup(mockCoreSetup, ({ packageInfo: { version: '8' }, + bfetch, expressions: { registerFunction: jest.fn(), registerType: jest.fn() }, } as unknown) as SearchServiceSetupDependencies); expect(setup).toHaveProperty('aggs'); diff --git a/src/plugins/data/server/search/search_service.ts b/src/plugins/data/server/search/search_service.ts index 6499ab0f59583..bebb35c74e3b0 100644 --- a/src/plugins/data/server/search/search_service.ts +++ b/src/plugins/data/server/search/search_service.ts @@ -17,7 +17,7 @@ * under the License. */ -import { BehaviorSubject, Observable, of } from 'rxjs'; +import { BehaviorSubject, Observable } from 'rxjs'; import { pick } from 'lodash'; import { CoreSetup, diff --git a/src/plugins/data/server/server.api.md b/src/plugins/data/server/server.api.md index bb7a8f58c926c..061315bae5906 100644 --- a/src/plugins/data/server/server.api.md +++ b/src/plugins/data/server/server.api.md @@ -9,6 +9,7 @@ import { Adapters } from 'src/plugins/inspector/common'; import { ApiResponse } from '@elastic/elasticsearch'; import { Assign } from '@kbn/utility-types'; import { BehaviorSubject } from 'rxjs'; +import { BfetchServerSetup } from 'src/plugins/bfetch/server'; import { ConfigDeprecationProvider } from '@kbn/config'; import { CoreSetup } from 'src/core/server'; import { CoreSetup as CoreSetup_2 } from 'kibana/server'; @@ -936,7 +937,7 @@ export function parseInterval(interval: string): moment.Duration | null; export class Plugin implements Plugin_2 { constructor(initializerContext: PluginInitializerContext_2); // (undocumented) - setup(core: CoreSetup, { expressions, usageCollection }: DataPluginSetupDependencies): { + setup(core: CoreSetup, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): { __enhance: (enhancements: DataEnhancements) => void; search: ISearchSetup; fieldFormats: { @@ -1234,7 +1235,7 @@ export function usageProvider(core: CoreSetup_2): SearchUsage; // src/plugins/data/server/index.ts:284:1 - (ae-forgotten-export) The symbol "propFilter" needs to be exported by the entry point index.d.ts // src/plugins/data/server/index.ts:287:1 - (ae-forgotten-export) The symbol "toAbsoluteDates" needs to be exported by the entry point index.d.ts // src/plugins/data/server/index_patterns/index_patterns_service.ts:58:14 - (ae-forgotten-export) The symbol "IndexPatternsService" needs to be exported by the entry point index.d.ts -// src/plugins/data/server/plugin.ts:88:66 - (ae-forgotten-export) The symbol "DataEnhancements" needs to be exported by the entry point index.d.ts +// src/plugins/data/server/plugin.ts:90:74 - (ae-forgotten-export) The symbol "DataEnhancements" needs to be exported by the entry point index.d.ts // src/plugins/data/server/search/types.ts:104:5 - (ae-forgotten-export) The symbol "ISearchStartSearchSource" needs to be exported by the entry point index.d.ts // (No @packageDocumentation comment for this package) From 3de838c8996b2a00f8d96de9bdac1d0174aad616 Mon Sep 17 00:00:00 2001 From: Liza K Date: Tue, 17 Nov 2020 16:29:48 +0200 Subject: [PATCH 08/25] Fix doc --- src/plugins/embeddable/public/public.api.md | 1 + 1 file changed, 1 insertion(+) diff --git a/src/plugins/embeddable/public/public.api.md b/src/plugins/embeddable/public/public.api.md index 6a2565edf2f67..7b31b04862192 100644 --- a/src/plugins/embeddable/public/public.api.md +++ b/src/plugins/embeddable/public/public.api.md @@ -12,6 +12,7 @@ import { ApiResponse as ApiResponse_2 } from '@elastic/elasticsearch'; import { ApplicationStart as ApplicationStart_2 } from 'kibana/public'; import { Assign } from '@kbn/utility-types'; import { BehaviorSubject } from 'rxjs'; +import { BfetchPublicSetup } from 'src/plugins/bfetch/public'; import Boom from '@hapi/boom'; import { CoreSetup as CoreSetup_2 } from 'src/core/public'; import { CoreSetup as CoreSetup_3 } from 'kibana/public'; From 17de9fa257a63e66912d98f3b6feec1d26a8a752 Mon Sep 17 00:00:00 2001 From: Liza K Date: Tue, 17 Nov 2020 16:52:11 +0200 Subject: [PATCH 09/25] Remove old search code and rename UI Setting --- .../data/public/search/search_interceptor.ts | 20 +-- .../data/server/search/routes/search.test.ts | 119 ------------------ .../data/server/search/routes/search.ts | 50 -------- src/plugins/data/server/ui_settings.ts | 7 +- 4 files changed, 6 insertions(+), 190 deletions(-) delete mode 100644 src/plugins/data/server/search/routes/search.test.ts diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts index 2255ac536bd51..acc19339fd0c6 100644 --- a/src/plugins/data/public/search/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -17,7 +17,7 @@ * under the License. */ -import { get, memoize, trimEnd } from 'lodash'; +import { get, memoize } from 'lodash'; import { BehaviorSubject, throwError, timer, defer, from, Observable, NEVER } from 'rxjs'; import { catchError, finalize } from 'rxjs/operators'; import { PublicMethodsOf } from '@kbn/utility-types'; @@ -29,7 +29,6 @@ import { IKibanaSearchResponse, ISearchOptions, ISessionService, - ES_SEARCH_STRATEGY, } from '../../common'; import { SearchUsageCollector } from './collectors'; import { @@ -71,7 +70,7 @@ export class SearchInterceptor { * @internal */ protected application!: CoreStart['application']; - private batchedFetch?: BatchedFunc< + private batchedFetch!: BatchedFunc< { request: IKibanaSearchRequest; strategy?: string }, IKibanaSearchResponse >; @@ -139,20 +138,7 @@ export class SearchInterceptor { signal: AbortSignal, strategy?: string ): Promise { - if (this.batchedFetch) { - return this.batchedFetch({ request, strategy }, signal); - } else { - const { id, ...searchRequest } = request; - const path = trimEnd(`/internal/search/${strategy || ES_SEARCH_STRATEGY}/${id || ''}`, '/'); - const body = JSON.stringify(searchRequest); - - return this.deps.http.fetch({ - method: 'POST', - path, - body, - signal, - }); - } + return this.batchedFetch({ request, strategy }, signal); } /** diff --git a/src/plugins/data/server/search/routes/search.test.ts b/src/plugins/data/server/search/routes/search.test.ts deleted file mode 100644 index 495cb1c9ea770..0000000000000 --- a/src/plugins/data/server/search/routes/search.test.ts +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import type { MockedKeys } from '@kbn/utility-types/jest'; -import { from } from 'rxjs'; -import { CoreSetup, RequestHandlerContext } from 'src/core/server'; -import { coreMock, httpServerMock } from '../../../../../../src/core/server/mocks'; -import { registerSearchRoute } from './search'; -import { DataPluginStart } from '../../plugin'; - -describe('Search service', () => { - let mockCoreSetup: MockedKeys>; - - beforeEach(() => { - mockCoreSetup = coreMock.createSetup(); - }); - - it('handler calls context.search.search with the given request and strategy', async () => { - const response = { - id: 'yay', - rawResponse: { - took: 100, - timed_out: true, - _shards: { - total: 0, - successful: 0, - failed: 0, - skipped: 0, - }, - hits: { - total: 0, - max_score: 0, - hits: [], - }, - }, - }; - - const mockContext = { - search: { - search: jest.fn().mockReturnValue(from(Promise.resolve(response))), - }, - }; - - const mockBody = { id: undefined, params: {} }; - const mockParams = { strategy: 'foo' }; - const mockRequest = httpServerMock.createKibanaRequest({ - body: mockBody, - params: mockParams, - }); - const mockResponse = httpServerMock.createResponseFactory(); - - registerSearchRoute(mockCoreSetup.http.createRouter()); - - const mockRouter = mockCoreSetup.http.createRouter.mock.results[0].value; - const handler = mockRouter.post.mock.calls[0][1]; - await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse); - - expect(mockContext.search.search).toBeCalled(); - expect(mockContext.search.search.mock.calls[0][0]).toStrictEqual(mockBody); - expect(mockResponse.ok).toBeCalled(); - expect(mockResponse.ok.mock.calls[0][0]).toEqual({ - body: response, - }); - }); - - it('handler throws an error if the search throws an error', async () => { - const rejectedValue = from( - Promise.reject({ - message: 'oh no', - body: { - error: 'oops', - }, - }) - ); - - const mockContext = { - search: { - search: jest.fn().mockReturnValue(rejectedValue), - }, - }; - - const mockBody = { id: undefined, params: {} }; - const mockParams = { strategy: 'foo' }; - const mockRequest = httpServerMock.createKibanaRequest({ - body: mockBody, - params: mockParams, - }); - const mockResponse = httpServerMock.createResponseFactory(); - - registerSearchRoute(mockCoreSetup.http.createRouter()); - - const mockRouter = mockCoreSetup.http.createRouter.mock.results[0].value; - const handler = mockRouter.post.mock.calls[0][1]; - await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse); - - expect(mockContext.search.search).toBeCalled(); - expect(mockContext.search.search.mock.calls[0][0]).toStrictEqual(mockBody); - expect(mockResponse.customError).toBeCalled(); - const error: any = mockResponse.customError.mock.calls[0][0]; - expect(error.body.message).toBe('oh no'); - expect(error.body.attributes.error).toBe('oops'); - }); -}); diff --git a/src/plugins/data/server/search/routes/search.ts b/src/plugins/data/server/search/routes/search.ts index 68c897757a62e..8f3634751d191 100644 --- a/src/plugins/data/server/search/routes/search.ts +++ b/src/plugins/data/server/search/routes/search.ts @@ -17,60 +17,10 @@ * under the License. */ -import { first } from 'rxjs/operators'; import { schema } from '@kbn/config-schema'; import type { IRouter } from 'src/core/server'; -import { getRequestAbortedSignal } from '../../lib'; export function registerSearchRoute(router: IRouter): void { - router.post( - { - path: '/internal/search/{strategy}/{id?}', - validate: { - params: schema.object({ - strategy: schema.string(), - id: schema.maybe(schema.string()), - }), - - query: schema.object({}, { unknowns: 'allow' }), - - body: schema.object({}, { unknowns: 'allow' }), - }, - }, - async (context, request, res) => { - const searchRequest = request.body; - const { strategy, id } = request.params; - const abortSignal = getRequestAbortedSignal(request.events.aborted$); - - try { - const response = await context - .search!.search( - { ...searchRequest, id }, - { - abortSignal, - strategy, - } - ) - .pipe(first()) - .toPromise(); - - return res.ok({ - body: response, - }); - } catch (err) { - return res.customError({ - statusCode: err.statusCode || 500, - body: { - message: err.message, - attributes: { - error: err.body?.error || err.message, - }, - }, - }); - } - } - ); - router.delete( { path: '/internal/search/{strategy}/{id}', diff --git a/src/plugins/data/server/ui_settings.ts b/src/plugins/data/server/ui_settings.ts index 9393700a0e771..f5360f626ac66 100644 --- a/src/plugins/data/server/ui_settings.ts +++ b/src/plugins/data/server/ui_settings.ts @@ -267,14 +267,13 @@ export function getUiSettings(): Record> { }, [UI_SETTINGS.COURIER_BATCH_SEARCHES]: { name: i18n.translate('data.advancedSettings.courier.batchSearchesTitle', { - defaultMessage: 'Batch concurrent searches', + defaultMessage: 'Use legacy search', }), value: false, type: 'boolean', description: i18n.translate('data.advancedSettings.courier.batchSearchesText', { - defaultMessage: `When disabled, dashboard panels will load individually, and search requests will terminate when users navigate - away or update the query. When enabled, dashboard panels will load together when all of the data is loaded, and - searches will not terminate.`, + defaultMessage: `Kibana uses a new search and batching infrastructure. + Enable this option if you prefer to fallback to the legacy synchronous behavior`, }), deprecation: { message: i18n.translate('data.advancedSettings.courier.batchSearchesTextDeprecation', { From 56e7a466d322ae8f77d28fa409e8c328fe2d16b9 Mon Sep 17 00:00:00 2001 From: Liza K Date: Tue, 17 Nov 2020 18:00:06 +0200 Subject: [PATCH 10/25] jest mocks --- .../public/search/search_interceptor.test.ts | 26 ++++++++++++------- .../public/search/search_interceptor.test.ts | 21 ++++++++------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/plugins/data/public/search/search_interceptor.test.ts b/src/plugins/data/public/search/search_interceptor.test.ts index bc405ada0b864..bb5d3f58e6c1e 100644 --- a/src/plugins/data/public/search/search_interceptor.test.ts +++ b/src/plugins/data/public/search/search_interceptor.test.ts @@ -26,9 +26,12 @@ import { SearchTimeoutError, PainlessError, TimeoutErrorMode } from './errors'; import { searchServiceMock } from './mocks'; import { ISearchStart } from '.'; import { bfetchPluginMock } from '../../../bfetch/public/mocks'; +import { BfetchPublicSetup } from 'src/plugins/bfetch/public'; let searchInterceptor: SearchInterceptor; let mockCoreSetup: MockedKeys; +let bfetchSetup: jest.Mocked; +let fetchMock: jest.Mock; const flushPromises = () => new Promise((resolve) => setImmediate(resolve)); jest.useFakeTimers(); @@ -40,8 +43,11 @@ describe('SearchInterceptor', () => { mockCoreSetup = coreMock.createSetup(); mockCoreStart = coreMock.createStart(); searchMock = searchServiceMock.createStartContract(); + fetchMock = jest.fn(); + bfetchSetup = bfetchPluginMock.createSetupContract(); + bfetchSetup.batchedFunction.mockReturnValue(fetchMock); searchInterceptor = new SearchInterceptor({ - bfetch: bfetchPluginMock.createSetupContract(), + bfetch: bfetchSetup, toasts: mockCoreSetup.notifications.toasts, startServices: new Promise((resolve) => { resolve([mockCoreStart, {}, {}]); @@ -96,7 +102,7 @@ describe('SearchInterceptor', () => { describe('search', () => { test('Observable should resolve if fetch is successful', async () => { const mockResponse: any = { result: 200 }; - mockCoreSetup.http.fetch.mockResolvedValueOnce(mockResponse); + fetchMock.mockResolvedValueOnce(mockResponse); const mockRequest: IEsSearchRequest = { params: {}, }; @@ -107,7 +113,7 @@ describe('SearchInterceptor', () => { describe('Should throw typed errors', () => { test('Observable should fail if fetch has an internal error', async () => { const mockResponse: any = new Error('Internal Error'); - mockCoreSetup.http.fetch.mockRejectedValue(mockResponse); + fetchMock.mockRejectedValue(mockResponse); const mockRequest: IEsSearchRequest = { params: {}, }; @@ -123,7 +129,7 @@ describe('SearchInterceptor', () => { message: 'Request timed out', }, }; - mockCoreSetup.http.fetch.mockRejectedValueOnce(mockResponse); + fetchMock.mockRejectedValueOnce(mockResponse); const mockRequest: IEsSearchRequest = { params: {}, }; @@ -139,7 +145,7 @@ describe('SearchInterceptor', () => { message: 'Request timed out', }, }; - mockCoreSetup.http.fetch.mockRejectedValue(mockResponse); + fetchMock.mockRejectedValue(mockResponse); const mockRequest: IEsSearchRequest = { params: {}, }; @@ -160,7 +166,7 @@ describe('SearchInterceptor', () => { message: 'Request timed out', }, }; - mockCoreSetup.http.fetch.mockRejectedValue(mockResponse); + fetchMock.mockRejectedValue(mockResponse); const mockRequest: IEsSearchRequest = { params: {}, }; @@ -181,7 +187,7 @@ describe('SearchInterceptor', () => { message: 'Request timed out', }, }; - mockCoreSetup.http.fetch.mockRejectedValue(mockResponse); + fetchMock.mockRejectedValue(mockResponse); const mockRequest: IEsSearchRequest = { params: {}, }; @@ -214,7 +220,7 @@ describe('SearchInterceptor', () => { }, }, }; - mockCoreSetup.http.fetch.mockRejectedValueOnce(mockResponse); + fetchMock.mockRejectedValueOnce(mockResponse); const mockRequest: IEsSearchRequest = { params: {}, }; @@ -224,7 +230,7 @@ describe('SearchInterceptor', () => { test('Observable should fail if user aborts (test merged signal)', async () => { const abortController = new AbortController(); - mockCoreSetup.http.fetch.mockImplementationOnce((options: any) => { + fetchMock.mockImplementationOnce((options: any) => { return new Promise((resolve, reject) => { options.signal.addEventListener('abort', () => { reject(new AbortError()); @@ -262,7 +268,7 @@ describe('SearchInterceptor', () => { const error = (e: any) => { expect(e).toBeInstanceOf(AbortError); - expect(mockCoreSetup.http.fetch).not.toBeCalled(); + expect(fetchMock).not.toBeCalled(); done(); }; response.subscribe({ error }); diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts index 9b6632c6f4c8c..0dbdd613d1275 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts @@ -25,12 +25,13 @@ const complete = jest.fn(); let searchInterceptor: EnhancedSearchInterceptor; let mockCoreSetup: MockedKeys; let mockCoreStart: MockedKeys; +let fetchMock: jest.Mock; jest.useFakeTimers(); function mockFetchImplementation(responses: any[]) { let i = 0; - mockCoreSetup.http.fetch.mockImplementation(() => { + fetchMock.mockImplementation(() => { const { time = 0, value = {}, isError = false } = responses[i++]; return new Promise((resolve, reject) => setTimeout(() => { @@ -47,6 +48,7 @@ describe('EnhancedSearchInterceptor', () => { mockCoreSetup = coreMock.createSetup(); mockCoreStart = coreMock.createStart(); const dataPluginMockStart = dataPluginMock.createStartContract(); + fetchMock = jest.fn(); mockCoreSetup.uiSettings.get.mockImplementation((name: string) => { switch (name) { @@ -76,6 +78,7 @@ describe('EnhancedSearchInterceptor', () => { }); const bfetchMock = bfetchPluginMock.createSetupContract(); + bfetchMock.batchedFunction.mockReturnValue(fetchMock); searchInterceptor = new EnhancedSearchInterceptor({ bfetch: bfetchMock, @@ -251,7 +254,7 @@ describe('EnhancedSearchInterceptor', () => { expect(error).toHaveBeenCalled(); expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError); - expect(mockCoreSetup.http.fetch).toHaveBeenCalledTimes(2); + expect(fetchMock).toHaveBeenCalledTimes(2); expect(mockCoreSetup.http.delete).toHaveBeenCalled(); }); @@ -275,7 +278,7 @@ describe('EnhancedSearchInterceptor', () => { expect(error).toHaveBeenCalled(); expect(error.mock.calls[0][0]).toBeInstanceOf(SearchTimeoutError); - expect(mockCoreSetup.http.fetch).toHaveBeenCalled(); + expect(fetchMock).toHaveBeenCalled(); expect(mockCoreSetup.http.delete).not.toHaveBeenCalled(); }); @@ -307,7 +310,7 @@ describe('EnhancedSearchInterceptor', () => { expect(next).toHaveBeenCalled(); expect(error).not.toHaveBeenCalled(); - expect(mockCoreSetup.http.fetch).toHaveBeenCalled(); + expect(fetchMock).toHaveBeenCalled(); expect(mockCoreSetup.http.delete).not.toHaveBeenCalled(); // Long enough to reach the timeout but not long enough to reach the next response @@ -315,7 +318,7 @@ describe('EnhancedSearchInterceptor', () => { expect(error).toHaveBeenCalled(); expect(error.mock.calls[0][0]).toBeInstanceOf(SearchTimeoutError); - expect(mockCoreSetup.http.fetch).toHaveBeenCalledTimes(2); + expect(fetchMock).toHaveBeenCalledTimes(2); expect(mockCoreSetup.http.delete).toHaveBeenCalled(); }); @@ -349,7 +352,7 @@ describe('EnhancedSearchInterceptor', () => { expect(next).toHaveBeenCalled(); expect(error).not.toHaveBeenCalled(); - expect(mockCoreSetup.http.fetch).toHaveBeenCalled(); + expect(fetchMock).toHaveBeenCalled(); expect(mockCoreSetup.http.delete).not.toHaveBeenCalled(); // Long enough to reach the timeout but not long enough to reach the next response @@ -357,7 +360,7 @@ describe('EnhancedSearchInterceptor', () => { expect(error).toHaveBeenCalled(); expect(error.mock.calls[0][0]).toBe(responses[1].value); - expect(mockCoreSetup.http.fetch).toHaveBeenCalledTimes(2); + expect(fetchMock).toHaveBeenCalledTimes(2); expect(mockCoreSetup.http.delete).toHaveBeenCalled(); }); }); @@ -389,9 +392,7 @@ describe('EnhancedSearchInterceptor', () => { await timeTravel(); - const areAllRequestsAborted = mockCoreSetup.http.fetch.mock.calls.every( - ([{ signal }]) => signal?.aborted - ); + const areAllRequestsAborted = fetchMock.mock.calls.every(([{ signal }]) => signal?.aborted); expect(areAllRequestsAborted).toBe(true); expect(mockUsageCollector.trackQueriesCancelled).toBeCalledTimes(1); }); From 5aaeefa7e5af5c7e12e57db0b53a9942020c6cee Mon Sep 17 00:00:00 2001 From: Liza K Date: Tue, 17 Nov 2020 19:09:39 +0200 Subject: [PATCH 11/25] jest --- .../data_enhanced/public/search/search_interceptor.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts index 0dbdd613d1275..6b3f498019255 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts @@ -392,7 +392,7 @@ describe('EnhancedSearchInterceptor', () => { await timeTravel(); - const areAllRequestsAborted = fetchMock.mock.calls.every(([{ signal }]) => signal?.aborted); + const areAllRequestsAborted = fetchMock.mock.calls.every((callArgs) => callArgs[1]?.aborted); expect(areAllRequestsAborted).toBe(true); expect(mockUsageCollector.trackQueriesCancelled).toBeCalledTimes(1); }); From c3934165b18665eea4001c53a1e15190ba788929 Mon Sep 17 00:00:00 2001 From: Liza K Date: Wed, 18 Nov 2020 00:02:15 +0200 Subject: [PATCH 12/25] Solve unhanled error --- src/plugins/bfetch/public/batching/batch_utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/bfetch/public/batching/batch_utils.ts b/src/plugins/bfetch/public/batching/batch_utils.ts index 64455e6259f2d..5c6399214594a 100644 --- a/src/plugins/bfetch/public/batching/batch_utils.ts +++ b/src/plugins/bfetch/public/batching/batch_utils.ts @@ -39,7 +39,7 @@ export function getBatchDone$(items: Array>) { }; item.signal?.addEventListener('abort', onDone); - item.future.promise.finally(onDone); + item.future.promise.then(onDone, onDone); }); }), filter((allDone) => allDone) From a1b29799a7ca60851cdc09626a74128a2085ebb8 Mon Sep 17 00:00:00 2001 From: Liza K Date: Wed, 18 Nov 2020 00:10:32 +0200 Subject: [PATCH 13/25] Use AbortSignal --- .../batching/create_streaming_batched_function.ts | 6 +++++- .../bfetch/public/streaming/fetch_streaming.test.ts | 6 +++--- .../bfetch/public/streaming/fetch_streaming.ts | 6 +++--- .../public/streaming/from_streaming_xhr.test.ts | 6 +++--- .../bfetch/public/streaming/from_streaming_xhr.ts | 12 +++++++----- 5 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index 1c62388a01a09..2e120208c672c 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -108,11 +108,15 @@ export const createStreamingBatchedFunction = ( return item.payload; }); + // Prepare abort controller + const abortController = new AbortController(); + getBatchDone$(items).subscribe(() => abortController.abort()); + const { stream } = fetchStreamingInjected({ url, body: JSON.stringify({ batch }), method: 'POST', - abort$: getBatchDone$(items), + signal: abortController.signal, }); stream.pipe(split('\n')).subscribe({ next: (json: string) => { diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts index 48f29c2827acf..22c344b07677a 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts @@ -135,10 +135,10 @@ test('completes stream observable when request finishes', async () => { test('completes stream observable when aborted', async () => { const env = setup(); - const abort$ = new Subject(); + const abort = new AbortController(); const { stream } = fetchStreaming({ url: 'http://example.com', - abort$, + signal: abort.signal, }); const spy = jest.fn(); @@ -151,7 +151,7 @@ test('completes stream observable when aborted', async () => { (env.xhr as any).responseText = 'foo'; env.xhr.onprogress!({} as any); - abort$.next(true); + abort.abort(); (env.xhr as any).readyState = 4; (env.xhr as any).status = 200; diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.ts index 2a9d4ac894b3e..60471063b9d28 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.ts @@ -25,7 +25,7 @@ export interface FetchStreamingParams { headers?: Record; method?: 'GET' | 'POST'; body?: string; - abort$?: Observable; + signal?: AbortSignal; } /** @@ -37,7 +37,7 @@ export function fetchStreaming({ headers = {}, method = 'POST', body = '', - abort$, + signal, }: FetchStreamingParams) { const xhr = new window.XMLHttpRequest(); @@ -48,7 +48,7 @@ export function fetchStreaming({ // Set the HTTP headers Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v)); - const stream = fromStreamingXhr(xhr, abort$); + const stream = fromStreamingXhr(xhr, signal); // Send the payload to the server xhr.send(body); diff --git a/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts b/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts index 3e51c73815804..8b6f9d76a9b05 100644 --- a/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts +++ b/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts @@ -104,8 +104,8 @@ test('completes observable when request reaches end state', () => { test('completes observable when aborted', () => { const xhr = createXhr(); - const abort$ = new Subject(); - const observable = fromStreamingXhr(xhr, abort$); + const abortController = new AbortController(); + const observable = fromStreamingXhr(xhr, abortController.signal); const next = jest.fn(); const complete = jest.fn(); @@ -123,7 +123,7 @@ test('completes observable when aborted', () => { expect(complete).toHaveBeenCalledTimes(0); (xhr as any).readyState = 2; - abort$.next(true); + abortController.abort(); expect(complete).toHaveBeenCalledTimes(1); diff --git a/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts index 819ddc62ab72b..88bedfde60287 100644 --- a/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts +++ b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts @@ -28,7 +28,7 @@ export const fromStreamingXhr = ( XMLHttpRequest, 'onprogress' | 'onreadystatechange' | 'readyState' | 'status' | 'responseText' | 'abort' >, - abort$?: Observable + signal?: AbortSignal ): Observable => { const subject = new Subject(); let index = 0; @@ -45,14 +45,16 @@ export const fromStreamingXhr = ( xhr.onprogress = processBatch; - const sub = abort$?.subscribe((done) => { - if (done && xhr.readyState !== 4) { + const onBatchAbort = () => { + if (xhr.readyState !== 4) { aborted = true; xhr.abort(); subject.complete(); - sub?.unsubscribe(); + signal?.removeEventListener('abort', onBatchAbort); } - }); + }; + + signal?.addEventListener('abort', onBatchAbort); xhr.onreadystatechange = () => { // Older browsers don't support onprogress, so we need From 162473d35ddc7436ca563f66c79465b17a5c63ef Mon Sep 17 00:00:00 2001 From: Liza K Date: Wed, 18 Nov 2020 10:19:11 +0200 Subject: [PATCH 14/25] ts --- src/plugins/bfetch/public/streaming/fetch_streaming.test.ts | 1 - src/plugins/bfetch/public/streaming/fetch_streaming.ts | 1 - src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts | 1 - 3 files changed, 3 deletions(-) diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts index 22c344b07677a..7a6827b8fee8e 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts @@ -19,7 +19,6 @@ import { fetchStreaming } from './fetch_streaming'; import { mockXMLHttpRequest } from '../test_helpers/xhr'; -import { Subject } from 'rxjs'; const tick = () => new Promise((resolve) => setTimeout(resolve, 1)); diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.ts index 60471063b9d28..3deee0cf66add 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.ts @@ -17,7 +17,6 @@ * under the License. */ -import { Observable } from 'rxjs'; import { fromStreamingXhr } from './from_streaming_xhr'; export interface FetchStreamingParams { diff --git a/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts b/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts index 8b6f9d76a9b05..b15bf9bdfbbb0 100644 --- a/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts +++ b/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts @@ -17,7 +17,6 @@ * under the License. */ -import { Subject } from 'rxjs'; import { fromStreamingXhr } from './from_streaming_xhr'; const createXhr = (): XMLHttpRequest => From 662b810ef854901ef00b153df87681bf5749f80c Mon Sep 17 00:00:00 2001 From: Liza K Date: Wed, 18 Nov 2020 11:59:37 +0200 Subject: [PATCH 15/25] code review - use abort controller instead of observable --- .../public/batching/batch_utils.test.ts | 184 ++++-------------- .../bfetch/public/batching/batch_utils.ts | 30 +-- .../create_streaming_batched_function.ts | 16 +- 3 files changed, 55 insertions(+), 175 deletions(-) diff --git a/src/plugins/bfetch/public/batching/batch_utils.test.ts b/src/plugins/bfetch/public/batching/batch_utils.test.ts index b57ec16801fbd..22923d5af92ca 100644 --- a/src/plugins/bfetch/public/batching/batch_utils.test.ts +++ b/src/plugins/bfetch/public/batching/batch_utils.test.ts @@ -18,182 +18,66 @@ */ import { BatchItem } from './types'; -import { getBatchDone$ } from './batch_utils'; +import { getDonePromise } from './batch_utils'; import { defer } from 'src/plugins/kibana_utils/common'; const tick = () => new Promise((resolve) => setTimeout(resolve, 1)); -describe('getBatchDone$()', () => { - test('Triggers when all are aborted', async () => { - const abortControllers = [new AbortController(), new AbortController()]; - const items: Array> = [ - { - future: defer(), - payload: null, - done: false, - signal: abortControllers[0].signal, - }, - { - future: defer(), - payload: null, - done: false, - signal: abortControllers[1].signal, - }, - ]; - const b = getBatchDone$(items); - - const spy = { - next: jest.fn(), - complete: jest.fn(), - }; - - b.subscribe(spy); - - abortControllers[0].abort(); - await tick(); - expect(spy.next).toHaveBeenCalledTimes(0); - - abortControllers[1].abort(); - await tick(); - expect(spy.next).toHaveBeenCalledTimes(1); - expect(spy.complete).toHaveBeenCalledTimes(1); - }); - - test('Triggers when all are resolved', async () => { - const items: Array> = [ - { - future: defer(), - payload: null, - done: false, - }, - { - future: defer(), - payload: null, - done: false, - }, - ]; - const b = getBatchDone$(items); - - const spy = { - next: jest.fn(), - complete: jest.fn(), - }; - - b.subscribe(spy); - - items[0].future.resolve(null); - await tick(); - expect(spy.next).toHaveBeenCalledTimes(0); - - items[1].future.resolve(null); - await tick(); - expect(spy.next).toHaveBeenCalledTimes(1); - expect(spy.complete).toHaveBeenCalledTimes(1); - }); - - test('Triggers when its a mix', async () => { +describe('getDonePromise()', () => { + test('Triggers when aborted', async () => { const abortController = new AbortController(); - const items: Array> = [ - { - future: defer(), - payload: null, - done: false, - }, - { - future: defer(), - payload: null, - done: false, - signal: abortController.signal, - }, - ]; - const b = getBatchDone$(items); - - const spy = { - next: jest.fn(), - complete: jest.fn(), + const item: BatchItem = { + future: defer(), + payload: null, + done: false, + signal: abortController.signal, }; + const b = getDonePromise(item); - b.subscribe(spy); + const spy = jest.fn(); - items[0].future.resolve(null); - await tick(); - expect(spy.next).toHaveBeenCalledTimes(0); + b.then(spy); abortController.abort(); await tick(); - expect(spy.next).toHaveBeenCalledTimes(1); - expect(spy.complete).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledTimes(1); }); - test('Triggers correctly if an item is resolved then aborted', async () => { + test('Triggers when resolved', async () => { const abortController = new AbortController(); - const items: Array> = [ - { - future: defer(), - payload: null, - done: false, - signal: abortController.signal, - }, - { - future: defer(), - payload: null, - done: false, - }, - ]; - const b = getBatchDone$(items); - - const spy = { - next: jest.fn(), - complete: jest.fn(), + const item: BatchItem = { + future: defer(), + payload: null, + done: false, + signal: abortController.signal, }; + const b = getDonePromise(item); - b.subscribe(spy); + const spy = jest.fn(); - items[0].future.resolve(null); - await tick(); - abortController.abort(); - await tick(); - expect(spy.next).toHaveBeenCalledTimes(0); + b.then(spy); - items[1].future.resolve(null); + item.future.resolve(null); await tick(); - expect(spy.next).toHaveBeenCalledTimes(1); - expect(spy.complete).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledTimes(1); }); - test('Triggers correctly if an item is aborted then resolved', async () => { + test('Triggers when rejected', async () => { const abortController = new AbortController(); - const items: Array> = [ - { - future: defer(), - payload: null, - done: false, - }, - { - future: defer(), - payload: null, - done: false, - signal: abortController.signal, - }, - ]; - const b = getBatchDone$(items); - - const spy = { - next: jest.fn(), - complete: jest.fn(), + const item: BatchItem = { + future: defer(), + payload: null, + done: false, + signal: abortController.signal, }; + const b = getDonePromise(item); - b.subscribe(spy); + const spy = jest.fn(); - items[0].future.resolve(null); - await tick(); - expect(spy.next).toHaveBeenCalledTimes(0); + b.then(spy); - abortController.abort(); - await tick(); - items[1].future.resolve(null); + item.future.reject(null); await tick(); - expect(spy.next).toHaveBeenCalledTimes(1); - expect(spy.complete).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledTimes(1); }); }); diff --git a/src/plugins/bfetch/public/batching/batch_utils.ts b/src/plugins/bfetch/public/batching/batch_utils.ts index 5c6399214594a..7ec3a44a6da9e 100644 --- a/src/plugins/bfetch/public/batching/batch_utils.ts +++ b/src/plugins/bfetch/public/batching/batch_utils.ts @@ -17,31 +17,19 @@ * under the License. */ -import { filter, mergeMap } from 'rxjs/operators'; -import { from } from 'rxjs'; import { BatchItem } from './types'; export function isBatchDone(items: Array>): boolean { return items.every((item) => item.done); } -export function getBatchDone$(items: Array>) { - // Triggers when all requests were resolved, rejected or aborted - return from(items).pipe( - mergeMap((item) => { - return new Promise((resolve) => { - const onDone = () => { - if (item.done) return; - - item.done = true; - item.signal?.removeEventListener('abort', onDone); - resolve(isBatchDone(items)); - }; - - item.signal?.addEventListener('abort', onDone); - item.future.promise.then(onDone, onDone); - }); - }), - filter((allDone) => allDone) - ); +export function getDonePromise(item: BatchItem) { + return new Promise((resolve) => { + const onDone = () => { + item.signal?.removeEventListener('abort', onDone); + resolve(); + }; + item.future.promise.then(onDone, onDone); + item.signal?.addEventListener('abort', onDone); + }); } diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index 2e120208c672c..e2ef35160eef8 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -28,7 +28,7 @@ import { import { fetchStreaming, split } from '../streaming'; import { normalizeError } from '../../common'; import { BatchedFunc, BatchItem } from './types'; -import { isBatchDone, getBatchDone$ } from './batch_utils'; +import { isBatchDone, getDonePromise } from './batch_utils'; export interface BatchedFunctionProtocolError extends ErrorLike { code: string; @@ -90,6 +90,9 @@ export const createStreamingBatchedFunction = ( }, onBatch: async (items) => { try { + const promises: Array> = []; + const abortController = new AbortController(); + // Filter out and reject any items who's signal is already aborted items = items.filter((item) => { if (item.signal?.aborted) { @@ -100,17 +103,22 @@ export const createStreamingBatchedFunction = ( // Prepare batch const batch = items.map((item) => { + // Subscribe to reject promise on abort const rejectAborted = () => { item.future.reject(new AbortError()); item.signal?.removeEventListener('abort', rejectAborted); }; item.signal?.addEventListener('abort', rejectAborted); + + // Track batch progress + promises.push(getDonePromise(item)); + + // Return payload to be sent return item.payload; }); - // Prepare abort controller - const abortController = new AbortController(); - getBatchDone$(items).subscribe(() => abortController.abort()); + // abort when all items were either resolved, rejected or aborted + Promise.all(promises).then(() => abortController.abort()); const { stream } = fetchStreamingInjected({ url, From 41f6406358167a4af609230a9dc34cfac968d4d9 Mon Sep 17 00:00:00 2001 From: Liza K Date: Wed, 18 Nov 2020 13:44:54 +0200 Subject: [PATCH 16/25] Revert "Remove old search code and rename UI Setting" This reverts commit 17de9fa257a63e66912d98f3b6feec1d26a8a752. --- .../data/public/search/search_interceptor.ts | 20 ++- .../data/server/search/routes/search.test.ts | 119 ++++++++++++++++++ .../data/server/search/routes/search.ts | 50 ++++++++ src/plugins/data/server/ui_settings.ts | 7 +- 4 files changed, 190 insertions(+), 6 deletions(-) create mode 100644 src/plugins/data/server/search/routes/search.test.ts diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts index acc19339fd0c6..2255ac536bd51 100644 --- a/src/plugins/data/public/search/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -17,7 +17,7 @@ * under the License. */ -import { get, memoize } from 'lodash'; +import { get, memoize, trimEnd } from 'lodash'; import { BehaviorSubject, throwError, timer, defer, from, Observable, NEVER } from 'rxjs'; import { catchError, finalize } from 'rxjs/operators'; import { PublicMethodsOf } from '@kbn/utility-types'; @@ -29,6 +29,7 @@ import { IKibanaSearchResponse, ISearchOptions, ISessionService, + ES_SEARCH_STRATEGY, } from '../../common'; import { SearchUsageCollector } from './collectors'; import { @@ -70,7 +71,7 @@ export class SearchInterceptor { * @internal */ protected application!: CoreStart['application']; - private batchedFetch!: BatchedFunc< + private batchedFetch?: BatchedFunc< { request: IKibanaSearchRequest; strategy?: string }, IKibanaSearchResponse >; @@ -138,7 +139,20 @@ export class SearchInterceptor { signal: AbortSignal, strategy?: string ): Promise { - return this.batchedFetch({ request, strategy }, signal); + if (this.batchedFetch) { + return this.batchedFetch({ request, strategy }, signal); + } else { + const { id, ...searchRequest } = request; + const path = trimEnd(`/internal/search/${strategy || ES_SEARCH_STRATEGY}/${id || ''}`, '/'); + const body = JSON.stringify(searchRequest); + + return this.deps.http.fetch({ + method: 'POST', + path, + body, + signal, + }); + } } /** diff --git a/src/plugins/data/server/search/routes/search.test.ts b/src/plugins/data/server/search/routes/search.test.ts new file mode 100644 index 0000000000000..495cb1c9ea770 --- /dev/null +++ b/src/plugins/data/server/search/routes/search.test.ts @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import type { MockedKeys } from '@kbn/utility-types/jest'; +import { from } from 'rxjs'; +import { CoreSetup, RequestHandlerContext } from 'src/core/server'; +import { coreMock, httpServerMock } from '../../../../../../src/core/server/mocks'; +import { registerSearchRoute } from './search'; +import { DataPluginStart } from '../../plugin'; + +describe('Search service', () => { + let mockCoreSetup: MockedKeys>; + + beforeEach(() => { + mockCoreSetup = coreMock.createSetup(); + }); + + it('handler calls context.search.search with the given request and strategy', async () => { + const response = { + id: 'yay', + rawResponse: { + took: 100, + timed_out: true, + _shards: { + total: 0, + successful: 0, + failed: 0, + skipped: 0, + }, + hits: { + total: 0, + max_score: 0, + hits: [], + }, + }, + }; + + const mockContext = { + search: { + search: jest.fn().mockReturnValue(from(Promise.resolve(response))), + }, + }; + + const mockBody = { id: undefined, params: {} }; + const mockParams = { strategy: 'foo' }; + const mockRequest = httpServerMock.createKibanaRequest({ + body: mockBody, + params: mockParams, + }); + const mockResponse = httpServerMock.createResponseFactory(); + + registerSearchRoute(mockCoreSetup.http.createRouter()); + + const mockRouter = mockCoreSetup.http.createRouter.mock.results[0].value; + const handler = mockRouter.post.mock.calls[0][1]; + await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse); + + expect(mockContext.search.search).toBeCalled(); + expect(mockContext.search.search.mock.calls[0][0]).toStrictEqual(mockBody); + expect(mockResponse.ok).toBeCalled(); + expect(mockResponse.ok.mock.calls[0][0]).toEqual({ + body: response, + }); + }); + + it('handler throws an error if the search throws an error', async () => { + const rejectedValue = from( + Promise.reject({ + message: 'oh no', + body: { + error: 'oops', + }, + }) + ); + + const mockContext = { + search: { + search: jest.fn().mockReturnValue(rejectedValue), + }, + }; + + const mockBody = { id: undefined, params: {} }; + const mockParams = { strategy: 'foo' }; + const mockRequest = httpServerMock.createKibanaRequest({ + body: mockBody, + params: mockParams, + }); + const mockResponse = httpServerMock.createResponseFactory(); + + registerSearchRoute(mockCoreSetup.http.createRouter()); + + const mockRouter = mockCoreSetup.http.createRouter.mock.results[0].value; + const handler = mockRouter.post.mock.calls[0][1]; + await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse); + + expect(mockContext.search.search).toBeCalled(); + expect(mockContext.search.search.mock.calls[0][0]).toStrictEqual(mockBody); + expect(mockResponse.customError).toBeCalled(); + const error: any = mockResponse.customError.mock.calls[0][0]; + expect(error.body.message).toBe('oh no'); + expect(error.body.attributes.error).toBe('oops'); + }); +}); diff --git a/src/plugins/data/server/search/routes/search.ts b/src/plugins/data/server/search/routes/search.ts index 8f3634751d191..68c897757a62e 100644 --- a/src/plugins/data/server/search/routes/search.ts +++ b/src/plugins/data/server/search/routes/search.ts @@ -17,10 +17,60 @@ * under the License. */ +import { first } from 'rxjs/operators'; import { schema } from '@kbn/config-schema'; import type { IRouter } from 'src/core/server'; +import { getRequestAbortedSignal } from '../../lib'; export function registerSearchRoute(router: IRouter): void { + router.post( + { + path: '/internal/search/{strategy}/{id?}', + validate: { + params: schema.object({ + strategy: schema.string(), + id: schema.maybe(schema.string()), + }), + + query: schema.object({}, { unknowns: 'allow' }), + + body: schema.object({}, { unknowns: 'allow' }), + }, + }, + async (context, request, res) => { + const searchRequest = request.body; + const { strategy, id } = request.params; + const abortSignal = getRequestAbortedSignal(request.events.aborted$); + + try { + const response = await context + .search!.search( + { ...searchRequest, id }, + { + abortSignal, + strategy, + } + ) + .pipe(first()) + .toPromise(); + + return res.ok({ + body: response, + }); + } catch (err) { + return res.customError({ + statusCode: err.statusCode || 500, + body: { + message: err.message, + attributes: { + error: err.body?.error || err.message, + }, + }, + }); + } + } + ); + router.delete( { path: '/internal/search/{strategy}/{id}', diff --git a/src/plugins/data/server/ui_settings.ts b/src/plugins/data/server/ui_settings.ts index f5360f626ac66..9393700a0e771 100644 --- a/src/plugins/data/server/ui_settings.ts +++ b/src/plugins/data/server/ui_settings.ts @@ -267,13 +267,14 @@ export function getUiSettings(): Record> { }, [UI_SETTINGS.COURIER_BATCH_SEARCHES]: { name: i18n.translate('data.advancedSettings.courier.batchSearchesTitle', { - defaultMessage: 'Use legacy search', + defaultMessage: 'Batch concurrent searches', }), value: false, type: 'boolean', description: i18n.translate('data.advancedSettings.courier.batchSearchesText', { - defaultMessage: `Kibana uses a new search and batching infrastructure. - Enable this option if you prefer to fallback to the legacy synchronous behavior`, + defaultMessage: `When disabled, dashboard panels will load individually, and search requests will terminate when users navigate + away or update the query. When enabled, dashboard panels will load together when all of the data is loaded, and + searches will not terminate.`, }), deprecation: { message: i18n.translate('data.advancedSettings.courier.batchSearchesTextDeprecation', { From cbd8106563b969a811b703a17214702e906a280a Mon Sep 17 00:00:00 2001 From: Liza K Date: Tue, 17 Nov 2020 16:52:11 +0200 Subject: [PATCH 17/25] Remove old search code and rename UI Setting --- .../data/public/search/search_interceptor.ts | 20 +-- .../data/server/search/routes/search.test.ts | 119 ------------------ .../data/server/search/routes/search.ts | 50 -------- src/plugins/data/server/ui_settings.ts | 7 +- 4 files changed, 6 insertions(+), 190 deletions(-) delete mode 100644 src/plugins/data/server/search/routes/search.test.ts diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts index 2255ac536bd51..acc19339fd0c6 100644 --- a/src/plugins/data/public/search/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -17,7 +17,7 @@ * under the License. */ -import { get, memoize, trimEnd } from 'lodash'; +import { get, memoize } from 'lodash'; import { BehaviorSubject, throwError, timer, defer, from, Observable, NEVER } from 'rxjs'; import { catchError, finalize } from 'rxjs/operators'; import { PublicMethodsOf } from '@kbn/utility-types'; @@ -29,7 +29,6 @@ import { IKibanaSearchResponse, ISearchOptions, ISessionService, - ES_SEARCH_STRATEGY, } from '../../common'; import { SearchUsageCollector } from './collectors'; import { @@ -71,7 +70,7 @@ export class SearchInterceptor { * @internal */ protected application!: CoreStart['application']; - private batchedFetch?: BatchedFunc< + private batchedFetch!: BatchedFunc< { request: IKibanaSearchRequest; strategy?: string }, IKibanaSearchResponse >; @@ -139,20 +138,7 @@ export class SearchInterceptor { signal: AbortSignal, strategy?: string ): Promise { - if (this.batchedFetch) { - return this.batchedFetch({ request, strategy }, signal); - } else { - const { id, ...searchRequest } = request; - const path = trimEnd(`/internal/search/${strategy || ES_SEARCH_STRATEGY}/${id || ''}`, '/'); - const body = JSON.stringify(searchRequest); - - return this.deps.http.fetch({ - method: 'POST', - path, - body, - signal, - }); - } + return this.batchedFetch({ request, strategy }, signal); } /** diff --git a/src/plugins/data/server/search/routes/search.test.ts b/src/plugins/data/server/search/routes/search.test.ts deleted file mode 100644 index 495cb1c9ea770..0000000000000 --- a/src/plugins/data/server/search/routes/search.test.ts +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import type { MockedKeys } from '@kbn/utility-types/jest'; -import { from } from 'rxjs'; -import { CoreSetup, RequestHandlerContext } from 'src/core/server'; -import { coreMock, httpServerMock } from '../../../../../../src/core/server/mocks'; -import { registerSearchRoute } from './search'; -import { DataPluginStart } from '../../plugin'; - -describe('Search service', () => { - let mockCoreSetup: MockedKeys>; - - beforeEach(() => { - mockCoreSetup = coreMock.createSetup(); - }); - - it('handler calls context.search.search with the given request and strategy', async () => { - const response = { - id: 'yay', - rawResponse: { - took: 100, - timed_out: true, - _shards: { - total: 0, - successful: 0, - failed: 0, - skipped: 0, - }, - hits: { - total: 0, - max_score: 0, - hits: [], - }, - }, - }; - - const mockContext = { - search: { - search: jest.fn().mockReturnValue(from(Promise.resolve(response))), - }, - }; - - const mockBody = { id: undefined, params: {} }; - const mockParams = { strategy: 'foo' }; - const mockRequest = httpServerMock.createKibanaRequest({ - body: mockBody, - params: mockParams, - }); - const mockResponse = httpServerMock.createResponseFactory(); - - registerSearchRoute(mockCoreSetup.http.createRouter()); - - const mockRouter = mockCoreSetup.http.createRouter.mock.results[0].value; - const handler = mockRouter.post.mock.calls[0][1]; - await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse); - - expect(mockContext.search.search).toBeCalled(); - expect(mockContext.search.search.mock.calls[0][0]).toStrictEqual(mockBody); - expect(mockResponse.ok).toBeCalled(); - expect(mockResponse.ok.mock.calls[0][0]).toEqual({ - body: response, - }); - }); - - it('handler throws an error if the search throws an error', async () => { - const rejectedValue = from( - Promise.reject({ - message: 'oh no', - body: { - error: 'oops', - }, - }) - ); - - const mockContext = { - search: { - search: jest.fn().mockReturnValue(rejectedValue), - }, - }; - - const mockBody = { id: undefined, params: {} }; - const mockParams = { strategy: 'foo' }; - const mockRequest = httpServerMock.createKibanaRequest({ - body: mockBody, - params: mockParams, - }); - const mockResponse = httpServerMock.createResponseFactory(); - - registerSearchRoute(mockCoreSetup.http.createRouter()); - - const mockRouter = mockCoreSetup.http.createRouter.mock.results[0].value; - const handler = mockRouter.post.mock.calls[0][1]; - await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse); - - expect(mockContext.search.search).toBeCalled(); - expect(mockContext.search.search.mock.calls[0][0]).toStrictEqual(mockBody); - expect(mockResponse.customError).toBeCalled(); - const error: any = mockResponse.customError.mock.calls[0][0]; - expect(error.body.message).toBe('oh no'); - expect(error.body.attributes.error).toBe('oops'); - }); -}); diff --git a/src/plugins/data/server/search/routes/search.ts b/src/plugins/data/server/search/routes/search.ts index 68c897757a62e..8f3634751d191 100644 --- a/src/plugins/data/server/search/routes/search.ts +++ b/src/plugins/data/server/search/routes/search.ts @@ -17,60 +17,10 @@ * under the License. */ -import { first } from 'rxjs/operators'; import { schema } from '@kbn/config-schema'; import type { IRouter } from 'src/core/server'; -import { getRequestAbortedSignal } from '../../lib'; export function registerSearchRoute(router: IRouter): void { - router.post( - { - path: '/internal/search/{strategy}/{id?}', - validate: { - params: schema.object({ - strategy: schema.string(), - id: schema.maybe(schema.string()), - }), - - query: schema.object({}, { unknowns: 'allow' }), - - body: schema.object({}, { unknowns: 'allow' }), - }, - }, - async (context, request, res) => { - const searchRequest = request.body; - const { strategy, id } = request.params; - const abortSignal = getRequestAbortedSignal(request.events.aborted$); - - try { - const response = await context - .search!.search( - { ...searchRequest, id }, - { - abortSignal, - strategy, - } - ) - .pipe(first()) - .toPromise(); - - return res.ok({ - body: response, - }); - } catch (err) { - return res.customError({ - statusCode: err.statusCode || 500, - body: { - message: err.message, - attributes: { - error: err.body?.error || err.message, - }, - }, - }); - } - } - ); - router.delete( { path: '/internal/search/{strategy}/{id}', diff --git a/src/plugins/data/server/ui_settings.ts b/src/plugins/data/server/ui_settings.ts index 9393700a0e771..f5360f626ac66 100644 --- a/src/plugins/data/server/ui_settings.ts +++ b/src/plugins/data/server/ui_settings.ts @@ -267,14 +267,13 @@ export function getUiSettings(): Record> { }, [UI_SETTINGS.COURIER_BATCH_SEARCHES]: { name: i18n.translate('data.advancedSettings.courier.batchSearchesTitle', { - defaultMessage: 'Batch concurrent searches', + defaultMessage: 'Use legacy search', }), value: false, type: 'boolean', description: i18n.translate('data.advancedSettings.courier.batchSearchesText', { - defaultMessage: `When disabled, dashboard panels will load individually, and search requests will terminate when users navigate - away or update the query. When enabled, dashboard panels will load together when all of the data is loaded, and - searches will not terminate.`, + defaultMessage: `Kibana uses a new search and batching infrastructure. + Enable this option if you prefer to fallback to the legacy synchronous behavior`, }), deprecation: { message: i18n.translate('data.advancedSettings.courier.batchSearchesTextDeprecation', { From 1ec890ad81008a65c235af4f18b113fafb11634c Mon Sep 17 00:00:00 2001 From: Liza K Date: Wed, 18 Nov 2020 13:49:05 +0200 Subject: [PATCH 18/25] revert search route --- .../data/server/search/routes/search.test.ts | 119 ++++++++++++++++++ .../data/server/search/routes/search.ts | 56 +++++++++ 2 files changed, 175 insertions(+) create mode 100644 src/plugins/data/server/search/routes/search.test.ts diff --git a/src/plugins/data/server/search/routes/search.test.ts b/src/plugins/data/server/search/routes/search.test.ts new file mode 100644 index 0000000000000..495cb1c9ea770 --- /dev/null +++ b/src/plugins/data/server/search/routes/search.test.ts @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import type { MockedKeys } from '@kbn/utility-types/jest'; +import { from } from 'rxjs'; +import { CoreSetup, RequestHandlerContext } from 'src/core/server'; +import { coreMock, httpServerMock } from '../../../../../../src/core/server/mocks'; +import { registerSearchRoute } from './search'; +import { DataPluginStart } from '../../plugin'; + +describe('Search service', () => { + let mockCoreSetup: MockedKeys>; + + beforeEach(() => { + mockCoreSetup = coreMock.createSetup(); + }); + + it('handler calls context.search.search with the given request and strategy', async () => { + const response = { + id: 'yay', + rawResponse: { + took: 100, + timed_out: true, + _shards: { + total: 0, + successful: 0, + failed: 0, + skipped: 0, + }, + hits: { + total: 0, + max_score: 0, + hits: [], + }, + }, + }; + + const mockContext = { + search: { + search: jest.fn().mockReturnValue(from(Promise.resolve(response))), + }, + }; + + const mockBody = { id: undefined, params: {} }; + const mockParams = { strategy: 'foo' }; + const mockRequest = httpServerMock.createKibanaRequest({ + body: mockBody, + params: mockParams, + }); + const mockResponse = httpServerMock.createResponseFactory(); + + registerSearchRoute(mockCoreSetup.http.createRouter()); + + const mockRouter = mockCoreSetup.http.createRouter.mock.results[0].value; + const handler = mockRouter.post.mock.calls[0][1]; + await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse); + + expect(mockContext.search.search).toBeCalled(); + expect(mockContext.search.search.mock.calls[0][0]).toStrictEqual(mockBody); + expect(mockResponse.ok).toBeCalled(); + expect(mockResponse.ok.mock.calls[0][0]).toEqual({ + body: response, + }); + }); + + it('handler throws an error if the search throws an error', async () => { + const rejectedValue = from( + Promise.reject({ + message: 'oh no', + body: { + error: 'oops', + }, + }) + ); + + const mockContext = { + search: { + search: jest.fn().mockReturnValue(rejectedValue), + }, + }; + + const mockBody = { id: undefined, params: {} }; + const mockParams = { strategy: 'foo' }; + const mockRequest = httpServerMock.createKibanaRequest({ + body: mockBody, + params: mockParams, + }); + const mockResponse = httpServerMock.createResponseFactory(); + + registerSearchRoute(mockCoreSetup.http.createRouter()); + + const mockRouter = mockCoreSetup.http.createRouter.mock.results[0].value; + const handler = mockRouter.post.mock.calls[0][1]; + await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse); + + expect(mockContext.search.search).toBeCalled(); + expect(mockContext.search.search.mock.calls[0][0]).toStrictEqual(mockBody); + expect(mockResponse.customError).toBeCalled(); + const error: any = mockResponse.customError.mock.calls[0][0]; + expect(error.body.message).toBe('oh no'); + expect(error.body.attributes.error).toBe('oops'); + }); +}); diff --git a/src/plugins/data/server/search/routes/search.ts b/src/plugins/data/server/search/routes/search.ts index 8f3634751d191..a4161fe47b388 100644 --- a/src/plugins/data/server/search/routes/search.ts +++ b/src/plugins/data/server/search/routes/search.ts @@ -17,10 +17,66 @@ * under the License. */ +import { first } from 'rxjs/operators'; import { schema } from '@kbn/config-schema'; import type { IRouter } from 'src/core/server'; +import { getRequestAbortedSignal } from '../../lib'; +import { shimHitsTotal } from './shim_hits_total'; export function registerSearchRoute(router: IRouter): void { + router.post( + { + path: '/internal/search/{strategy}/{id?}', + validate: { + params: schema.object({ + strategy: schema.string(), + id: schema.maybe(schema.string()), + }), + + query: schema.object({}, { unknowns: 'allow' }), + + body: schema.object({}, { unknowns: 'allow' }), + }, + }, + async (context, request, res) => { + const searchRequest = request.body; + const { strategy, id } = request.params; + const abortSignal = getRequestAbortedSignal(request.events.aborted$); + + try { + const response = await context + .search!.search( + { ...searchRequest, id }, + { + abortSignal, + strategy, + } + ) + .pipe(first()) + .toPromise(); + + return res.ok({ + body: { + ...response, + ...{ + rawResponse: shimHitsTotal(response.rawResponse), + }, + }, + }); + } catch (err) { + return res.customError({ + statusCode: err.statusCode || 500, + body: { + message: err.message, + attributes: { + error: err.body?.error || err.message, + }, + }, + }); + } + } + ); + router.delete( { path: '/internal/search/{strategy}/{id}', From 3e334a763190326804c1944e1b78187827c9dbeb Mon Sep 17 00:00:00 2001 From: Liza K Date: Wed, 18 Nov 2020 19:19:48 +0200 Subject: [PATCH 19/25] fix event unsubscribe --- src/plugins/bfetch/public/batching/batch_utils.ts | 12 ++++++++++++ .../batching/create_streaming_batched_function.ts | 8 ++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/plugins/bfetch/public/batching/batch_utils.ts b/src/plugins/bfetch/public/batching/batch_utils.ts index 7ec3a44a6da9e..96694e7530dfb 100644 --- a/src/plugins/bfetch/public/batching/batch_utils.ts +++ b/src/plugins/bfetch/public/batching/batch_utils.ts @@ -18,6 +18,7 @@ */ import { BatchItem } from './types'; +import { AbortError } from '../../../kibana_utils/public'; export function isBatchDone(items: Array>): boolean { return items.every((item) => item.done); @@ -33,3 +34,14 @@ export function getDonePromise(item: BatchItem) { item.signal?.addEventListener('abort', onDone); }); } + +export function rejectOnAbort(item: BatchItem) { + const cleanup = () => item.signal?.removeEventListener('abort', rejectAborted); + const rejectAborted = () => { + item.future.reject(new AbortError()); + cleanup(); + }; + + item.signal?.addEventListener('abort', rejectAborted); + item.future.promise.then(cleanup, cleanup); +} diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index e2ef35160eef8..9965852d551e7 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -28,7 +28,7 @@ import { import { fetchStreaming, split } from '../streaming'; import { normalizeError } from '../../common'; import { BatchedFunc, BatchItem } from './types'; -import { isBatchDone, getDonePromise } from './batch_utils'; +import { isBatchDone, getDonePromise, rejectOnAbort } from './batch_utils'; export interface BatchedFunctionProtocolError extends ErrorLike { code: string; @@ -104,11 +104,7 @@ export const createStreamingBatchedFunction = ( // Prepare batch const batch = items.map((item) => { // Subscribe to reject promise on abort - const rejectAborted = () => { - item.future.reject(new AbortError()); - item.signal?.removeEventListener('abort', rejectAborted); - }; - item.signal?.addEventListener('abort', rejectAborted); + rejectOnAbort(item); // Track batch progress promises.push(getDonePromise(item)); From 8abff48101391a17120947fff453caa5d5d1e4cf Mon Sep 17 00:00:00 2001 From: Liza K Date: Thu, 19 Nov 2020 16:00:20 +0200 Subject: [PATCH 20/25] code review 2 --- .../public/batching/batch_utils.test.ts | 3 ++ .../bfetch/public/batching/batch_utils.ts | 11 ++++--- .../create_streaming_batched_function.ts | 32 +++++++++---------- .../public/streaming/from_streaming_xhr.ts | 6 ++-- .../public/search/search_interceptor.test.ts | 2 +- 5 files changed, 31 insertions(+), 23 deletions(-) diff --git a/src/plugins/bfetch/public/batching/batch_utils.test.ts b/src/plugins/bfetch/public/batching/batch_utils.test.ts index 22923d5af92ca..bc42bee97ec8e 100644 --- a/src/plugins/bfetch/public/batching/batch_utils.test.ts +++ b/src/plugins/bfetch/public/batching/batch_utils.test.ts @@ -41,6 +41,7 @@ describe('getDonePromise()', () => { abortController.abort(); await tick(); expect(spy).toHaveBeenCalledTimes(1); + expect(item.done).toBeTruthy(); }); test('Triggers when resolved', async () => { @@ -60,6 +61,7 @@ describe('getDonePromise()', () => { item.future.resolve(null); await tick(); expect(spy).toHaveBeenCalledTimes(1); + expect(item.done).toBeTruthy(); }); test('Triggers when rejected', async () => { @@ -79,5 +81,6 @@ describe('getDonePromise()', () => { item.future.reject(null); await tick(); expect(spy).toHaveBeenCalledTimes(1); + expect(item.done).toBeTruthy(); }); }); diff --git a/src/plugins/bfetch/public/batching/batch_utils.ts b/src/plugins/bfetch/public/batching/batch_utils.ts index 96694e7530dfb..1d3b627699443 100644 --- a/src/plugins/bfetch/public/batching/batch_utils.ts +++ b/src/plugins/bfetch/public/batching/batch_utils.ts @@ -27,21 +27,24 @@ export function isBatchDone(items: Array>): boolean { export function getDonePromise(item: BatchItem) { return new Promise((resolve) => { const onDone = () => { - item.signal?.removeEventListener('abort', onDone); + item.done = true; + if (item.signal) item.signal.removeEventListener('abort', onDone); resolve(); }; item.future.promise.then(onDone, onDone); - item.signal?.addEventListener('abort', onDone); + if (item.signal) item.signal.addEventListener('abort', onDone); }); } export function rejectOnAbort(item: BatchItem) { - const cleanup = () => item.signal?.removeEventListener('abort', rejectAborted); + const cleanup = () => { + if (item.signal) item.signal.removeEventListener('abort', rejectAborted); + }; const rejectAborted = () => { item.future.reject(new AbortError()); cleanup(); }; - item.signal?.addEventListener('abort', rejectAborted); + if (item.signal) item.signal.addEventListener('abort', rejectAborted); item.future.promise.then(cleanup, cleanup); } diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index 9965852d551e7..b3baa9da75c81 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -93,25 +93,25 @@ export const createStreamingBatchedFunction = ( const promises: Array> = []; const abortController = new AbortController(); - // Filter out and reject any items who's signal is already aborted - items = items.filter((item) => { - if (item.signal?.aborted) { - item.future.reject(new AbortError()); - } - return !item.signal?.aborted; - }); - // Prepare batch - const batch = items.map((item) => { - // Subscribe to reject promise on abort - rejectOnAbort(item); + const batch = items + // Filter out and reject any items who's signal is already aborted + .filter((item) => { + if (item.signal?.aborted) { + item.future.reject(new AbortError()); + } + return !item.signal?.aborted; + }) + .map((item) => { + // Subscribe to reject promise on abort + rejectOnAbort(item); - // Track batch progress - promises.push(getDonePromise(item)); + // Track batch progress + promises.push(getDonePromise(item)); - // Return payload to be sent - return item.payload; - }); + // Return payload to be sent + return item.payload; + }); // abort when all items were either resolved, rejected or aborted Promise.all(promises).then(() => abortController.abort()); diff --git a/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts index 88bedfde60287..0d87a25712742 100644 --- a/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts +++ b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts @@ -50,11 +50,11 @@ export const fromStreamingXhr = ( aborted = true; xhr.abort(); subject.complete(); - signal?.removeEventListener('abort', onBatchAbort); + if (signal) signal.removeEventListener('abort', onBatchAbort); } }; - signal?.addEventListener('abort', onBatchAbort); + if (signal) signal.addEventListener('abort', onBatchAbort); xhr.onreadystatechange = () => { // Older browsers don't support onprogress, so we need @@ -64,6 +64,8 @@ export const fromStreamingXhr = ( // 4 is the magic number that means the request is done if (!aborted && xhr.readyState === 4) { + if (signal) signal.removeEventListener('abort', onBatchAbort); + // 0 indicates a network failure. 400+ messages are considered server errors if (xhr.status === 0 || xhr.status >= 400) { subject.error(new Error(`Batch request failed with status ${xhr.status}`)); diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts index 6b3f498019255..3659c1a04b2b4 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts @@ -392,7 +392,7 @@ describe('EnhancedSearchInterceptor', () => { await timeTravel(); - const areAllRequestsAborted = fetchMock.mock.calls.every((callArgs) => callArgs[1]?.aborted); + const areAllRequestsAborted = fetchMock.mock.calls.every(([_, signal]) => signal?.aborted); expect(areAllRequestsAborted).toBe(true); expect(mockUsageCollector.trackQueriesCancelled).toBeCalledTimes(1); }); From 9d164397c58fcf7b193bf76d53b2fbc6cd1d4273 Mon Sep 17 00:00:00 2001 From: Liza K Date: Thu, 19 Nov 2020 18:02:08 +0200 Subject: [PATCH 21/25] revert filter --- .../public/batching/create_streaming_batched_function.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index b3baa9da75c81..ba27d0446610e 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -93,9 +93,16 @@ export const createStreamingBatchedFunction = ( const promises: Array> = []; const abortController = new AbortController(); + // Filter out and reject any items who's signal is already aborted + items = items.filter((item) => { + if (item.signal?.aborted) { + item.future.reject(new AbortError()); + } + return !item.signal?.aborted; + }); + // Prepare batch const batch = items - // Filter out and reject any items who's signal is already aborted .filter((item) => { if (item.signal?.aborted) { item.future.reject(new AbortError()); From d57b8dcf9e5016fa364df754fdcc7483c2102c14 Mon Sep 17 00:00:00 2001 From: Liza K Date: Sun, 22 Nov 2020 11:40:27 +0200 Subject: [PATCH 22/25] simplify batch done logic --- .../public/batching/batch_utils.test.ts | 86 ------------------- .../bfetch/public/batching/batch_utils.ts | 50 ----------- .../create_streaming_batched_function.ts | 54 +++++------- src/plugins/bfetch/public/batching/types.ts | 1 - 4 files changed, 21 insertions(+), 170 deletions(-) delete mode 100644 src/plugins/bfetch/public/batching/batch_utils.test.ts delete mode 100644 src/plugins/bfetch/public/batching/batch_utils.ts diff --git a/src/plugins/bfetch/public/batching/batch_utils.test.ts b/src/plugins/bfetch/public/batching/batch_utils.test.ts deleted file mode 100644 index bc42bee97ec8e..0000000000000 --- a/src/plugins/bfetch/public/batching/batch_utils.test.ts +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import { BatchItem } from './types'; -import { getDonePromise } from './batch_utils'; -import { defer } from 'src/plugins/kibana_utils/common'; - -const tick = () => new Promise((resolve) => setTimeout(resolve, 1)); - -describe('getDonePromise()', () => { - test('Triggers when aborted', async () => { - const abortController = new AbortController(); - const item: BatchItem = { - future: defer(), - payload: null, - done: false, - signal: abortController.signal, - }; - const b = getDonePromise(item); - - const spy = jest.fn(); - - b.then(spy); - - abortController.abort(); - await tick(); - expect(spy).toHaveBeenCalledTimes(1); - expect(item.done).toBeTruthy(); - }); - - test('Triggers when resolved', async () => { - const abortController = new AbortController(); - const item: BatchItem = { - future: defer(), - payload: null, - done: false, - signal: abortController.signal, - }; - const b = getDonePromise(item); - - const spy = jest.fn(); - - b.then(spy); - - item.future.resolve(null); - await tick(); - expect(spy).toHaveBeenCalledTimes(1); - expect(item.done).toBeTruthy(); - }); - - test('Triggers when rejected', async () => { - const abortController = new AbortController(); - const item: BatchItem = { - future: defer(), - payload: null, - done: false, - signal: abortController.signal, - }; - const b = getDonePromise(item); - - const spy = jest.fn(); - - b.then(spy); - - item.future.reject(null); - await tick(); - expect(spy).toHaveBeenCalledTimes(1); - expect(item.done).toBeTruthy(); - }); -}); diff --git a/src/plugins/bfetch/public/batching/batch_utils.ts b/src/plugins/bfetch/public/batching/batch_utils.ts deleted file mode 100644 index 1d3b627699443..0000000000000 --- a/src/plugins/bfetch/public/batching/batch_utils.ts +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import { BatchItem } from './types'; -import { AbortError } from '../../../kibana_utils/public'; - -export function isBatchDone(items: Array>): boolean { - return items.every((item) => item.done); -} - -export function getDonePromise(item: BatchItem) { - return new Promise((resolve) => { - const onDone = () => { - item.done = true; - if (item.signal) item.signal.removeEventListener('abort', onDone); - resolve(); - }; - item.future.promise.then(onDone, onDone); - if (item.signal) item.signal.addEventListener('abort', onDone); - }); -} - -export function rejectOnAbort(item: BatchItem) { - const cleanup = () => { - if (item.signal) item.signal.removeEventListener('abort', rejectAborted); - }; - const rejectAborted = () => { - item.future.reject(new AbortError()); - cleanup(); - }; - - if (item.signal) item.signal.addEventListener('abort', rejectAborted); - item.future.promise.then(cleanup, cleanup); -} diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index ba27d0446610e..7bd258f3cc7e3 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -17,7 +17,7 @@ * under the License. */ -import { AbortError, defer } from '../../../kibana_utils/public'; +import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public'; import { ItemBufferParams, TimedItemBufferParams, @@ -28,7 +28,6 @@ import { import { fetchStreaming, split } from '../streaming'; import { normalizeError } from '../../common'; import { BatchedFunc, BatchItem } from './types'; -import { isBatchDone, getDonePromise, rejectOnAbort } from './batch_utils'; export interface BatchedFunctionProtocolError extends ErrorLike { code: string; @@ -77,51 +76,41 @@ export const createStreamingBatchedFunction = ( flushOnMaxItems = 25, maxItemAge = 10, } = params; - const [fn] = createBatchedFunction, BatchItem>({ + const [fn] = createBatchedFunction({ onCall: (payload: Payload, signal?: AbortSignal) => { const future = defer(); const entry: BatchItem = { payload, future, signal, - done: false, }; return [future.promise, entry]; }, onBatch: async (items) => { try { - const promises: Array> = []; - const abortController = new AbortController(); + const donePromises: Array> = items.map((item) => { + if (!item.signal) return item.future.promise.catch(() => {}); - // Filter out and reject any items who's signal is already aborted - items = items.filter((item) => { - if (item.signal?.aborted) { + // Reject promise if aborted + const { promise: abortPromise, cleanup } = abortSignalToPromise(item.signal); + abortPromise.catch(() => { item.future.reject(new AbortError()); - } - return !item.signal?.aborted; - }); - - // Prepare batch - const batch = items - .filter((item) => { - if (item.signal?.aborted) { - item.future.reject(new AbortError()); - } - return !item.signal?.aborted; - }) - .map((item) => { - // Subscribe to reject promise on abort - rejectOnAbort(item); - - // Track batch progress - promises.push(getDonePromise(item)); - - // Return payload to be sent - return item.payload; + cleanup(); }); + return item.future.promise.then(cleanup, cleanup); + }); // abort when all items were either resolved, rejected or aborted - Promise.all(promises).then(() => abortController.abort()); + const abortController = new AbortController(); + let isBatchDone = false; + Promise.all(donePromises).then(() => { + isBatchDone = true; + abortController.abort(); + }); + const batch = items + // Filter out any items whose signal is already aborted + .filter((item) => !item.signal?.aborted) + .map((item) => item.payload); const { stream } = fetchStreamingInjected({ url, @@ -144,8 +133,7 @@ export const createStreamingBatchedFunction = ( for (const { future } of items) future.reject(normalizedError); }, complete: () => { - const streamTerminatedPrematurely = !isBatchDone(items); - if (streamTerminatedPrematurely) { + if (!isBatchDone) { const error: BatchedFunctionProtocolError = { message: 'Connection terminated prematurely.', code: 'CONNECTION', diff --git a/src/plugins/bfetch/public/batching/types.ts b/src/plugins/bfetch/public/batching/types.ts index d59d6b3d5d641..68860c5d9eedf 100644 --- a/src/plugins/bfetch/public/batching/types.ts +++ b/src/plugins/bfetch/public/batching/types.ts @@ -23,7 +23,6 @@ export interface BatchItem { payload: Payload; future: Defer; signal?: AbortSignal; - done: boolean; } export type BatchedFunc = ( From 6b9a4f8ba96fc15133e4d9f564144a676a3716f0 Mon Sep 17 00:00:00 2001 From: Liza K Date: Sun, 22 Nov 2020 12:15:41 +0200 Subject: [PATCH 23/25] code review --- .../create_streaming_batched_function.test.ts | 29 +++++++++++++++++++ .../public/streaming/from_streaming_xhr.ts | 3 +- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts index 7638966966ff0..57a7d7fc4f8cb 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts @@ -446,6 +446,35 @@ describe('createStreamingBatchedFunction()', () => { }); describe('when requests are aborted', () => { + test('aborts stream when all are aborted', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const abortController = new AbortController(); + const promise = fn({ a: '1' }, abortController.signal); + const promise2 = fn({ a: '2' }, abortController.signal); + await new Promise((r) => setTimeout(r, 6)); + + expect(await isPending(promise)).toBe(true); + expect(await isPending(promise2)).toBe(true); + + abortController.abort(); + await new Promise((r) => setTimeout(r, 6)); + + expect(await isPending(promise)).toBe(false); + expect(await isPending(promise2)).toBe(false); + const [, error] = await of(promise); + const [, error2] = await of(promise2); + expect(error).toBeInstanceOf(AbortError); + expect(error2).toBeInstanceOf(AbortError); + expect(fetchStreaming.mock.calls[0][0].signal.aborted).toBeTruthy(); + }); + test('rejects promise on abort and lets others continue', async () => { const { fetchStreaming, stream } = setup(); const fn = createStreamingBatchedFunction({ diff --git a/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts index 0d87a25712742..5df1f5258cb2d 100644 --- a/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts +++ b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts @@ -57,13 +57,14 @@ export const fromStreamingXhr = ( if (signal) signal.addEventListener('abort', onBatchAbort); xhr.onreadystatechange = () => { + if (aborted) return; // Older browsers don't support onprogress, so we need // to call this here, too. It's safe to call this multiple // times even for the same progress event. processBatch(); // 4 is the magic number that means the request is done - if (!aborted && xhr.readyState === 4) { + if (xhr.readyState === 4) { if (signal) signal.removeEventListener('abort', onBatchAbort); // 0 indicates a network failure. 400+ messages are considered server errors From e9103b77ef03e6e295e4037f62468f159a468f07 Mon Sep 17 00:00:00 2001 From: Liza K Date: Sun, 22 Nov 2020 14:14:39 +0200 Subject: [PATCH 24/25] filter items in the beginning --- .../create_streaming_batched_function.ts | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index 7bd258f3cc7e3..fe5771bef795b 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -88,16 +88,28 @@ export const createStreamingBatchedFunction = ( }, onBatch: async (items) => { try { + // Filter out any items whose signal is already aborted + items = items.filter((item) => { + if (item.signal?.aborted) item.future.reject(new AbortError()); + return !item.signal?.aborted; + }); + const donePromises: Array> = items.map((item) => { - if (!item.signal) return item.future.promise.catch(() => {}); + return new Promise((resolve) => { + const { promise: abortPromise, cleanup } = item.signal + ? abortSignalToPromise(item.signal) + : { + promise: undefined, + cleanup: () => {}, + }; - // Reject promise if aborted - const { promise: abortPromise, cleanup } = abortSignalToPromise(item.signal); - abortPromise.catch(() => { - item.future.reject(new AbortError()); - cleanup(); + const onDone = () => { + resolve(); + cleanup(); + }; + if (abortPromise) abortPromise.catch(onDone); + item.future.promise.then(onDone, onDone); }); - return item.future.promise.then(cleanup, cleanup); }); // abort when all items were either resolved, rejected or aborted @@ -107,10 +119,7 @@ export const createStreamingBatchedFunction = ( isBatchDone = true; abortController.abort(); }); - const batch = items - // Filter out any items whose signal is already aborted - .filter((item) => !item.signal?.aborted) - .map((item) => item.payload); + const batch = items.map((item) => item.payload); const { stream } = fetchStreamingInjected({ url, From 3fcac6e67ad7f1eb91f6a680bbd78e064b69161e Mon Sep 17 00:00:00 2001 From: Liza K Date: Sun, 22 Nov 2020 14:45:00 +0200 Subject: [PATCH 25/25] jest --- .../public/batching/create_streaming_batched_function.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index fe5771bef795b..4ff12dda1f98e 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -107,7 +107,11 @@ export const createStreamingBatchedFunction = ( resolve(); cleanup(); }; - if (abortPromise) abortPromise.catch(onDone); + if (abortPromise) + abortPromise.catch(() => { + item.future.reject(new AbortError()); + onDone(); + }); item.future.promise.then(onDone, onDone); }); });