Skip to content

Commit

Permalink
[data.search.bsearch] Forward request abortSignal to search strategy (e…
Browse files Browse the repository at this point in the history
…lastic#170041)

## Summary

Attempt to resurrect elastic#169041.

Previously flaky tests:
* group1:
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/3797
* group2:
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/3798
* group3:
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/3799
* group4:
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/3800

### Checklist

Delete any items that are not applicable to this PR.

- [ ] Any text added follows [EUI's writing
guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses
sentence case text and includes [i18n
support](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)
- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] Any UI touched in this PR is usable by keyboard only (learn more
about [keyboard accessibility](https://webaim.org/techniques/keyboard/))
- [ ] Any UI touched in this PR does not create any new axe failures
(run axe in browser:
[FF](https://addons.mozilla.org/en-US/firefox/addon/axe-devtools/),
[Chrome](https://chrome.google.com/webstore/detail/axe-web-accessibility-tes/lhdoppojpmngadmnindnejefpokejbdd?hl=en-US))
- [ ] If a plugin configuration key changed, check if it needs to be
allowlisted in the cloud and added to the [docker
list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)
- [ ] This renders correctly on smaller devices using a responsive
layout. (You can test this [in your
browser](https://www.browserstack.com/guide/responsive-testing-on-local-server))
- [ ] This was checked for [cross-browser
compatibility](https://www.elastic.co/support/matrix#matrix_browsers)


### Risk Matrix

Delete this section if it is not applicable to this PR.

Before closing this PR, invite QA, stakeholders, and other developers to
identify risks that should be tested prior to the change/feature
release.

When forming the risk matrix, consider some of the following examples
and how they may potentially impact the change:

| Risk | Probability | Severity | Mitigation/Notes |

|---------------------------|-------------|----------|-------------------------|
| Multiple Spaces—unexpected behavior in non-default Kibana Space.
| Low | High | Integration tests will verify that all features are still
supported in non-default Kibana Space and when user switches between
spaces. |
| Multiple nodes—Elasticsearch polling might have race conditions
when multiple Kibana nodes are polling for the same tasks. | High | Low
| Tasks are idempotent, so executing them multiple times will not result
in logical error, but will degrade performance. To test for this case we
add plenty of unit tests around this logic and document manual testing
procedure. |
| Code should gracefully handle cases when feature X or plugin Y are
disabled. | Medium | High | Unit tests will verify that any feature flag
or plugin combination still results in our service operational. |
| [See more potential risk
examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx) |


### For maintainers

- [ ] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------

Co-authored-by: Stratoula Kalafateli <[email protected]>
Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
3 people authored Nov 10, 2023
1 parent 4b55f11 commit b84881a
Show file tree
Hide file tree
Showing 11 changed files with 302 additions and 37 deletions.
3 changes: 2 additions & 1 deletion src/plugins/data/common/search/strategies/es_search/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export type ISearchRequestParams = {
trackTotalHits?: boolean;
} & estypes.SearchRequest;

export interface IEsSearchRequest extends IKibanaSearchRequest<ISearchRequestParams> {
export interface IEsSearchRequest<T extends ISearchRequestParams = ISearchRequestParams>
extends IKibanaSearchRequest<T> {
indexType?: string;
}

Expand Down
4 changes: 3 additions & 1 deletion src/plugins/data/server/search/routes/bsearch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { catchError } from 'rxjs/operators';
import { BfetchServerSetup } from '@kbn/bfetch-plugin/server';
import type { ExecutionContextSetup } from '@kbn/core/server';
import apm from 'elastic-apm-node';
import { getRequestAbortedSignal } from '../..';
import {
IKibanaSearchRequest,
IKibanaSearchResponse,
Expand All @@ -28,6 +29,7 @@ export function registerBsearchRoute(
IKibanaSearchResponse
>('/internal/bsearch', (request) => {
const search = getScoped(request);
const abortSignal = getRequestAbortedSignal(request.events.aborted$);
return {
/**
* @param requestOptions
Expand All @@ -39,7 +41,7 @@ export function registerBsearchRoute(
apm.addLabels(executionContextService.getAsLabels());

return firstValueFrom(
search.search(requestData, restOptions).pipe(
search.search(requestData, { ...restOptions, abortSignal }).pipe(
catchError((err) => {
// Re-throw as object, to get attributes passed to the client
// eslint-disable-next-line no-throw-literal
Expand Down
14 changes: 7 additions & 7 deletions src/plugins/data/server/search/search_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
}
};

private cancel = async (
deps: SearchStrategyDependencies,
id: string,
options: ISearchOptions = {}
) => {
private cancel = (deps: SearchStrategyDependencies, id: string, options: ISearchOptions = {}) => {
const strategy = this.getSearchStrategy(options.strategy);
if (!strategy.cancel) {
throw new KbnServerError(
Expand All @@ -468,14 +464,18 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
private cancelSessionSearches = async (deps: SearchStrategyDependencies, sessionId: string) => {
const searchIdMapping = await deps.searchSessionsClient.getSearchIdMapping(sessionId);
await Promise.allSettled(
Array.from(searchIdMapping).map(([searchId, strategyName]) => {
Array.from(searchIdMapping).map(async ([searchId, strategyName]) => {
const searchOptions = {
sessionId,
strategy: strategyName,
isStored: true,
};

return this.cancel(deps, searchId, searchOptions);
try {
await this.cancel(deps, searchId, searchOptions);
} catch (e) {
this.logger.error(`cancelSessionSearches error: ${e.message}`);
}
})
);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { getMockSearchConfig } from '../../../../config.mock';

const getMockEqlResponse = () => ({
body: {
id: 'my-search-id',
is_partial: false,
is_running: false,
took: 162,
Expand Down Expand Up @@ -54,13 +55,16 @@ describe('EQL search strategy', () => {
describe('search()', () => {
let mockEqlSearch: jest.Mock;
let mockEqlGet: jest.Mock;
let mockEqlDelete: jest.Mock;
let mockDeps: SearchStrategyDependencies;
let params: Required<EqlSearchStrategyRequest>['params'];
let options: Required<EqlSearchStrategyRequest>['options'];

beforeEach(() => {
mockEqlSearch = jest.fn().mockResolvedValueOnce(getMockEqlResponse());
mockEqlGet = jest.fn().mockResolvedValueOnce(getMockEqlResponse());
mockEqlDelete = jest.fn();

mockDeps = {
uiSettingsClient: {
get: jest.fn(),
Expand All @@ -70,6 +74,7 @@ describe('EQL search strategy', () => {
eql: {
get: mockEqlGet,
search: mockEqlSearch,
delete: mockEqlDelete,
},
},
},
Expand Down Expand Up @@ -124,6 +129,34 @@ describe('EQL search strategy', () => {
});
});

it('should delete when aborted', async () => {
const response = getMockEqlResponse();
mockEqlSearch.mockReset().mockResolvedValueOnce({
...response,
body: {
...response.body,
is_running: true,
},
});
const eqlSearch = await eqlSearchStrategyProvider(mockSearchConfig, mockLogger);
const abortController = new AbortController();
const abortSignal = abortController.signal;

// Abort after an incomplete first response is returned
setTimeout(() => abortController.abort(), 100);

let err: any;
try {
await eqlSearch.search({ options, params }, { abortSignal }, mockDeps).toPromise();
} catch (e) {
err = e;
}

expect(mockEqlSearch).toBeCalled();
expect(err).not.toBeUndefined();
expect(mockEqlDelete).toBeCalled();
});

describe('arguments', () => {
it('sends along async search options', async () => {
const eqlSearch = await eqlSearchStrategyProvider(mockSearchConfig, mockLogger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import type { TransportResult } from '@elastic/elasticsearch';
import { tap } from 'rxjs/operators';
import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { getKbnServerError } from '@kbn/kibana-utils-plugin/server';
import { SearchConfigSchema } from '../../../../config';
import {
EqlSearchStrategyRequest,
Expand All @@ -27,15 +28,19 @@ export const eqlSearchStrategyProvider = (
searchConfig: SearchConfigSchema,
logger: Logger
): ISearchStrategy<EqlSearchStrategyRequest, EqlSearchStrategyResponse> => {
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
const client = esClient.asCurrentUser.eql;
await client.delete({ id });
return client.delete({ id });
}

return {
cancel: async (id, options, { esClient }) => {
logger.debug(`_eql/delete ${id}`);
await cancelAsyncSearch(id, esClient);
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
throw getKbnServerError(e);
}
},

search: ({ id, ...request }, options: IAsyncSearchOptions, { esClient, uiSettingsClient }) => {
Expand Down Expand Up @@ -85,8 +90,15 @@ export const eqlSearchStrategyProvider = (
};

const cancel = async () => {
if (id) {
if (!id) return;
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
// A 404 means either this search request does not exist, or that it is already cancelled
if (e.meta?.statusCode === 404) return;

// Log all other (unexpected) error messages
logger.error(`cancelEqlSearch error: ${e.message}`);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,30 @@ describe('ES search strategy', () => {
expect(request).toHaveProperty('keep_alive', '60000ms');
});

it('allows overriding keep_alive and wait_for_completion_timeout', async () => {
mockGetCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = {
index: 'logstash-*',
body: { query: {} },
wait_for_completion_timeout: '10s',
keep_alive: '5m',
};
const esSearch = await enhancedEsSearchStrategyProvider(
mockLegacyConfig$,
mockSearchConfig,
mockLogger
);

await esSearch.search({ id: 'foo', params }, {}, mockDeps).toPromise();

expect(mockGetCaller).toBeCalled();
const request = mockGetCaller.mock.calls[0][0];
expect(request.id).toEqual('foo');
expect(request).toHaveProperty('wait_for_completion_timeout', '10s');
expect(request).toHaveProperty('keep_alive', '5m');
});

it('sets transport options on POST requests', async () => {
const transportOptions = { maxRetries: 1 };
mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse);
Expand Down Expand Up @@ -260,6 +284,38 @@ describe('ES search strategy', () => {

expect(mockApiCaller).toBeCalledTimes(0);
});

it('should delete when aborted', async () => {
mockSubmitCaller.mockResolvedValueOnce({
...mockAsyncResponse,
body: {
...mockAsyncResponse.body,
is_running: true,
},
});

const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(
mockLegacyConfig$,
mockSearchConfig,
mockLogger
);
const abortController = new AbortController();
const abortSignal = abortController.signal;

// Abort after an incomplete first response is returned
setTimeout(() => abortController.abort(), 100);

let err: KbnServerError | undefined;
try {
await esSearch.search({ params }, { abortSignal }, mockDeps).toPromise();
} catch (e) {
err = e;
}
expect(mockSubmitCaller).toBeCalled();
expect(err).not.toBeUndefined();
expect(mockDeleteCaller).toBeCalled();
});
});

describe('with sessionId', () => {
Expand Down Expand Up @@ -367,6 +423,44 @@ describe('ES search strategy', () => {
expect(request).toHaveProperty('wait_for_completion_timeout');
expect(request).not.toHaveProperty('keep_alive');
});

it('should not delete a saved session when aborted', async () => {
mockSubmitCaller.mockResolvedValueOnce({
...mockAsyncResponse,
body: {
...mockAsyncResponse.body,
is_running: true,
},
});

const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(
mockLegacyConfig$,
mockSearchConfig,
mockLogger
);
const abortController = new AbortController();
const abortSignal = abortController.signal;

// Abort after an incomplete first response is returned
setTimeout(() => abortController.abort(), 100);

let err: KbnServerError | undefined;
try {
await esSearch
.search(
{ params },
{ abortSignal, sessionId: '1', isSearchStored: true, isStored: true },
mockDeps
)
.toPromise();
} catch (e) {
err = e;
}
expect(mockSubmitCaller).toBeCalled();
expect(err).not.toBeUndefined();
expect(mockDeleteCaller).not.toBeCalled();
});
});

it('throws normalized error if ResponseError is thrown', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { catchError, tap } from 'rxjs/operators';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { firstValueFrom, from } from 'rxjs';
import { getKbnServerError } from '@kbn/kibana-utils-plugin/server';
import { IAsyncSearchRequestParams } from '../..';
import { getKbnSearchError, KbnSearchError } from '../../report_search_error';
import type { ISearchStrategy, SearchStrategyDependencies } from '../../types';
import type {
Expand Down Expand Up @@ -43,26 +44,28 @@ export const enhancedEsSearchStrategyProvider = (
logger: Logger,
usage?: SearchUsage,
useInternalUser: boolean = false
): ISearchStrategy => {
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
try {
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;
await client.asyncSearch.delete({ id });
} catch (e) {
throw getKbnServerError(e);
}
): ISearchStrategy<IEsSearchRequest<IAsyncSearchRequestParams>> => {
function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;
return client.asyncSearch.delete({ id });
}

function asyncSearch(
{ id, ...request }: IEsSearchRequest,
{ id, ...request }: IEsSearchRequest<IAsyncSearchRequestParams>,
options: IAsyncSearchOptions,
{ esClient, uiSettingsClient }: SearchStrategyDependencies
) {
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;

const search = async () => {
const params = id
? getDefaultAsyncGetParams(searchConfig, options)
? {
...getDefaultAsyncGetParams(searchConfig, options),
...(request.params?.keep_alive ? { keep_alive: request.params.keep_alive } : {}),
...(request.params?.wait_for_completion_timeout
? { wait_for_completion_timeout: request.params.wait_for_completion_timeout }
: {}),
}
: {
...(await getDefaultAsyncSubmitParams(uiSettingsClient, searchConfig, options)),
...request.params,
Expand All @@ -89,8 +92,15 @@ export const enhancedEsSearchStrategyProvider = (
};

const cancel = async () => {
if (id) {
if (!id || options.isStored) return;
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
// A 404 means either this search request does not exist, or that it is already cancelled
if (e.meta?.statusCode === 404) return;

// Log all other (unexpected) error messages
logger.error(`cancelAsyncSearch error: ${e.message}`);
}
};

Expand Down Expand Up @@ -179,7 +189,11 @@ export const enhancedEsSearchStrategyProvider = (
*/
cancel: async (id, options, { esClient }) => {
logger.debug(`cancel ${id}`);
await cancelAsyncSearch(id, esClient);
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
throw getKbnServerError(e);
}
},
/**
*
Expand Down
Loading

0 comments on commit b84881a

Please sign in to comment.