Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[data.search.bsearch] Forward request abortSignal to search strategy #170041

Merged
merged 30 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6bc335e
[data.search.bsearch] Forward request abortSignal to search strategy …
lukasolson Oct 19, 2023
d31c579
Return result of cancel
lukasolson Oct 27, 2023
2b13f44
Fix other cancel
lukasolson Oct 27, 2023
d8395bb
Update search service cancel
lukasolson Oct 27, 2023
ba6d8e2
Swallow cancellation errors in search interceptor
lukasolson Oct 27, 2023
312ae34
Swallow errors in delete route
lukasolson Oct 28, 2023
ab50c10
Clean up changes
lukasolson Oct 28, 2023
a5cd1b3
Swallow delete errors
lukasolson Oct 28, 2023
39d6136
More async fixes
lukasolson Oct 28, 2023
22795ca
Remove unnecessary changes
lukasolson Oct 28, 2023
f3f0839
Use async/await again
lukasolson Oct 28, 2023
4450a9b
Change one little thing
lukasolson Oct 28, 2023
0a84ff1
Another test
lukasolson Oct 29, 2023
79697de
another thingy
lukasolson Oct 29, 2023
04eccf7
Re-add .then
lukasolson Oct 29, 2023
0a13dbe
Consistent handling in search strategies
lukasolson Oct 29, 2023
7835584
Swallow errors during cancelSessionSearches
lukasolson Oct 29, 2023
bc865c7
Merge branch 'main' into bsearch/abort-again
lukasolson Oct 29, 2023
b91b6b0
Merge branch 'main' into bsearch/abort-again
kibanamachine Oct 30, 2023
8623348
Merge branch 'main' into bsearch/abort-again
lukasolson Nov 2, 2023
aa3817f
Review feedback
lukasolson Nov 3, 2023
ab69247
Merge branch 'bsearch/abort-again' of github.com:lukasolson/kibana in…
lukasolson Nov 3, 2023
c0de1e0
Merge branch 'main' into bsearch/abort-again
lukasolson Nov 3, 2023
34aa4dc
Add changes to other search strategies
lukasolson Nov 3, 2023
7fc746a
Add API integration test
lukasolson Nov 3, 2023
901983a
Fix undefined check
lukasolson Nov 7, 2023
50eb538
Merge branch 'main' into bsearch/abort-again
lukasolson Nov 7, 2023
b9fcdc3
Clean up cancel methods
lukasolson Nov 7, 2023
cce4cd1
Fix overriding of keep_alive and wait_for_completion_timeout
lukasolson Nov 7, 2023
a0c3b53
Merge branch 'main' into bsearch/abort-again
lukasolson Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export type ISearchRequestParams = {
trackTotalHits?: boolean;
} & estypes.SearchRequest;

export interface IEsSearchRequest extends IKibanaSearchRequest<ISearchRequestParams> {
export interface IEsSearchRequest<T extends ISearchRequestParams = ISearchRequestParams>
extends IKibanaSearchRequest<T> {
indexType?: string;
}

Expand Down
4 changes: 3 additions & 1 deletion src/plugins/data/server/search/routes/bsearch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { catchError } from 'rxjs/operators';
import { BfetchServerSetup } from '@kbn/bfetch-plugin/server';
import type { ExecutionContextSetup } from '@kbn/core/server';
import apm from 'elastic-apm-node';
import { getRequestAbortedSignal } from '../..';
import {
IKibanaSearchRequest,
IKibanaSearchResponse,
Expand All @@ -28,6 +29,7 @@ export function registerBsearchRoute(
IKibanaSearchResponse
>('/internal/bsearch', (request) => {
const search = getScoped(request);
const abortSignal = getRequestAbortedSignal(request.events.aborted$);
return {
/**
* @param requestOptions
Expand All @@ -39,7 +41,7 @@ export function registerBsearchRoute(
apm.addLabels(executionContextService.getAsLabels());

return firstValueFrom(
search.search(requestData, restOptions).pipe(
search.search(requestData, { ...restOptions, abortSignal }).pipe(
catchError((err) => {
// Re-throw as object, to get attributes passed to the client
// eslint-disable-next-line no-throw-literal
Expand Down
14 changes: 7 additions & 7 deletions src/plugins/data/server/search/search_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
}
};

private cancel = async (
deps: SearchStrategyDependencies,
id: string,
options: ISearchOptions = {}
) => {
private cancel = (deps: SearchStrategyDependencies, id: string, options: ISearchOptions = {}) => {
const strategy = this.getSearchStrategy(options.strategy);
if (!strategy.cancel) {
throw new KbnServerError(
Expand All @@ -468,14 +464,18 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
private cancelSessionSearches = async (deps: SearchStrategyDependencies, sessionId: string) => {
const searchIdMapping = await deps.searchSessionsClient.getSearchIdMapping(sessionId);
await Promise.allSettled(
Array.from(searchIdMapping).map(([searchId, strategyName]) => {
Array.from(searchIdMapping).map(async ([searchId, strategyName]) => {
davismcphee marked this conversation as resolved.
Show resolved Hide resolved
const searchOptions = {
sessionId,
strategy: strategyName,
isStored: true,
};

return this.cancel(deps, searchId, searchOptions);
try {
await this.cancel(deps, searchId, searchOptions);
} catch (e) {
this.logger.error(`cancelSessionSearches error: ${e.message}`);
}
})
);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { getMockSearchConfig } from '../../../../config.mock';

const getMockEqlResponse = () => ({
body: {
id: 'my-search-id',
is_partial: false,
is_running: false,
took: 162,
Expand Down Expand Up @@ -54,13 +55,16 @@ describe('EQL search strategy', () => {
describe('search()', () => {
let mockEqlSearch: jest.Mock;
let mockEqlGet: jest.Mock;
let mockEqlDelete: jest.Mock;
let mockDeps: SearchStrategyDependencies;
let params: Required<EqlSearchStrategyRequest>['params'];
let options: Required<EqlSearchStrategyRequest>['options'];

beforeEach(() => {
mockEqlSearch = jest.fn().mockResolvedValueOnce(getMockEqlResponse());
mockEqlGet = jest.fn().mockResolvedValueOnce(getMockEqlResponse());
mockEqlDelete = jest.fn();

mockDeps = {
uiSettingsClient: {
get: jest.fn(),
Expand All @@ -70,6 +74,7 @@ describe('EQL search strategy', () => {
eql: {
get: mockEqlGet,
search: mockEqlSearch,
delete: mockEqlDelete,
},
},
},
Expand Down Expand Up @@ -124,6 +129,34 @@ describe('EQL search strategy', () => {
});
});

it('should delete when aborted', async () => {
const response = getMockEqlResponse();
mockEqlSearch.mockReset().mockResolvedValueOnce({
...response,
body: {
...response.body,
is_running: true,
},
});
const eqlSearch = await eqlSearchStrategyProvider(mockSearchConfig, mockLogger);
const abortController = new AbortController();
const abortSignal = abortController.signal;

// Abort after an incomplete first response is returned
setTimeout(() => abortController.abort(), 100);

let err: any;
try {
await eqlSearch.search({ options, params }, { abortSignal }, mockDeps).toPromise();
} catch (e) {
err = e;
}

expect(mockEqlSearch).toBeCalled();
expect(err).not.toBeUndefined();
expect(mockEqlDelete).toBeCalled();
});

describe('arguments', () => {
it('sends along async search options', async () => {
const eqlSearch = await eqlSearchStrategyProvider(mockSearchConfig, mockLogger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import type { TransportResult } from '@elastic/elasticsearch';
import { tap } from 'rxjs/operators';
import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { getKbnServerError } from '@kbn/kibana-utils-plugin/server';
import { SearchConfigSchema } from '../../../../config';
import {
EqlSearchStrategyRequest,
Expand All @@ -27,15 +28,19 @@ export const eqlSearchStrategyProvider = (
searchConfig: SearchConfigSchema,
logger: Logger
): ISearchStrategy<EqlSearchStrategyRequest, EqlSearchStrategyResponse> => {
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
const client = esClient.asCurrentUser.eql;
await client.delete({ id });
return client.delete({ id });
davismcphee marked this conversation as resolved.
Show resolved Hide resolved
}

return {
cancel: async (id, options, { esClient }) => {
logger.debug(`_eql/delete ${id}`);
await cancelAsyncSearch(id, esClient);
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
throw getKbnServerError(e);
}
},

search: ({ id, ...request }, options: IAsyncSearchOptions, { esClient, uiSettingsClient }) => {
Expand Down Expand Up @@ -85,8 +90,15 @@ export const eqlSearchStrategyProvider = (
};

const cancel = async () => {
if (id) {
if (!id) return;
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
// A 404 means either this search request does not exist, or that it is already cancelled
if (e.meta?.statusCode === 404) return;

// Log all other (unexpected) error messages
logger.error(`cancelEqlSearch error: ${e.message}`);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,30 @@ describe('ES search strategy', () => {
expect(request).toHaveProperty('keep_alive', '60000ms');
});

it('allows overriding keep_alive and wait_for_completion_timeout', async () => {
mockGetCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = {
index: 'logstash-*',
body: { query: {} },
wait_for_completion_timeout: '10s',
keep_alive: '5m',
};
const esSearch = await enhancedEsSearchStrategyProvider(
mockLegacyConfig$,
mockSearchConfig,
mockLogger
);

await esSearch.search({ id: 'foo', params }, {}, mockDeps).toPromise();

expect(mockGetCaller).toBeCalled();
const request = mockGetCaller.mock.calls[0][0];
expect(request.id).toEqual('foo');
expect(request).toHaveProperty('wait_for_completion_timeout', '10s');
expect(request).toHaveProperty('keep_alive', '5m');
});

it('sets transport options on POST requests', async () => {
const transportOptions = { maxRetries: 1 };
mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse);
Expand Down Expand Up @@ -260,6 +284,38 @@ describe('ES search strategy', () => {

expect(mockApiCaller).toBeCalledTimes(0);
});

it('should delete when aborted', async () => {
davismcphee marked this conversation as resolved.
Show resolved Hide resolved
mockSubmitCaller.mockResolvedValueOnce({
...mockAsyncResponse,
body: {
...mockAsyncResponse.body,
is_running: true,
},
});

const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(
mockLegacyConfig$,
mockSearchConfig,
mockLogger
);
const abortController = new AbortController();
const abortSignal = abortController.signal;

// Abort after an incomplete first response is returned
setTimeout(() => abortController.abort(), 100);

let err: KbnServerError | undefined;
try {
await esSearch.search({ params }, { abortSignal }, mockDeps).toPromise();
} catch (e) {
err = e;
}
expect(mockSubmitCaller).toBeCalled();
expect(err).not.toBeUndefined();
expect(mockDeleteCaller).toBeCalled();
});
});

describe('with sessionId', () => {
Expand Down Expand Up @@ -367,6 +423,44 @@ describe('ES search strategy', () => {
expect(request).toHaveProperty('wait_for_completion_timeout');
expect(request).not.toHaveProperty('keep_alive');
});

it('should not delete a saved session when aborted', async () => {
mockSubmitCaller.mockResolvedValueOnce({
...mockAsyncResponse,
body: {
...mockAsyncResponse.body,
is_running: true,
},
});

const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(
mockLegacyConfig$,
mockSearchConfig,
mockLogger
);
const abortController = new AbortController();
const abortSignal = abortController.signal;

// Abort after an incomplete first response is returned
setTimeout(() => abortController.abort(), 100);

let err: KbnServerError | undefined;
try {
await esSearch
.search(
{ params },
{ abortSignal, sessionId: '1', isSearchStored: true, isStored: true },
mockDeps
)
.toPromise();
} catch (e) {
err = e;
}
expect(mockSubmitCaller).toBeCalled();
expect(err).not.toBeUndefined();
expect(mockDeleteCaller).not.toBeCalled();
});
});

it('throws normalized error if ResponseError is thrown', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { catchError, tap } from 'rxjs/operators';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { firstValueFrom, from } from 'rxjs';
import { getKbnServerError } from '@kbn/kibana-utils-plugin/server';
import { IAsyncSearchRequestParams } from '../..';
import { getKbnSearchError, KbnSearchError } from '../../report_search_error';
import type { ISearchStrategy, SearchStrategyDependencies } from '../../types';
import type {
Expand Down Expand Up @@ -43,26 +44,28 @@ export const enhancedEsSearchStrategyProvider = (
logger: Logger,
usage?: SearchUsage,
useInternalUser: boolean = false
): ISearchStrategy => {
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
try {
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;
await client.asyncSearch.delete({ id });
} catch (e) {
throw getKbnServerError(e);
}
): ISearchStrategy<IEsSearchRequest<IAsyncSearchRequestParams>> => {
function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
davismcphee marked this conversation as resolved.
Show resolved Hide resolved
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;
return client.asyncSearch.delete({ id });
}

function asyncSearch(
{ id, ...request }: IEsSearchRequest,
{ id, ...request }: IEsSearchRequest<IAsyncSearchRequestParams>,
options: IAsyncSearchOptions,
{ esClient, uiSettingsClient }: SearchStrategyDependencies
) {
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;

const search = async () => {
const params = id
? getDefaultAsyncGetParams(searchConfig, options)
? {
...getDefaultAsyncGetParams(searchConfig, options),
...(request.params?.keep_alive ? { keep_alive: request.params.keep_alive } : {}),
...(request.params?.wait_for_completion_timeout
? { wait_for_completion_timeout: request.params.wait_for_completion_timeout }
: {}),
}
: {
...(await getDefaultAsyncSubmitParams(uiSettingsClient, searchConfig, options)),
...request.params,
Expand All @@ -89,8 +92,15 @@ export const enhancedEsSearchStrategyProvider = (
};

const cancel = async () => {
if (id) {
if (!id || options.isStored) return;
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
// A 404 means either this search request does not exist, or that it is already cancelled
if (e.meta?.statusCode === 404) return;

// Log all other (unexpected) error messages
logger.error(`cancelAsyncSearch error: ${e.message}`);
}
};

Expand Down Expand Up @@ -179,7 +189,11 @@ export const enhancedEsSearchStrategyProvider = (
*/
cancel: async (id, options, { esClient }) => {
logger.debug(`cancel ${id}`);
await cancelAsyncSearch(id, esClient);
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
throw getKbnServerError(e);
}
},
/**
*
Expand Down
Loading
Loading