diff --git a/src/plugins/data/public/search/sync_search_strategy.test.ts b/src/plugins/data/public/search/sync_search_strategy.test.ts index 9378e5833f8d7..31a1adfa01c75 100644 --- a/src/plugins/data/public/search/sync_search_strategy.test.ts +++ b/src/plugins/data/public/search/sync_search_strategy.test.ts @@ -35,12 +35,9 @@ describe('Sync search strategy', () => { core: mockCoreStart, getSearchStrategy: jest.fn(), }); - syncSearch.search( - { - serverStrategy: SYNC_SEARCH_STRATEGY, - }, - {} - ); + const request = { serverStrategy: SYNC_SEARCH_STRATEGY }; + syncSearch.search(request, {}); + expect(mockCoreStart.http.fetch.mock.calls[0][0]).toEqual({ path: `/internal/search/${SYNC_SEARCH_STRATEGY}`, body: JSON.stringify({ @@ -50,4 +47,47 @@ describe('Sync search strategy', () => { signal: undefined, }); }); + + it('increments and decrements loading count on success', async () => { + const expectedLoadingCountValues = [0, 1, 0]; + const receivedLoadingCountValues: number[] = []; + + mockCoreStart.http.fetch.mockResolvedValueOnce('response'); + + const syncSearch = syncSearchStrategyProvider({ + core: mockCoreStart, + getSearchStrategy: jest.fn(), + }); + const request = { serverStrategy: SYNC_SEARCH_STRATEGY }; + + const loadingCount$ = mockCoreStart.http.addLoadingCountSource.mock.calls[0][0]; + loadingCount$.subscribe(value => receivedLoadingCountValues.push(value)); + + await syncSearch.search(request, {}).toPromise(); + + expect(receivedLoadingCountValues).toEqual(expectedLoadingCountValues); + }); + + it('increments and decrements loading count on failure', async () => { + expect.assertions(1); + const expectedLoadingCountValues = [0, 1, 0]; + const receivedLoadingCountValues: number[] = []; + + mockCoreStart.http.fetch.mockRejectedValueOnce('error'); + + const syncSearch = syncSearchStrategyProvider({ + core: mockCoreStart, + getSearchStrategy: jest.fn(), + }); + const request = { serverStrategy: SYNC_SEARCH_STRATEGY }; + + const loadingCount$ = mockCoreStart.http.addLoadingCountSource.mock.calls[0][0]; + loadingCount$.subscribe(value => receivedLoadingCountValues.push(value)); + + try { + await syncSearch.search(request, {}).toPromise(); + } catch (e) { + expect(receivedLoadingCountValues).toEqual(expectedLoadingCountValues); + } + }); }); diff --git a/src/plugins/data/public/search/sync_search_strategy.ts b/src/plugins/data/public/search/sync_search_strategy.ts index c2cc180af546e..860ce593ae217 100644 --- a/src/plugins/data/public/search/sync_search_strategy.ts +++ b/src/plugins/data/public/search/sync_search_strategy.ts @@ -18,7 +18,8 @@ */ import { BehaviorSubject, from } from 'rxjs'; -import { IKibanaSearchRequest, IKibanaSearchResponse } from '../../common/search'; +import { finalize } from 'rxjs/operators'; +import { IKibanaSearchRequest } from '../../common/search'; import { ISearch, ISearchOptions } from './i_search'; import { TSearchStrategyProvider, ISearchStrategy, ISearchContext } from './types'; @@ -40,16 +41,14 @@ export const syncSearchStrategyProvider: TSearchStrategyProvider { loadingCount$.next(loadingCount$.getValue() + 1); - const response: Promise = context.core.http.fetch({ - path: `/internal/search/${request.serverStrategy}`, - method: 'POST', - body: JSON.stringify(request), - signal: options.signal, - }); - - response.then(() => loadingCount$.next(loadingCount$.getValue() - 1)); - - return from(response); + return from( + context.core.http.fetch({ + path: `/internal/search/${request.serverStrategy}`, + method: 'POST', + body: JSON.stringify(request), + signal: options.signal, + }) + ).pipe(finalize(() => loadingCount$.next(loadingCount$.getValue() - 1))); }; return { search }; diff --git a/src/plugins/data/server/autocomplete/value_suggestions_route.ts b/src/plugins/data/server/autocomplete/value_suggestions_route.ts index f032890e98901..02a5e0921fe4f 100644 --- a/src/plugins/data/server/autocomplete/value_suggestions_route.ts +++ b/src/plugins/data/server/autocomplete/value_suggestions_route.ts @@ -23,6 +23,7 @@ import { IRouter } from 'kibana/server'; import { IFieldType, Filter } from '../index'; import { findIndexPatternById, getFieldByName } from '../index_patterns'; +import { getRequestAbortedSignal } from '../lib'; export function registerValueSuggestionsRoute(router: IRouter) { router.post( @@ -50,6 +51,7 @@ export function registerValueSuggestionsRoute(router: IRouter) { const { field: fieldName, query, boolFilter } = request.body; const { index } = request.params; const { dataClient } = context.core.elasticsearch; + const signal = getRequestAbortedSignal(request.events.aborted$); const autocompleteSearchOptions = { timeout: await uiSettings.get('kibana.autocompleteTimeout'), @@ -62,7 +64,7 @@ export function registerValueSuggestionsRoute(router: IRouter) { const body = await getBody(autocompleteSearchOptions, field || fieldName, query, boolFilter); try { - const result = await dataClient.callAsCurrentUser('search', { index, body }); + const result = await dataClient.callAsCurrentUser('search', { index, body }, { signal }); const buckets: any[] = get(result, 'aggregations.suggestions.buckets') || diff --git a/src/plugins/data/server/lib/get_request_aborted_signal.test.ts b/src/plugins/data/server/lib/get_request_aborted_signal.test.ts new file mode 100644 index 0000000000000..3c1e20dbcb158 --- /dev/null +++ b/src/plugins/data/server/lib/get_request_aborted_signal.test.ts @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Subject } from 'rxjs'; +import { getRequestAbortedSignal } from './get_request_aborted_signal'; + +describe('abortableRequestHandler', () => { + jest.useFakeTimers(); + + it('should call abort if disconnected', () => { + const abortedSubject = new Subject(); + const aborted$ = abortedSubject.asObservable(); + const onAborted = jest.fn(); + + const signal = getRequestAbortedSignal(aborted$); + signal.addEventListener('abort', onAborted); + + // Shouldn't be aborted or call onAborted prior to disconnecting + expect(signal.aborted).toBe(false); + expect(onAborted).not.toBeCalled(); + + abortedSubject.next(); + jest.runAllTimers(); + + // Should be aborted and call onAborted after disconnecting + expect(signal.aborted).toBe(true); + expect(onAborted).toBeCalled(); + }); +}); diff --git a/src/plugins/data/server/lib/get_request_aborted_signal.ts b/src/plugins/data/server/lib/get_request_aborted_signal.ts new file mode 100644 index 0000000000000..d1541f1df9384 --- /dev/null +++ b/src/plugins/data/server/lib/get_request_aborted_signal.ts @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Observable } from 'rxjs'; +// @ts-ignore not typed +import { AbortController } from 'abortcontroller-polyfill/dist/cjs-ponyfill'; + +/** + * A simple utility function that returns an `AbortSignal` corresponding to an `AbortController` + * which aborts when the given request is aborted. + * @param aborted$ The observable of abort events (usually `request.events.aborted$`) + */ +export function getRequestAbortedSignal(aborted$: Observable): AbortSignal { + const controller = new AbortController(); + aborted$.subscribe(() => controller.abort()); + return controller.signal; +} diff --git a/src/plugins/data/server/lib/index.ts b/src/plugins/data/server/lib/index.ts new file mode 100644 index 0000000000000..a2af456846e14 --- /dev/null +++ b/src/plugins/data/server/lib/index.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export { getRequestAbortedSignal } from './get_request_aborted_signal'; diff --git a/src/plugins/data/server/search/routes.ts b/src/plugins/data/server/search/routes.ts index 6f726771c41b2..11879d14931ae 100644 --- a/src/plugins/data/server/search/routes.ts +++ b/src/plugins/data/server/search/routes.ts @@ -19,6 +19,7 @@ import { schema } from '@kbn/config-schema'; import { IRouter } from '../../../../core/server'; +import { getRequestAbortedSignal } from '../lib'; export function registerSearchRoute(router: IRouter): void { router.post( @@ -35,8 +36,10 @@ export function registerSearchRoute(router: IRouter): void { async (context, request, res) => { const searchRequest = request.body; const strategy = request.params.strategy; + const signal = getRequestAbortedSignal(request.events.aborted$); + try { - const response = await context.search!.search(searchRequest, {}, strategy); + const response = await context.search!.search(searchRequest, { signal }, strategy); return res.ok({ body: response }); } catch (err) { return res.customError({