diff --git a/src/plugins/data/common/search/strategies/es_search/types.ts b/src/plugins/data/common/search/strategies/es_search/types.ts index 73bf7961fea9b..f8c3b73d995a9 100644 --- a/src/plugins/data/common/search/strategies/es_search/types.ts +++ b/src/plugins/data/common/search/strategies/es_search/types.ts @@ -15,7 +15,8 @@ export type ISearchRequestParams = { trackTotalHits?: boolean; } & estypes.SearchRequest; -export interface IEsSearchRequest extends IKibanaSearchRequest { +export interface IEsSearchRequest + extends IKibanaSearchRequest { indexType?: string; } diff --git a/src/plugins/data/server/search/routes/bsearch.ts b/src/plugins/data/server/search/routes/bsearch.ts index 8b65c5d8eb1dc..ee19c8cba30a3 100644 --- a/src/plugins/data/server/search/routes/bsearch.ts +++ b/src/plugins/data/server/search/routes/bsearch.ts @@ -11,6 +11,7 @@ import { catchError } from 'rxjs/operators'; import { BfetchServerSetup } from '@kbn/bfetch-plugin/server'; import type { ExecutionContextSetup } from '@kbn/core/server'; import apm from 'elastic-apm-node'; +import { getRequestAbortedSignal } from '../..'; import { IKibanaSearchRequest, IKibanaSearchResponse, @@ -28,6 +29,7 @@ export function registerBsearchRoute( IKibanaSearchResponse >('/internal/bsearch', (request) => { const search = getScoped(request); + const abortSignal = getRequestAbortedSignal(request.events.aborted$); return { /** * @param requestOptions @@ -39,7 +41,7 @@ export function registerBsearchRoute( apm.addLabels(executionContextService.getAsLabels()); return firstValueFrom( - search.search(requestData, restOptions).pipe( + search.search(requestData, { ...restOptions, abortSignal }).pipe( catchError((err) => { // Re-throw as object, to get attributes passed to the client // eslint-disable-next-line no-throw-literal diff --git a/src/plugins/data/server/search/search_service.ts b/src/plugins/data/server/search/search_service.ts index 71a335ce51592..188f853e6a2ce 100644 --- a/src/plugins/data/server/search/search_service.ts +++ b/src/plugins/data/server/search/search_service.ts @@ -437,11 +437,7 @@ export class SearchService implements Plugin { } }; - private cancel = async ( - deps: SearchStrategyDependencies, - id: string, - options: ISearchOptions = {} - ) => { + private cancel = (deps: SearchStrategyDependencies, id: string, options: ISearchOptions = {}) => { const strategy = this.getSearchStrategy(options.strategy); if (!strategy.cancel) { throw new KbnServerError( @@ -468,14 +464,18 @@ export class SearchService implements Plugin { private cancelSessionSearches = async (deps: SearchStrategyDependencies, sessionId: string) => { const searchIdMapping = await deps.searchSessionsClient.getSearchIdMapping(sessionId); await Promise.allSettled( - Array.from(searchIdMapping).map(([searchId, strategyName]) => { + Array.from(searchIdMapping).map(async ([searchId, strategyName]) => { const searchOptions = { sessionId, strategy: strategyName, isStored: true, }; - return this.cancel(deps, searchId, searchOptions); + try { + await this.cancel(deps, searchId, searchOptions); + } catch (e) { + this.logger.error(`cancelSessionSearches error: ${e.message}`); + } }) ); }; diff --git a/src/plugins/data/server/search/strategies/eql_search/eql_search_strategy.test.ts b/src/plugins/data/server/search/strategies/eql_search/eql_search_strategy.test.ts index 6d61f62cc79ab..475c43a5daed6 100644 --- a/src/plugins/data/server/search/strategies/eql_search/eql_search_strategy.test.ts +++ b/src/plugins/data/server/search/strategies/eql_search/eql_search_strategy.test.ts @@ -15,6 +15,7 @@ import { getMockSearchConfig } from '../../../../config.mock'; const getMockEqlResponse = () => ({ body: { + id: 'my-search-id', is_partial: false, is_running: false, took: 162, @@ -54,6 +55,7 @@ describe('EQL search strategy', () => { describe('search()', () => { let mockEqlSearch: jest.Mock; let mockEqlGet: jest.Mock; + let mockEqlDelete: jest.Mock; let mockDeps: SearchStrategyDependencies; let params: Required['params']; let options: Required['options']; @@ -61,6 +63,8 @@ describe('EQL search strategy', () => { beforeEach(() => { mockEqlSearch = jest.fn().mockResolvedValueOnce(getMockEqlResponse()); mockEqlGet = jest.fn().mockResolvedValueOnce(getMockEqlResponse()); + mockEqlDelete = jest.fn(); + mockDeps = { uiSettingsClient: { get: jest.fn(), @@ -70,6 +74,7 @@ describe('EQL search strategy', () => { eql: { get: mockEqlGet, search: mockEqlSearch, + delete: mockEqlDelete, }, }, }, @@ -124,6 +129,34 @@ describe('EQL search strategy', () => { }); }); + it('should delete when aborted', async () => { + const response = getMockEqlResponse(); + mockEqlSearch.mockReset().mockResolvedValueOnce({ + ...response, + body: { + ...response.body, + is_running: true, + }, + }); + const eqlSearch = await eqlSearchStrategyProvider(mockSearchConfig, mockLogger); + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + // Abort after an incomplete first response is returned + setTimeout(() => abortController.abort(), 100); + + let err: any; + try { + await eqlSearch.search({ options, params }, { abortSignal }, mockDeps).toPromise(); + } catch (e) { + err = e; + } + + expect(mockEqlSearch).toBeCalled(); + expect(err).not.toBeUndefined(); + expect(mockEqlDelete).toBeCalled(); + }); + describe('arguments', () => { it('sends along async search options', async () => { const eqlSearch = await eqlSearchStrategyProvider(mockSearchConfig, mockLogger); diff --git a/src/plugins/data/server/search/strategies/eql_search/eql_search_strategy.ts b/src/plugins/data/server/search/strategies/eql_search/eql_search_strategy.ts index 9dd24e6791719..00b8cfdeb52e5 100644 --- a/src/plugins/data/server/search/strategies/eql_search/eql_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/eql_search/eql_search_strategy.ts @@ -9,6 +9,7 @@ import type { TransportResult } from '@elastic/elasticsearch'; import { tap } from 'rxjs/operators'; import type { IScopedClusterClient, Logger } from '@kbn/core/server'; +import { getKbnServerError } from '@kbn/kibana-utils-plugin/server'; import { SearchConfigSchema } from '../../../../config'; import { EqlSearchStrategyRequest, @@ -27,15 +28,19 @@ export const eqlSearchStrategyProvider = ( searchConfig: SearchConfigSchema, logger: Logger ): ISearchStrategy => { - async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) { + function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) { const client = esClient.asCurrentUser.eql; - await client.delete({ id }); + return client.delete({ id }); } return { cancel: async (id, options, { esClient }) => { logger.debug(`_eql/delete ${id}`); - await cancelAsyncSearch(id, esClient); + try { + await cancelAsyncSearch(id, esClient); + } catch (e) { + throw getKbnServerError(e); + } }, search: ({ id, ...request }, options: IAsyncSearchOptions, { esClient, uiSettingsClient }) => { @@ -85,8 +90,15 @@ export const eqlSearchStrategyProvider = ( }; const cancel = async () => { - if (id) { + if (!id) return; + try { await cancelAsyncSearch(id, esClient); + } catch (e) { + // A 404 means either this search request does not exist, or that it is already cancelled + if (e.meta?.statusCode === 404) return; + + // Log all other (unexpected) error messages + logger.error(`cancelEqlSearch error: ${e.message}`); } }; diff --git a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts index 3b2c5e8e0e5c8..33987c09d88dd 100644 --- a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts +++ b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts @@ -136,6 +136,30 @@ describe('ES search strategy', () => { expect(request).toHaveProperty('keep_alive', '60000ms'); }); + it('allows overriding keep_alive and wait_for_completion_timeout', async () => { + mockGetCaller.mockResolvedValueOnce(mockAsyncResponse); + + const params = { + index: 'logstash-*', + body: { query: {} }, + wait_for_completion_timeout: '10s', + keep_alive: '5m', + }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockLegacyConfig$, + mockSearchConfig, + mockLogger + ); + + await esSearch.search({ id: 'foo', params }, {}, mockDeps).toPromise(); + + expect(mockGetCaller).toBeCalled(); + const request = mockGetCaller.mock.calls[0][0]; + expect(request.id).toEqual('foo'); + expect(request).toHaveProperty('wait_for_completion_timeout', '10s'); + expect(request).toHaveProperty('keep_alive', '5m'); + }); + it('sets transport options on POST requests', async () => { const transportOptions = { maxRetries: 1 }; mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse); @@ -260,6 +284,38 @@ describe('ES search strategy', () => { expect(mockApiCaller).toBeCalledTimes(0); }); + + it('should delete when aborted', async () => { + mockSubmitCaller.mockResolvedValueOnce({ + ...mockAsyncResponse, + body: { + ...mockAsyncResponse.body, + is_running: true, + }, + }); + + const params = { index: 'logstash-*', body: { query: {} } }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockLegacyConfig$, + mockSearchConfig, + mockLogger + ); + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + // Abort after an incomplete first response is returned + setTimeout(() => abortController.abort(), 100); + + let err: KbnServerError | undefined; + try { + await esSearch.search({ params }, { abortSignal }, mockDeps).toPromise(); + } catch (e) { + err = e; + } + expect(mockSubmitCaller).toBeCalled(); + expect(err).not.toBeUndefined(); + expect(mockDeleteCaller).toBeCalled(); + }); }); describe('with sessionId', () => { @@ -367,6 +423,44 @@ describe('ES search strategy', () => { expect(request).toHaveProperty('wait_for_completion_timeout'); expect(request).not.toHaveProperty('keep_alive'); }); + + it('should not delete a saved session when aborted', async () => { + mockSubmitCaller.mockResolvedValueOnce({ + ...mockAsyncResponse, + body: { + ...mockAsyncResponse.body, + is_running: true, + }, + }); + + const params = { index: 'logstash-*', body: { query: {} } }; + const esSearch = await enhancedEsSearchStrategyProvider( + mockLegacyConfig$, + mockSearchConfig, + mockLogger + ); + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + // Abort after an incomplete first response is returned + setTimeout(() => abortController.abort(), 100); + + let err: KbnServerError | undefined; + try { + await esSearch + .search( + { params }, + { abortSignal, sessionId: '1', isSearchStored: true, isStored: true }, + mockDeps + ) + .toPromise(); + } catch (e) { + err = e; + } + expect(mockSubmitCaller).toBeCalled(); + expect(err).not.toBeUndefined(); + expect(mockDeleteCaller).not.toBeCalled(); + }); }); it('throws normalized error if ResponseError is thrown', async () => { diff --git a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.ts b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.ts index 89699d7d58611..174f9924f1cc7 100644 --- a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.ts @@ -12,6 +12,7 @@ import { catchError, tap } from 'rxjs/operators'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { firstValueFrom, from } from 'rxjs'; import { getKbnServerError } from '@kbn/kibana-utils-plugin/server'; +import { IAsyncSearchRequestParams } from '../..'; import { getKbnSearchError, KbnSearchError } from '../../report_search_error'; import type { ISearchStrategy, SearchStrategyDependencies } from '../../types'; import type { @@ -43,18 +44,14 @@ export const enhancedEsSearchStrategyProvider = ( logger: Logger, usage?: SearchUsage, useInternalUser: boolean = false -): ISearchStrategy => { - async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) { - try { - const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser; - await client.asyncSearch.delete({ id }); - } catch (e) { - throw getKbnServerError(e); - } +): ISearchStrategy> => { + function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) { + const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser; + return client.asyncSearch.delete({ id }); } function asyncSearch( - { id, ...request }: IEsSearchRequest, + { id, ...request }: IEsSearchRequest, options: IAsyncSearchOptions, { esClient, uiSettingsClient }: SearchStrategyDependencies ) { @@ -62,7 +59,13 @@ export const enhancedEsSearchStrategyProvider = ( const search = async () => { const params = id - ? getDefaultAsyncGetParams(searchConfig, options) + ? { + ...getDefaultAsyncGetParams(searchConfig, options), + ...(request.params?.keep_alive ? { keep_alive: request.params.keep_alive } : {}), + ...(request.params?.wait_for_completion_timeout + ? { wait_for_completion_timeout: request.params.wait_for_completion_timeout } + : {}), + } : { ...(await getDefaultAsyncSubmitParams(uiSettingsClient, searchConfig, options)), ...request.params, @@ -89,8 +92,15 @@ export const enhancedEsSearchStrategyProvider = ( }; const cancel = async () => { - if (id) { + if (!id || options.isStored) return; + try { await cancelAsyncSearch(id, esClient); + } catch (e) { + // A 404 means either this search request does not exist, or that it is already cancelled + if (e.meta?.statusCode === 404) return; + + // Log all other (unexpected) error messages + logger.error(`cancelAsyncSearch error: ${e.message}`); } }; @@ -179,7 +189,11 @@ export const enhancedEsSearchStrategyProvider = ( */ cancel: async (id, options, { esClient }) => { logger.debug(`cancel ${id}`); - await cancelAsyncSearch(id, esClient); + try { + await cancelAsyncSearch(id, esClient); + } catch (e) { + throw getKbnServerError(e); + } }, /** * diff --git a/src/plugins/data/server/search/strategies/ese_search/types.ts b/src/plugins/data/server/search/strategies/ese_search/types.ts index 4116aa4380339..5ff324e1c2e4f 100644 --- a/src/plugins/data/server/search/strategies/ese_search/types.ts +++ b/src/plugins/data/server/search/strategies/ese_search/types.ts @@ -6,11 +6,21 @@ * Side Public License, v 1. */ -import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import type { + AsyncSearchGetRequest, + SearchResponse, + ShardStatistics, +} from '@elastic/elasticsearch/lib/api/types'; +import { ISearchRequestParams } from '../../../../common'; + +export interface IAsyncSearchRequestParams extends ISearchRequestParams { + keep_alive?: AsyncSearchGetRequest['keep_alive']; + wait_for_completion_timeout?: AsyncSearchGetRequest['wait_for_completion_timeout']; +} export interface AsyncSearchResponse { id?: string; - response: estypes.SearchResponse; + response: SearchResponse; start_time_in_millis: number; expiration_time_in_millis: number; is_partial: boolean; @@ -18,5 +28,5 @@ export interface AsyncSearchResponse { } export interface AsyncSearchStatusResponse extends Omit { completion_status: number; - _shards: estypes.ShardStatistics; + _shards: ShardStatistics; } diff --git a/src/plugins/data/server/search/strategies/sql_search/sql_search_strategy.test.ts b/src/plugins/data/server/search/strategies/sql_search/sql_search_strategy.test.ts index 700c658de10c0..36fb43a34894f 100644 --- a/src/plugins/data/server/search/strategies/sql_search/sql_search_strategy.test.ts +++ b/src/plugins/data/server/search/strategies/sql_search/sql_search_strategy.test.ts @@ -124,6 +124,33 @@ describe('SQL search strategy', () => { signal: undefined, }); }); + + it('should delete when aborted', async () => { + mockSqlQuery.mockResolvedValueOnce({ + ...mockSqlResponse, + body: { + ...mockSqlResponse.body, + is_running: true, + }, + }); + const esSearch = await sqlSearchStrategyProvider(mockSearchConfig, mockLogger); + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + // Abort after an incomplete first response is returned + setTimeout(() => abortController.abort(), 100); + + let err: any; + try { + await esSearch.search({ params: {} }, { abortSignal }, mockDeps).toPromise(); + } catch (e) { + err = e; + } + + expect(mockSqlQuery).toBeCalled(); + expect(err).not.toBeUndefined(); + expect(mockSqlDelete).toBeCalled(); + }); }); // skip until full search session support https://github.com/elastic/kibana/issues/127880 diff --git a/src/plugins/data/server/search/strategies/sql_search/sql_search_strategy.ts b/src/plugins/data/server/search/strategies/sql_search/sql_search_strategy.ts index 87b29f5438efb..9e04675d12247 100644 --- a/src/plugins/data/server/search/strategies/sql_search/sql_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/sql_search/sql_search_strategy.ts @@ -29,13 +29,9 @@ export const sqlSearchStrategyProvider = ( logger: Logger, useInternalUser: boolean = false ): ISearchStrategy => { - async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) { - try { - const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser; - await client.sql.deleteAsync({ id }); - } catch (e) { - throw getKbnServerError(e); - } + function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) { + const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser; + return client.sql.deleteAsync({ id }); } function asyncSearch( @@ -92,8 +88,15 @@ export const sqlSearchStrategyProvider = ( }; const cancel = async () => { - if (id) { + if (!id) return; + try { await cancelAsyncSearch(id, esClient); + } catch (e) { + // A 404 means either this search request does not exist, or that it is already cancelled + if (e.meta?.statusCode === 404) return; + + // Log all other (unexpected) error messages + logger.error(`cancelSqlSearch error: ${e.message}`); } }; @@ -130,7 +133,11 @@ export const sqlSearchStrategyProvider = ( */ cancel: async (id, options, { esClient }) => { logger.debug(`sql search: cancel async_search_id=${id}`); - await cancelAsyncSearch(id, esClient); + try { + await cancelAsyncSearch(id, esClient); + } catch (e) { + throw getKbnServerError(e); + } }, /** * diff --git a/x-pack/test/api_integration/apis/search/search.ts b/x-pack/test/api_integration/apis/search/search.ts index 391923601d7c5..15c774eef34ef 100644 --- a/x-pack/test/api_integration/apis/search/search.ts +++ b/x-pack/test/api_integration/apis/search/search.ts @@ -186,6 +186,71 @@ export default function ({ getService }: FtrProviderContext) { expect(resp2.body.isRunning).to.be(true); }); + it('should cancel an async search without server crash', async function () { + await markRequiresShardDelayAgg(this); + + const resp = await supertest + .post(`/internal/search/ese`) + .set(ELASTIC_HTTP_VERSION_HEADER, '1') + .set('kbn-xsrf', 'foo') + .send({ + params: { + body: { + query: { + match_all: {}, + }, + ...shardDelayAgg('10s'), + }, + wait_for_completion_timeout: '1ms', + }, + }) + .expect(200); + + const { id } = resp.body; + expect(id).not.to.be(undefined); + expect(resp.body.isPartial).to.be(true); + expect(resp.body.isRunning).to.be(true); + + // Send a follow-up request that waits up to 10s for completion + const req = supertest + .post(`/internal/search/ese/${id}`) + .set(ELASTIC_HTTP_VERSION_HEADER, '1') + .set('kbn-xsrf', 'foo') + .send({ params: { wait_for_completion_timeout: '10s' } }) + .expect(200); + + // After 2s, abort and send the cancellation (to result in a race towards cancellation) + // This should be swallowed and not kill the Kibana server + await new Promise((resolve) => + setTimeout(() => { + req.abort(); + resolve(null); + }, 2000) + ); + await supertest + .delete(`/internal/search/ese/${id}`) + .set(ELASTIC_HTTP_VERSION_HEADER, '1') + .set('kbn-xsrf', 'foo') + .expect(200); + + let err: Error | undefined; + try { + await req; + } catch (e) { + err = e; + } + + expect(err).not.to.be(undefined); + + // Ensure the search was succesfully cancelled + await supertest + .post(`/internal/search/ese/${id}`) + .set(ELASTIC_HTTP_VERSION_HEADER, '1') + .set('kbn-xsrf', 'foo') + .send({}) + .expect(404); + }); + it('should fail without kbn-xref header', async () => { const resp = await supertest .post(`/internal/search/ese`)