Skip to content

Commit

Permalink
[Ingest Manager] Fix config rollout move to limit concurrent config c…
Browse files Browse the repository at this point in the history
…hange instead of config per second (elastic#72931)
  • Loading branch information
nchaulet committed Jul 23, 2020
1 parent 2b69da0 commit 90d47da
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 45 deletions.
3 changes: 1 addition & 2 deletions x-pack/plugins/ingest_manager/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ export interface IngestManagerConfigType {
host?: string;
ca_sha256?: string;
};
agentConfigRollupRateLimitIntervalMs: number;
agentConfigRollupRateLimitRequestPerInterval: number;
agentConfigRolloutConcurrency: number;
};
}

Expand Down
3 changes: 1 addition & 2 deletions x-pack/plugins/ingest_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ export const config = {
host: schema.maybe(schema.string()),
ca_sha256: schema.maybe(schema.string()),
}),
agentConfigRollupRateLimitIntervalMs: schema.number({ defaultValue: 5000 }),
agentConfigRollupRateLimitRequestPerInterval: schema.number({ defaultValue: 50 }),
agentConfigRolloutConcurrency: schema.number({ defaultValue: 10 }),
}),
}),
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import * as Rx from 'rxjs';
import { share } from 'rxjs/operators';
import { createSubscriberConcurrencyLimiter } from './rxjs_utils';

function createSpyObserver(o: Rx.Observable<any>): [Rx.Subscription, jest.Mock] {
const spy = jest.fn();
const observer = o.subscribe(spy);
return [observer, spy];
}

describe('createSubscriberConcurrencyLimiter', () => {
it('should not publish to more than n concurrent subscriber', async () => {
const subject = new Rx.Subject<any>();
const sharedObservable = subject.pipe(share());

const limiter = createSubscriberConcurrencyLimiter(2);

const [observer1, spy1] = createSpyObserver(sharedObservable.pipe(limiter()));
const [observer2, spy2] = createSpyObserver(sharedObservable.pipe(limiter()));
const [observer3, spy3] = createSpyObserver(sharedObservable.pipe(limiter()));
const [observer4, spy4] = createSpyObserver(sharedObservable.pipe(limiter()));
subject.next('test1');

expect(spy1).toBeCalled();
expect(spy2).toBeCalled();
expect(spy3).not.toBeCalled();
expect(spy4).not.toBeCalled();

observer1.unsubscribe();
expect(spy3).toBeCalled();
expect(spy4).not.toBeCalled();

observer2.unsubscribe();
expect(spy4).toBeCalled();

observer3.unsubscribe();
observer4.unsubscribe();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -43,49 +43,32 @@ export const toPromiseAbortable = <T>(
}
});

export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerInterval: number) {
function createCurrentInterval() {
return {
startedAt: Rx.asyncScheduler.now(),
numRequests: 0,
};
}

let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval();
export function createSubscriberConcurrencyLimiter(maxConcurrency: number) {
let observers: Array<[Rx.Subscriber<any>, any]> = [];
let timerSubscription: Rx.Subscription | undefined;
let activeObservers: Array<Rx.Subscriber<any>> = [];

function createTimeout() {
if (timerSubscription) {
function processNext() {
if (activeObservers.length >= maxConcurrency) {
return;
}
timerSubscription = Rx.asyncScheduler.schedule(() => {
timerSubscription = undefined;
currentInterval = createCurrentInterval();
for (const [waitingObserver, value] of observers) {
if (currentInterval.numRequests >= ratelimitRequestPerInterval) {
createTimeout();
continue;
}
currentInterval.numRequests++;
waitingObserver.next(value);
}
}, ratelimitIntervalMs);
const observerValuePair = observers.shift();

if (!observerValuePair) {
return;
}

const [observer, value] = observerValuePair;
activeObservers.push(observer);
observer.next(value);
}

return function limit<T>(): Rx.MonoTypeOperatorFunction<T> {
return (observable) =>
new Rx.Observable<T>((observer) => {
const subscription = observable.subscribe({
next(value) {
if (currentInterval.numRequests < ratelimitRequestPerInterval) {
currentInterval.numRequests++;
observer.next(value);
return;
}

observers = [...observers, [observer, value]];
createTimeout();
processNext();
},
error(err) {
observer.error(err);
Expand All @@ -96,8 +79,10 @@ export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerIn
});

return () => {
activeObservers = activeObservers.filter((o) => o !== observer);
observers = observers.filter((o) => o[0] !== observer);
subscription.unsubscribe();
processNext();
};
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import { AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS } from '../../../constants';

function agentCheckinStateFactory() {
const agentConnected = agentCheckinStateConnectedAgentsFactory();
const newActions = agentCheckinStateNewActionsFactory();
let newActions: ReturnType<typeof agentCheckinStateNewActionsFactory>;
let interval: NodeJS.Timeout;

function start() {
newActions = agentCheckinStateNewActionsFactory();
interval = setInterval(async () => {
try {
await agentConnected.updateLastCheckinAt();
Expand All @@ -31,15 +33,20 @@ function agentCheckinStateFactory() {
}
}
return {
subscribeToNewActions: (
subscribeToNewActions: async (
soClient: SavedObjectsClientContract,
agent: Agent,
options?: { signal: AbortSignal }
) =>
agentConnected.wrapPromise(
) => {
if (!newActions) {
throw new Error('Agent checkin state not initialized');
}

return agentConnected.wrapPromise(
agent.id,
newActions.subscribeToNewActions(soClient, agent, options)
),
);
},
start,
stop,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import * as APIKeysService from '../../api_keys';
import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants';
import { createAgentAction, getNewActionsSince } from '../actions';
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError, createLimiter } from './rxjs_utils';
import { toPromiseAbortable, AbortError, createSubscriberConcurrencyLimiter } from './rxjs_utils';

function getInternalUserSOClient() {
const fakeRequest = ({
Expand Down Expand Up @@ -134,9 +134,8 @@ export function agentCheckinStateNewActionsFactory() {
const agentConfigs$ = new Map<string, Observable<FullAgentConfig | null>>();
const newActions$ = createNewActionsSharedObservable();
// Rx operators
const rateLimiter = createLimiter(
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitIntervalMs || 5000,
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitRequestPerInterval || 50
const concurrencyLimiter = createSubscriberConcurrencyLimiter(
appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 10
);

async function subscribeToNewActions(
Expand All @@ -155,10 +154,11 @@ export function agentCheckinStateNewActionsFactory() {
if (!agentConfig$) {
throw new Error(`Invalid state no observable for config ${configId}`);
}

const stream$ = agentConfig$.pipe(
timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0),
filter((config) => shouldCreateAgentConfigAction(agent, config)),
rateLimiter(),
concurrencyLimiter(),
mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)),
merge(newActions$),
mergeMap(async (data) => {
Expand Down

0 comments on commit 90d47da

Please sign in to comment.