Skip to content

Commit

Permalink
Move our workaround to a helper function
Browse files Browse the repository at this point in the history
This was repeated in five places, time to consolidate.
  • Loading branch information
rylnd committed Sep 30, 2020
1 parent 9a61749 commit 10b6979
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/plugins/data/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ export {
getTotalLoaded,
shimHitsTotal,
usageProvider,
shimAbortSignal,
SearchUsage,
} from './search';

Expand Down
16 changes: 11 additions & 5 deletions src/plugins/data/server/search/es_search/es_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ import { Observable } from 'rxjs';
import { ApiResponse } from '@elastic/elasticsearch';
import { SearchUsage } from '../collectors/usage';
import { toSnakeCase } from './to_snake_case';
import { ISearchStrategy, getDefaultSearchParams, getTotalLoaded, getShardTimeout } from '..';
import {
ISearchStrategy,
getDefaultSearchParams,
getTotalLoaded,
getShardTimeout,
shimAbortSignal,
} from '..';

export const esSearchStrategyProvider = (
config$: Observable<SharedGlobalConfig>,
Expand Down Expand Up @@ -52,10 +58,10 @@ export const esSearchStrategyProvider = (
});

try {
// Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297
const promise = context.core.elasticsearch.client.asCurrentUser.search(params);
if (options?.abortSignal)
options.abortSignal.addEventListener('abort', () => promise.abort());
const promise = shimAbortSignal(
context.core.elasticsearch.client.asCurrentUser.search(params),
options?.abortSignal
);
const { body: rawResponse } = (await promise) as ApiResponse<SearchResponse<any>>;

if (usage) usage.trackSuccess(rawResponse.took);
Expand Down
1 change: 1 addition & 0 deletions src/plugins/data/server/search/es_search/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ export { esSearchStrategyProvider } from './es_search_strategy';
export * from './get_default_search_params';
export { getTotalLoaded } from './get_total_loaded';
export * from './to_snake_case';
export { shimAbortSignal } from './shim_abort_signal';

export { ES_SEARCH_STRATEGY, IEsSearchRequest, IEsSearchResponse } from '../../../common';
55 changes: 55 additions & 0 deletions src/plugins/data/server/search/es_search/shim_abort_signal.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { elasticsearchServiceMock } from '../../../../../core/server/mocks';
import { shimAbortSignal } from '.';

describe('shimAbortSignal', () => {
it('aborts the promise if the signal is aborted', () => {
const promise = elasticsearchServiceMock.createSuccessTransportRequestPromise({
success: true,
});
const controller = new AbortController();
shimAbortSignal(promise, controller.signal);
controller.abort();

expect(promise.abort).toHaveBeenCalled();
});

it('returns the original promise', async () => {
const promise = elasticsearchServiceMock.createSuccessTransportRequestPromise({
success: true,
});
const controller = new AbortController();
const response = await shimAbortSignal(promise, controller.signal);

expect(response).toEqual(expect.objectContaining({ body: { success: true } }));
});

it('allows the promise to be aborted manually', () => {
const promise = elasticsearchServiceMock.createSuccessTransportRequestPromise({
success: true,
});
const controller = new AbortController();
const enhancedPromise = shimAbortSignal(promise, controller.signal);

enhancedPromise.abort();
expect(promise.abort).toHaveBeenCalled();
});
});
40 changes: 40 additions & 0 deletions src/plugins/data/server/search/es_search/shim_abort_signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';

/**
*
* NOTE: Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297
* is resolved
*
* @param promise a TransportRequestPromise
* @param signal optional AbortSignal
*
* @returns a TransportRequestPromise that will be aborted if the signal is aborted
*/
export const shimAbortSignal = <T extends TransportRequestPromise<unknown>>(
promise: T,
signal: AbortSignal | undefined
): T => {
if (signal) {
signal.addEventListener('abort', () => promise.abort());
}
return promise;
};
23 changes: 11 additions & 12 deletions src/plugins/data/server/search/routes/call_msearch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { IUiSettingsClient, IScopedClusterClient, SharedGlobalConfig } from 'src

import { MsearchRequestBody, MsearchResponse } from '../../../common/search/search_source';
import { shimHitsTotal } from './shim_hits_total';
import { getShardTimeout, getDefaultSearchParams, toSnakeCase } from '..';
import { getShardTimeout, getDefaultSearchParams, toSnakeCase, shimAbortSignal } from '..';

/** @internal */
export function convertRequestBody(
Expand Down Expand Up @@ -74,18 +74,17 @@ export function getCallMsearch(dependencies: CallMsearchDependencies) {

const body = convertRequestBody(params.body, timeout);

// Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297
const promise = esClient.asCurrentUser.msearch(
{
body,
},
{
querystring: toSnakeCase(defaultParams),
}
const promise = shimAbortSignal(
esClient.asCurrentUser.msearch(
{
body,
},
{
querystring: toSnakeCase(defaultParams),
}
),
params.signal
);
if (params.signal) {
params.signal.addEventListener('abort', () => promise.abort());
}
const response = (await promise) as ApiResponse<{ responses: Array<SearchResponse<any>> }>;

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
getDefaultSearchParams,
ISearchStrategy,
toSnakeCase,
shimAbortSignal,
} from '../../../../../src/plugins/data/server';
import { EqlSearchStrategyRequest, EqlSearchStrategyResponse } from '../../common/search/types';

Expand Down Expand Up @@ -55,10 +56,7 @@ export const eqlSearchStrategyProvider = (
);
}

// Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297
if (options?.abortSignal)
options.abortSignal.addEventListener('abort', () => promise.abort());
const rawResponse = await promise;
const rawResponse = await shimAbortSignal(promise, options?.abortSignal);
const { id, is_partial: isPartial, is_running: isRunning } = rawResponse.body;

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
toSnakeCase,
shimHitsTotal,
getAsyncOptions,
shimAbortSignal,
} from '../../../../../src/plugins/data/server';
import { IEnhancedEsSearchRequest } from '../../common';
import {
Expand Down Expand Up @@ -99,9 +100,7 @@ export const enhancedEsSearchStrategyProvider = (
});
}

// Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297
if (options?.abortSignal) options.abortSignal.addEventListener('abort', () => promise.abort());
const esResponse = await promise;
const esResponse = await shimAbortSignal(promise, options?.abortSignal);
const { id, response, is_partial: isPartial, is_running: isRunning } = esResponse.body;
return {
id,
Expand Down Expand Up @@ -136,9 +135,7 @@ export const enhancedEsSearchStrategyProvider = (
querystring,
});

// Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297
if (options?.abortSignal) options.abortSignal.addEventListener('abort', () => promise.abort());
const esResponse = await promise;
const esResponse = await shimAbortSignal(promise, options?.abortSignal);

const response = esResponse.body as SearchResponse<any>;
return {
Expand Down

0 comments on commit 10b6979

Please sign in to comment.