Skip to content

Commit

Permalink
Apply back pressure in Task Manager whenever Elasticsearch responds w…
Browse files Browse the repository at this point in the history
…ith a 429 (#75666)

* Make task manager maxWorkers and pollInterval observables (#75293)

* WIP step 1

* WIP step 2

* Cleanup

* Make maxWorkers an observable for the task pool

* Cleanup

* Fix test failures

* Use BehaviorSubject

* Add some tests

* Make the task manager store emit error events (#75679)

* Add errors$ observable to the task store

* Add unit tests

* Temporarily apply back pressure to maxWorkers and pollInterval when 429 errors occur (#77096)

* WIP

* Cleanup

* Add error count to message

* Reset observable values on stop

* Add comments

* Fix issues when changing configurations

* Cleanup code

* Cleanup pt2

* Some renames

* Fix typecheck

* Use observables to manage throughput

* Rename class

* Switch to createManagedConfiguration

* Add some comments

* Start unit tests

* Add logs

* Fix log level

* Attempt at adding integration tests

* Fix test failures

* Fix timer

* Revert "Fix timer"

This reverts commit 0817e5e.

* Use Symbol

* Fix merge scan

* replace startsWith with a timer that is scheduled to 0

* typo

Co-authored-by: Kibana Machine <[email protected]>
Co-authored-by: Gidi Meir Morris <[email protected]>
  • Loading branch information
3 people authored Oct 13, 2020
1 parent cd84ace commit e0bb860
Show file tree
Hide file tree
Showing 11 changed files with 882 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import sinon from 'sinon';
import { mockLogger } from '../test_utils';
import { TaskManager } from '../task_manager';
import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks';
import {
SavedObjectsSerializer,
SavedObjectTypeRegistry,
SavedObjectsErrorHelpers,
} from '../../../../../src/core/server';
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';

describe('managed configuration', () => {
let taskManager: TaskManager;
let clock: sinon.SinonFakeTimers;
const callAsInternalUser = jest.fn();
const logger = mockLogger();
const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry());
const savedObjectsClient = savedObjectsRepositoryMock.create();
const config = {
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
};

beforeEach(() => {
jest.resetAllMocks();
callAsInternalUser.mockResolvedValue({ total: 0, updated: 0, version_conflicts: 0 });
clock = sinon.useFakeTimers();
taskManager = new TaskManager({
config,
logger,
serializer,
callAsInternalUser,
taskManagerId: 'some-uuid',
savedObjectsRepository: savedObjectsClient,
});
taskManager.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
taskManager.start();
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall
clock.tick(0);
});

afterEach(() => clock.restore());

test('should lower max workers when Elasticsearch returns 429 error', async () => {
savedObjectsClient.create.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
);
// Cause "too many requests" error to be thrown
await expect(
taskManager.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Max workers configuration changing from 10 to 8 after seeing 1 error(s)'
);
expect(logger.debug).toHaveBeenCalledWith('Task pool now using 10 as the max worker value');
});

test('should increase poll interval when Elasticsearch returns 429 error', async () => {
savedObjectsClient.create.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
);
// Cause "too many requests" error to be thrown
await expect(
taskManager.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 3600 after seeing 1 error(s)'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import sinon from 'sinon';
import { Subject } from 'rxjs';
import { mockLogger } from '../test_utils';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import {
createManagedConfiguration,
ADJUST_THROUGHPUT_INTERVAL,
} from './create_managed_configuration';

describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
const logger = mockLogger();

beforeEach(() => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();
});

afterEach(() => clock.restore());

test('returns observables with initialized values', async () => {
const maxWorkersSubscription = jest.fn();
const pollIntervalSubscription = jest.fn();
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
logger,
errors$: new Subject<Error>(),
startingMaxWorkers: 1,
startingPollInterval: 2,
});
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
expect(maxWorkersSubscription).toHaveBeenNthCalledWith(1, 1);
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2);
});

test(`skips errors that aren't about too many requests`, async () => {
const maxWorkersSubscription = jest.fn();
const pollIntervalSubscription = jest.fn();
const errors$ = new Subject<Error>();
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
errors$,
logger,
startingMaxWorkers: 100,
startingPollInterval: 100,
});
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
errors$.next(new Error('foo'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
});

describe('maxWorker configuration', () => {
function setupScenario(startingMaxWorkers: number) {
const errors$ = new Subject<Error>();
const subscription = jest.fn();
const { maxWorkersConfiguration$ } = createManagedConfiguration({
errors$,
startingMaxWorkers,
logger,
startingPollInterval: 1,
});
maxWorkersConfiguration$.subscribe(subscription);
return { subscription, errors$ };
}

beforeEach(() => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();
});

afterEach(() => clock.restore());

test('should decrease configuration at the next interval when an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 80);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
);
});

test('should increase configuration back to normal incrementally after an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
expect(subscription).toHaveBeenNthCalledWith(2, 80);
expect(subscription).toHaveBeenNthCalledWith(3, 84);
// 88.2- > 89 from Math.ceil
expect(subscription).toHaveBeenNthCalledWith(4, 89);
expect(subscription).toHaveBeenNthCalledWith(5, 94);
expect(subscription).toHaveBeenNthCalledWith(6, 99);
// 103.95 -> 100 from Math.min with starting value
expect(subscription).toHaveBeenNthCalledWith(7, 100);
// No new calls due to value not changing and usage of distinctUntilChanged()
expect(subscription).toHaveBeenCalledTimes(7);
});

test('should keep reducing configuration when errors keep emitting', async () => {
const { subscription, errors$ } = setupScenario(100);
for (let i = 0; i < 20; i++) {
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
}
expect(subscription).toHaveBeenNthCalledWith(2, 80);
expect(subscription).toHaveBeenNthCalledWith(3, 64);
// 51.2 -> 51 from Math.floor
expect(subscription).toHaveBeenNthCalledWith(4, 51);
expect(subscription).toHaveBeenNthCalledWith(5, 40);
expect(subscription).toHaveBeenNthCalledWith(6, 32);
expect(subscription).toHaveBeenNthCalledWith(7, 25);
expect(subscription).toHaveBeenNthCalledWith(8, 20);
expect(subscription).toHaveBeenNthCalledWith(9, 16);
expect(subscription).toHaveBeenNthCalledWith(10, 12);
expect(subscription).toHaveBeenNthCalledWith(11, 9);
expect(subscription).toHaveBeenNthCalledWith(12, 7);
expect(subscription).toHaveBeenNthCalledWith(13, 5);
expect(subscription).toHaveBeenNthCalledWith(14, 4);
expect(subscription).toHaveBeenNthCalledWith(15, 3);
expect(subscription).toHaveBeenNthCalledWith(16, 2);
expect(subscription).toHaveBeenNthCalledWith(17, 1);
// No new calls due to value not changing and usage of distinctUntilChanged()
expect(subscription).toHaveBeenCalledTimes(17);
});
});

describe('pollInterval configuration', () => {
function setupScenario(startingPollInterval: number) {
const errors$ = new Subject<Error>();
const subscription = jest.fn();
const { pollIntervalConfiguration$ } = createManagedConfiguration({
logger,
errors$,
startingPollInterval,
startingMaxWorkers: 1,
});
pollIntervalConfiguration$.subscribe(subscription);
return { subscription, errors$ };
}

beforeEach(() => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();
});

afterEach(() => clock.restore());

test('should increase configuration at the next interval when an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
);
});

test('should decrease configuration back to normal incrementally after an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
expect(subscription).toHaveBeenNthCalledWith(2, 120);
expect(subscription).toHaveBeenNthCalledWith(3, 114);
// 108.3 -> 108 from Math.floor
expect(subscription).toHaveBeenNthCalledWith(4, 108);
expect(subscription).toHaveBeenNthCalledWith(5, 102);
// 96.9 -> 100 from Math.max with the starting value
expect(subscription).toHaveBeenNthCalledWith(6, 100);
// No new calls due to value not changing and usage of distinctUntilChanged()
expect(subscription).toHaveBeenCalledTimes(6);
});

test('should increase configuration when errors keep emitting', async () => {
const { subscription, errors$ } = setupScenario(100);
for (let i = 0; i < 3; i++) {
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
}
expect(subscription).toHaveBeenNthCalledWith(2, 120);
expect(subscription).toHaveBeenNthCalledWith(3, 144);
// 172.8 -> 173 from Math.ceil
expect(subscription).toHaveBeenNthCalledWith(4, 173);
});
});
});
Loading

0 comments on commit e0bb860

Please sign in to comment.