Skip to content

Commit

Permalink
[Search service] Asynchronous ES search strategy (elastic#53538) (ela…
Browse files Browse the repository at this point in the history
…stic#60469)

* Add async search strategy

* Add async search

* Fix async strategy and add tests

* Move types to separate file

* Revert changes to demo search

* Update demo search strategy to use async

* Add async es search strategy

* Return response as rawResponse

* Poll after initial request

* Add cancellation to search strategies

* Add tests

* Simplify async search strategy

* Move loadingCount to search strategy

* Update abort controller library

* Bootstrap

* Abort when the request is aborted

* Add utility and update value suggestions route

* Fix bad merge conflict

* Update tests

* Move to data_enhanced plugin

* Remove bad merge

* Revert switching abort controller libraries

* Revert package.json in lib

* Move to previous abort controller

* Add support for frozen indices

* Fix test to use fake timers to run debounced handlers

* Revert changes to example plugin

* Fix loading bar not going away when cancelling

* Call getSearchStrategy instead of passing  directly

* Add async demo search strategy

* Fix error with setting state

* Update how aborting works

* Fix type checks

* Add test for loading count

* Attempt to fix broken example test

* Revert changes to test

* Fix test

* Update name to camelCase

* Fix failing test

* Don't require data_enhanced in example plugin

* Actually send DELETE request

* Use waitForCompletion parameter

* Use default search params

* Add support for rollups

* Only make changes needed for frozen indices/rollups

* Only make changes needed for frozen indices/rollups

* Add back in async functionality

* Fix tests/types

* Fix issue with sending empty body in GET

* Don't include skipped in loaded/total

* Don't wait before polling the next time

* Simplify search logic

* Fix merge error

* Review feedback

* Fix issue with hits.total

Co-authored-by: Elastic Machine <[email protected]>

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
lukasolson and elasticmachine authored Mar 20, 2020
1 parent f03aa7b commit 3bfabb0
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 40 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) &gt; [ISearchCancel](./kibana-plugin-plugins-data-server.isearchcancel.md)

## ISearchCancel type

<b>Signature:</b>

```typescript
export declare type ISearchCancel<T extends TStrategyTypes> = (id: string) => Promise<void>;
```
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@
| Type Alias | Description |
| --- | --- |
| [FieldFormatsGetConfigFn](./kibana-plugin-plugins-data-server.fieldformatsgetconfigfn.md) | |
| [ICancel](./kibana-plugin-plugins-data-server.icancel.md) | |
| [IFieldFormatsRegistry](./kibana-plugin-plugins-data-server.ifieldformatsregistry.md) | |
| [ISearch](./kibana-plugin-plugins-data-server.isearch.md) | |
| [ISearchCancel](./kibana-plugin-plugins-data-server.isearchcancel.md) | |
| [ParsedInterval](./kibana-plugin-plugins-data-server.parsedinterval.md) | |
| [TSearchStrategyProvider](./kibana-plugin-plugins-data-server.tsearchstrategyprovider.md) | Search strategy provider creates an instance of a search strategy with the request handler context bound to it. This way every search strategy can use whatever information they require from the request context. |

2 changes: 1 addition & 1 deletion src/plugins/data/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ export { ParsedInterval } from '../common';

export {
ISearch,
ICancel,
ISearchCancel,
ISearchOptions,
IRequestTypesMap,
IResponseTypesMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
* under the License.
*/

import { ISearchGeneric, ICancelGeneric } from './i_search';
import { ISearchGeneric, ISearchCancelGeneric } from './i_search';

export interface IRouteHandlerSearchContext {
search: ISearchGeneric;
cancel: ICancelGeneric;
cancel: ISearchCancelGeneric;
}
4 changes: 2 additions & 2 deletions src/plugins/data/server/search/i_search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export type ISearchGeneric = <T extends TStrategyTypes = typeof ES_SEARCH_STRATE
strategy?: T
) => Promise<IResponseTypesMap[T]>;

export type ICancelGeneric = <T extends TStrategyTypes = typeof ES_SEARCH_STRATEGY>(
export type ISearchCancelGeneric = <T extends TStrategyTypes = typeof ES_SEARCH_STRATEGY>(
id: string,
strategy?: T
) => Promise<void>;
Expand All @@ -52,4 +52,4 @@ export type ISearch<T extends TStrategyTypes> = (
options?: ISearchOptions
) => Promise<IResponseTypesMap[T]>;

export type ICancel<T extends TStrategyTypes> = (id: string) => Promise<void>;
export type ISearchCancel<T extends TStrategyTypes> = (id: string) => Promise<void>;
4 changes: 2 additions & 2 deletions src/plugins/data/server/search/i_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import { APICaller } from 'kibana/server';
import { ISearch, ICancel, ISearchGeneric } from './i_search';
import { ISearch, ISearchCancel, ISearchGeneric } from './i_search';
import { TStrategyTypes } from './strategy_types';
import { ISearchContext } from './i_search_context';

Expand All @@ -28,7 +28,7 @@ import { ISearchContext } from './i_search_context';
*/
export interface ISearchStrategy<T extends TStrategyTypes> {
search: ISearch<T>;
cancel?: ICancel<T>;
cancel?: ISearchCancel<T>;
}

/**
Expand Down
8 changes: 7 additions & 1 deletion src/plugins/data/server/search/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ export { ISearchSetup } from './i_search_setup';

export { ISearchContext } from './i_search_context';

export { ISearch, ICancel, ISearchOptions, IRequestTypesMap, IResponseTypesMap } from './i_search';
export {
ISearch,
ISearchCancel,
ISearchOptions,
IRequestTypesMap,
IResponseTypesMap,
} from './i_search';

export { TStrategyTypes } from './strategy_types';

Expand Down
12 changes: 6 additions & 6 deletions src/plugins/data/server/server.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,6 @@ export function getDefaultSearchParams(config: SharedGlobalConfig): {
restTotalHitsAsInt: boolean;
};

// Warning: (ae-forgotten-export) The symbol "TStrategyTypes" needs to be exported by the entry point index.d.ts
// Warning: (ae-missing-release-tag) "ICancel" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export type ICancel<T extends TStrategyTypes> = (id: string) => Promise<void>;

// Warning: (ae-missing-release-tag) "IFieldFormatsRegistry" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
Expand Down Expand Up @@ -509,11 +503,17 @@ export interface IResponseTypesMap {
[ES_SEARCH_STRATEGY]: IEsSearchResponse;
}

// Warning: (ae-forgotten-export) The symbol "TStrategyTypes" needs to be exported by the entry point index.d.ts
// Warning: (ae-missing-release-tag) "ISearch" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export type ISearch<T extends TStrategyTypes> = (request: IRequestTypesMap[T], options?: ISearchOptions) => Promise<IResponseTypesMap[T]>;

// Warning: (ae-missing-release-tag) "ISearchCancel" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export type ISearchCancel<T extends TStrategyTypes> = (id: string) => Promise<void>;

// Warning: (ae-missing-release-tag) "ISearchContext" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
Expand Down
16 changes: 10 additions & 6 deletions x-pack/plugins/data_enhanced/public/search/es_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import {
TSearchStrategyProvider,
ISearchContext,
ISearch,
SYNC_SEARCH_STRATEGY,
getEsPreference,
} from '../../../../../src/plugins/data/public';
import { IEnhancedEsSearchRequest, EnhancedSearchParams } from '../../common';
import { ASYNC_SEARCH_STRATEGY } from './async_search_strategy';
import { IAsyncSearchOptions } from './types';

export const enhancedEsSearchStrategyProvider: TSearchStrategyProvider<typeof ES_SEARCH_STRATEGY> = (
context: ISearchContext
) => {
const syncStrategyProvider = context.getSearchStrategy(SYNC_SEARCH_STRATEGY);
const { search: syncSearch } = syncStrategyProvider(context);
const asyncStrategyProvider = context.getSearchStrategy(ASYNC_SEARCH_STRATEGY);
const { search: asyncSearch } = asyncStrategyProvider(context);

const search: ISearch<typeof ES_SEARCH_STRATEGY> = (
request: IEnhancedEsSearchRequest,
Expand All @@ -32,9 +33,12 @@ export const enhancedEsSearchStrategyProvider: TSearchStrategyProvider<typeof ES
};
request.params = params;

return syncSearch({ ...request, serverStrategy: ES_SEARCH_STRATEGY }, options) as Observable<
IEsSearchResponse
>;
const asyncOptions: IAsyncSearchOptions = { pollInterval: 0, ...options };

return asyncSearch(
{ ...request, serverStrategy: ES_SEARCH_STRATEGY },
asyncOptions
) as Observable<IEsSearchResponse>;
};

return { search };
Expand Down
56 changes: 48 additions & 8 deletions x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@ import {
TSearchStrategyProvider,
ISearch,
ISearchOptions,
ISearchCancel,
getDefaultSearchParams,
} from '../../../../../src/plugins/data/server';
import { IEnhancedEsSearchRequest } from '../../common';

export interface AsyncSearchResponse<T> {
id: string;
response: SearchResponse<T>;
}

export const enhancedEsSearchStrategyProvider: TSearchStrategyProvider<typeof ES_SEARCH_STRATEGY> = (
context: ISearchContext,
caller: APICaller
Expand All @@ -28,28 +34,62 @@ export const enhancedEsSearchStrategyProvider: TSearchStrategyProvider<typeof ES
) => {
const config = await context.config$.pipe(first()).toPromise();
const defaultParams = getDefaultSearchParams(config);
const params = { ...defaultParams, ...request.params };
const params = { ...defaultParams, trackTotalHits: true, ...request.params };

const rawResponse = (await (request.indexType === 'rollup'
const response = await (request.indexType === 'rollup'
? rollupSearch(caller, { ...request, params }, options)
: caller('search', params, options))) as SearchResponse<any>;
: asyncSearch(caller, { ...request, params }, options));

const rawResponse =
request.indexType === 'rollup'
? (response as SearchResponse<any>)
: (response as AsyncSearchResponse<any>).response;

if (typeof rawResponse.hits.total !== 'number') {
// @ts-ignore This should be fixed as part of https://github.com/elastic/kibana/issues/26356
rawResponse.hits.total = rawResponse.hits.total.value;
}

const id = (response as AsyncSearchResponse<any>).id;
const { total, failed, successful } = rawResponse._shards;
const loaded = failed + successful;
return { total, loaded, rawResponse };
return { id, total, loaded, rawResponse };
};

return { search };
const cancel: ISearchCancel<typeof ES_SEARCH_STRATEGY> = async id => {
const method = 'DELETE';
const path = `_async_search/${id}`;
await caller('transport.request', { method, path });
};

return { search, cancel };
};

function rollupSearch(
function asyncSearch(
caller: APICaller,
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
) {
const { body = undefined, index = undefined, ...params } = request.id ? {} : request.params;

// If we have an ID, then just poll for that ID, otherwise send the entire request body
const method = request.id ? 'GET' : 'POST';
const path = request.id ? `_async_search/${request.id}` : `${index}/_async_search`;

// Wait up to 1s for the response to return
const query = toSnakeCase({ waitForCompletion: '1s', ...params });

return caller('transport.request', { method, path, body, query }, options);
}

async function rollupSearch(
caller: APICaller,
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
) {
const { body, index, ...params } = request.params;
const method = 'POST';
const path = `${request.params.index}/_rollup_search`;
const { body, ...params } = request.params;
const path = `${index}/_rollup_search`;
const query = toSnakeCase(params);
return caller('transport.request', { method, path, body, query }, options);
}
Expand Down

0 comments on commit 3bfabb0

Please sign in to comment.