Skip to content

Commit

Permalink
Abort cancelled search requests to Elasticsearch (#56788)
Browse files Browse the repository at this point in the history
* Update abort controller library

* Bootstrap

* Abort when the request is aborted

* Add utility and update value suggestions route

* Remove bad merge

* Revert switching abort controller libraries

* Revert package.json in lib

* Move to previous abort controller

* Fix test to use fake timers to run debounced handlers

* Fix loading bar not going away when cancelling

* Add test for loading count

* Fix test

* Fix failing test

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
lukasolson and elasticmachine committed Feb 25, 2020
1 parent aa20841 commit b35f969
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 19 deletions.
52 changes: 46 additions & 6 deletions src/plugins/data/public/search/sync_search_strategy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ describe('Sync search strategy', () => {
core: mockCoreStart,
getSearchStrategy: jest.fn(),
});
syncSearch.search(
{
serverStrategy: SYNC_SEARCH_STRATEGY,
},
{}
);
const request = { serverStrategy: SYNC_SEARCH_STRATEGY };
syncSearch.search(request, {});

expect(mockCoreStart.http.fetch.mock.calls[0][0]).toEqual({
path: `/internal/search/${SYNC_SEARCH_STRATEGY}`,
body: JSON.stringify({
Expand All @@ -50,4 +47,47 @@ describe('Sync search strategy', () => {
signal: undefined,
});
});

it('increments and decrements loading count on success', async () => {
const expectedLoadingCountValues = [0, 1, 0];
const receivedLoadingCountValues: number[] = [];

mockCoreStart.http.fetch.mockResolvedValueOnce('response');

const syncSearch = syncSearchStrategyProvider({
core: mockCoreStart,
getSearchStrategy: jest.fn(),
});
const request = { serverStrategy: SYNC_SEARCH_STRATEGY };

const loadingCount$ = mockCoreStart.http.addLoadingCountSource.mock.calls[0][0];
loadingCount$.subscribe(value => receivedLoadingCountValues.push(value));

await syncSearch.search(request, {}).toPromise();

expect(receivedLoadingCountValues).toEqual(expectedLoadingCountValues);
});

it('increments and decrements loading count on failure', async () => {
expect.assertions(1);
const expectedLoadingCountValues = [0, 1, 0];
const receivedLoadingCountValues: number[] = [];

mockCoreStart.http.fetch.mockRejectedValueOnce('error');

const syncSearch = syncSearchStrategyProvider({
core: mockCoreStart,
getSearchStrategy: jest.fn(),
});
const request = { serverStrategy: SYNC_SEARCH_STRATEGY };

const loadingCount$ = mockCoreStart.http.addLoadingCountSource.mock.calls[0][0];
loadingCount$.subscribe(value => receivedLoadingCountValues.push(value));

try {
await syncSearch.search(request, {}).toPromise();
} catch (e) {
expect(receivedLoadingCountValues).toEqual(expectedLoadingCountValues);
}
});
});
21 changes: 10 additions & 11 deletions src/plugins/data/public/search/sync_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
*/

import { BehaviorSubject, from } from 'rxjs';
import { IKibanaSearchRequest, IKibanaSearchResponse } from '../../common/search';
import { finalize } from 'rxjs/operators';
import { IKibanaSearchRequest } from '../../common/search';
import { ISearch, ISearchOptions } from './i_search';
import { TSearchStrategyProvider, ISearchStrategy, ISearchContext } from './types';

Expand All @@ -40,16 +41,14 @@ export const syncSearchStrategyProvider: TSearchStrategyProvider<typeof SYNC_SEA
) => {
loadingCount$.next(loadingCount$.getValue() + 1);

const response: Promise<IKibanaSearchResponse> = context.core.http.fetch({
path: `/internal/search/${request.serverStrategy}`,
method: 'POST',
body: JSON.stringify(request),
signal: options.signal,
});

response.then(() => loadingCount$.next(loadingCount$.getValue() - 1));

return from(response);
return from(
context.core.http.fetch({
path: `/internal/search/${request.serverStrategy}`,
method: 'POST',
body: JSON.stringify(request),
signal: options.signal,
})
).pipe(finalize(() => loadingCount$.next(loadingCount$.getValue() - 1)));
};

return { search };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { IRouter } from 'kibana/server';

import { IFieldType, Filter } from '../index';
import { findIndexPatternById, getFieldByName } from '../index_patterns';
import { getRequestAbortedSignal } from '../lib';

export function registerValueSuggestionsRoute(router: IRouter) {
router.post(
Expand Down Expand Up @@ -50,6 +51,7 @@ export function registerValueSuggestionsRoute(router: IRouter) {
const { field: fieldName, query, boolFilter } = request.body;
const { index } = request.params;
const { dataClient } = context.core.elasticsearch;
const signal = getRequestAbortedSignal(request.events.aborted$);

const autocompleteSearchOptions = {
timeout: await uiSettings.get<number>('kibana.autocompleteTimeout'),
Expand All @@ -62,7 +64,7 @@ export function registerValueSuggestionsRoute(router: IRouter) {
const body = await getBody(autocompleteSearchOptions, field || fieldName, query, boolFilter);

try {
const result = await dataClient.callAsCurrentUser('search', { index, body });
const result = await dataClient.callAsCurrentUser('search', { index, body }, { signal });

const buckets: any[] =
get(result, 'aggregations.suggestions.buckets') ||
Expand Down
45 changes: 45 additions & 0 deletions src/plugins/data/server/lib/get_request_aborted_signal.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { Subject } from 'rxjs';
import { getRequestAbortedSignal } from './get_request_aborted_signal';

describe('abortableRequestHandler', () => {
jest.useFakeTimers();

it('should call abort if disconnected', () => {
const abortedSubject = new Subject<void>();
const aborted$ = abortedSubject.asObservable();
const onAborted = jest.fn();

const signal = getRequestAbortedSignal(aborted$);
signal.addEventListener('abort', onAborted);

// Shouldn't be aborted or call onAborted prior to disconnecting
expect(signal.aborted).toBe(false);
expect(onAborted).not.toBeCalled();

abortedSubject.next();
jest.runAllTimers();

// Should be aborted and call onAborted after disconnecting
expect(signal.aborted).toBe(true);
expect(onAborted).toBeCalled();
});
});
33 changes: 33 additions & 0 deletions src/plugins/data/server/lib/get_request_aborted_signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { Observable } from 'rxjs';
// @ts-ignore not typed
import { AbortController } from 'abortcontroller-polyfill/dist/cjs-ponyfill';

/**
* A simple utility function that returns an `AbortSignal` corresponding to an `AbortController`
* which aborts when the given request is aborted.
* @param aborted$ The observable of abort events (usually `request.events.aborted$`)
*/
export function getRequestAbortedSignal(aborted$: Observable<void>): AbortSignal {
const controller = new AbortController();
aborted$.subscribe(() => controller.abort());
return controller.signal;
}
20 changes: 20 additions & 0 deletions src/plugins/data/server/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

export { getRequestAbortedSignal } from './get_request_aborted_signal';
5 changes: 4 additions & 1 deletion src/plugins/data/server/search/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import { schema } from '@kbn/config-schema';
import { IRouter } from '../../../../core/server';
import { getRequestAbortedSignal } from '../lib';

export function registerSearchRoute(router: IRouter): void {
router.post(
Expand All @@ -35,8 +36,10 @@ export function registerSearchRoute(router: IRouter): void {
async (context, request, res) => {
const searchRequest = request.body;
const strategy = request.params.strategy;
const signal = getRequestAbortedSignal(request.events.aborted$);

try {
const response = await context.search!.search(searchRequest, {}, strategy);
const response = await context.search!.search(searchRequest, { signal }, strategy);
return res.ok({ body: response });
} catch (err) {
return res.customError({
Expand Down

0 comments on commit b35f969

Please sign in to comment.