Skip to content

Commit

Permalink
[Reporting] Fix scroll timeout logging bug (#49111)
Browse files Browse the repository at this point in the history
* [Reporting] Fix scroll timeout logging bug

* test cancellation token

* test time out
  • Loading branch information
tsullivan authored Oct 25, 2019
1 parent 243a981 commit e0cf748
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 75 deletions.
12 changes: 8 additions & 4 deletions x-pack/legacy/plugins/reporting/common/cancellation_token.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import { isFunction } from 'lodash';

export class CancellationToken {
private isCancelled: boolean;
private _isCancelled: boolean;
private _callbacks: Function[];

constructor() {
this.isCancelled = false;
this._isCancelled = false;
this._callbacks = [];
}

Expand All @@ -20,7 +20,7 @@ export class CancellationToken {
throw new Error('Expected callback to be a function');
}

if (this.isCancelled) {
if (this._isCancelled) {
callback();
return;
}
Expand All @@ -29,7 +29,11 @@ export class CancellationToken {
};

public cancel = () => {
this.isCancelled = true;
this._isCancelled = true;
this._callbacks.forEach(callback => callback());
};

public isCancelled() {
return this._isCancelled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import expect from '@kbn/expect';
import sinon from 'sinon';
import { CancellationToken } from '../../../../../common/cancellation_token';
import { Logger, ScrollConfig } from '../../../../../types';
import { createHitIterator } from '../hit_iterator';

const mockLogger = {
error: new Function(),
debug: new Function(),
warning: new Function(),
} as Logger;
const debugLogStub = sinon.stub(mockLogger, 'debug');
const warnLogStub = sinon.stub(mockLogger, 'warning');
const errorLogStub = sinon.stub(mockLogger, 'error');
const mockCallEndpoint = sinon.stub();
const mockSearchRequest = {};
const mockConfig: ScrollConfig = { duration: '2s', size: 123 };
let realCancellationToken = new CancellationToken();
let isCancelledStub: sinon.SinonStub;

describe('hitIterator', function() {
beforeEach(() => {
debugLogStub.resetHistory();
warnLogStub.resetHistory();
errorLogStub.resetHistory();
mockCallEndpoint.resetHistory();
mockCallEndpoint.resetBehavior();
mockCallEndpoint.resolves({ _scroll_id: '123blah', hits: { hits: ['you found me'] } });
mockCallEndpoint.onCall(11).resolves({ _scroll_id: '123blah', hits: {} });

isCancelledStub = sinon.stub(realCancellationToken, 'isCancelled');
isCancelledStub.returns(false);
});

afterEach(() => {
realCancellationToken = new CancellationToken();
});

it('iterates hits', async () => {
// Begin
const hitIterator = createHitIterator(mockLogger);
const iterator = hitIterator(
mockConfig,
mockCallEndpoint,
mockSearchRequest,
realCancellationToken
);

while (true) {
const { done: iterationDone, value: hit } = await iterator.next();
if (iterationDone) {
break;
}
expect(hit).to.be('you found me');
}

expect(mockCallEndpoint.callCount).to.be(13);
expect(debugLogStub.callCount).to.be(13);
expect(warnLogStub.callCount).to.be(0);
expect(errorLogStub.callCount).to.be(0);
});

it('stops searches after cancellation', async () => {
// Setup
isCancelledStub.onFirstCall().returns(false);
isCancelledStub.returns(true);

// Begin
const hitIterator = createHitIterator(mockLogger);
const iterator = hitIterator(
mockConfig,
mockCallEndpoint,
mockSearchRequest,
realCancellationToken
);

while (true) {
const { done: iterationDone, value: hit } = await iterator.next();
if (iterationDone) {
break;
}
expect(hit).to.be('you found me');
}

expect(mockCallEndpoint.callCount).to.be(3);
expect(debugLogStub.callCount).to.be(3);
expect(warnLogStub.callCount).to.be(1);
expect(errorLogStub.callCount).to.be(0);

expect(warnLogStub.firstCall.lastArg).to.be(
'Any remaining scrolling searches have been cancelled by the cancellation token.'
);
});

it('handles time out', async () => {
// Setup
mockCallEndpoint.onCall(2).resolves({ status: 404 });

// Begin
const hitIterator = createHitIterator(mockLogger);
const iterator = hitIterator(
mockConfig,
mockCallEndpoint,
mockSearchRequest,
realCancellationToken
);

let errorThrown = false;
try {
while (true) {
const { done: iterationDone, value: hit } = await iterator.next();
if (iterationDone) {
break;
}
expect(hit).to.be('you found me');
}
} catch (err) {
expect(err).to.eql(
new Error('Expected _scroll_id in the following Elasticsearch response: {"status":404}')
);
errorThrown = true;
}

expect(mockCallEndpoint.callCount).to.be(4);
expect(debugLogStub.callCount).to.be(4);
expect(warnLogStub.callCount).to.be(0);
expect(errorLogStub.callCount).to.be(1);
expect(errorThrown).to.be(true);
});
});

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SearchParams, SearchResponse } from 'elasticsearch';

import { i18n } from '@kbn/i18n';
import { CancellationToken, ScrollConfig, Logger } from '../../../../types';

async function parseResponse(request: SearchResponse<any>) {
const response = await request;
if (!response || !response._scroll_id) {
throw new Error(
i18n.translate('xpack.reporting.exportTypes.csv.hitIterator.expectedScrollIdErrorMessage', {
defaultMessage: 'Expected {scrollId} in the following Elasticsearch response: {response}',
values: { response: JSON.stringify(response), scrollId: '_scroll_id' },
})
);
}

if (!response.hits) {
throw new Error(
i18n.translate('xpack.reporting.exportTypes.csv.hitIterator.expectedHitsErrorMessage', {
defaultMessage: 'Expected {hits} in the following Elasticsearch response: {response}',
values: { response: JSON.stringify(response), hits: 'hits' },
})
);
}

return {
scrollId: response._scroll_id,
hits: response.hits.hits,
};
}

export function createHitIterator(logger: Logger) {
return async function* hitIterator(
scrollSettings: ScrollConfig,
callEndpoint: Function,
searchRequest: SearchParams,
cancellationToken: CancellationToken
) {
logger.debug('executing search request');
function search(index: string | boolean | string[] | undefined, body: object) {
return parseResponse(
callEndpoint('search', {
index,
body,
scroll: scrollSettings.duration,
size: scrollSettings.size,
})
);
}

function scroll(scrollId: string | undefined) {
logger.debug('executing scroll request');
return parseResponse(
callEndpoint('scroll', {
scrollId,
scroll: scrollSettings.duration,
})
);
}

function clearScroll(scrollId: string | undefined) {
logger.debug('executing clearScroll request');
return callEndpoint('clearScroll', {
scrollId: [scrollId],
});
}

try {
let { scrollId, hits } = await search(searchRequest.index, searchRequest.body);
try {
while (hits && hits.length && !cancellationToken.isCancelled()) {
for (const hit of hits) {
yield hit;
}

({ scrollId, hits } = await scroll(scrollId));

if (cancellationToken.isCancelled()) {
logger.warning(
'Any remaining scrolling searches have been cancelled by the cancellation token.'
);
}
}
} finally {
await clearScroll(scrollId);
}
} catch (err) {
logger.error(err);
throw err;
}
};
}
7 changes: 7 additions & 0 deletions x-pack/legacy/plugins/reporting/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ export interface QueueConfig {
timeout: number;
}

export interface ScrollConfig {
duration: string;
size: number;
}

export interface ElementPosition {
boundingClientRect: {
// modern browsers support x/y, but older ones don't
Expand Down Expand Up @@ -248,5 +253,7 @@ export interface ExportTypesRegistry {
register: (exportTypeDefinition: ExportTypeDefinition) => void;
}

export { CancellationToken } from './common/cancellation_token';

// Prefer to import this type using: `import { LevelLogger } from 'relative/path/server/lib';`
export { LevelLogger as Logger } from './server/lib/level_logger';

0 comments on commit e0cf748

Please sign in to comment.