-
Notifications
You must be signed in to change notification settings - Fork 8.3k
/
async_search_strategy.ts
73 lines (65 loc) · 2.69 KB
/
async_search_strategy.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { EMPTY, fromEvent, NEVER, Observable, throwError, timer } from 'rxjs';
import { mergeMap, expand, takeUntil } from 'rxjs/operators';
import { AbortError } from '../../../../../src/plugins/data/common';
import {
IKibanaSearchResponse,
ISearchContext,
ISearchStrategy,
SYNC_SEARCH_STRATEGY,
TSearchStrategyProvider,
} from '../../../../../src/plugins/data/public';
import { IAsyncSearchRequest, IAsyncSearchOptions } from './types';
export const ASYNC_SEARCH_STRATEGY = 'ASYNC_SEARCH_STRATEGY';
declare module '../../../../../src/plugins/data/public' {
export interface IRequestTypesMap {
[ASYNC_SEARCH_STRATEGY]: IAsyncSearchRequest;
}
}
export const asyncSearchStrategyProvider: TSearchStrategyProvider<typeof ASYNC_SEARCH_STRATEGY> = (
context: ISearchContext
): ISearchStrategy<typeof ASYNC_SEARCH_STRATEGY> => {
const syncStrategyProvider = context.getSearchStrategy(SYNC_SEARCH_STRATEGY);
const { search } = syncStrategyProvider(context);
return {
search: (
request: IAsyncSearchRequest,
{ pollInterval = 1000, ...options }: IAsyncSearchOptions = {}
): Observable<IKibanaSearchResponse> => {
const { serverStrategy } = request;
let id: string | undefined = request.id;
const aborted$ = options.signal
? fromEvent(options.signal, 'abort').pipe(
mergeMap(() => {
// If we haven't received the response to the initial request, including the ID, then
// we don't need to send a follow-up request to delete this search. Otherwise, we
// send the follow-up request to delete this search, then throw an abort error.
if (id !== undefined) {
context.core.http.delete(`/internal/search/${request.serverStrategy}/${id}`);
}
return throwError(new AbortError());
})
)
: NEVER;
return search(request, options).pipe(
expand(response => {
// If the response indicates it is complete, stop polling and complete the observable
if ((response.loaded ?? 0) >= (response.total ?? 0)) return EMPTY;
id = response.id;
// Delay by the given poll interval
return timer(pollInterval).pipe(
// Send future requests using just the ID from the response
mergeMap(() => {
return search({ id, serverStrategy }, options);
})
);
}),
takeUntil(aborted$)
);
},
};
};