Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Fix config rollout move to limit concurrent config change instead of config per second #72931

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions x-pack/plugins/ingest_manager/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ export interface IngestManagerConfigType {
host?: string;
ca_sha256?: string;
};
agentConfigRollupRateLimitIntervalMs: number;
agentConfigRollupRateLimitRequestPerInterval: number;
agentConfigRolloutConcurrency: number;
};
}

Expand Down
3 changes: 1 addition & 2 deletions x-pack/plugins/ingest_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ export const config = {
host: schema.maybe(schema.string()),
ca_sha256: schema.maybe(schema.string()),
}),
agentConfigRollupRateLimitIntervalMs: schema.number({ defaultValue: 5000 }),
agentConfigRollupRateLimitRequestPerInterval: schema.number({ defaultValue: 50 }),
agentConfigRolloutConcurrency: schema.number({ defaultValue: 10 }),
}),
}),
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import * as Rx from 'rxjs';
import { share } from 'rxjs/operators';
import { createSubscriberConcurrencyLimiter } from './rxjs_utils';

function createSpyObserver(o: Rx.Observable<any>): [Rx.Subscription, jest.Mock] {
const spy = jest.fn();
const observer = o.subscribe(spy);
return [observer, spy];
}

describe('createSubscriberConcurrencyLimiter', () => {
it('should not publish to more than n concurrent subscriber', async () => {
const subject = new Rx.Subject<any>();
const sharedObservable = subject.pipe(share());

const limiter = createSubscriberConcurrencyLimiter(2);

const [observer1, spy1] = createSpyObserver(sharedObservable.pipe(limiter()));
const [observer2, spy2] = createSpyObserver(sharedObservable.pipe(limiter()));
const [observer3, spy3] = createSpyObserver(sharedObservable.pipe(limiter()));
const [observer4, spy4] = createSpyObserver(sharedObservable.pipe(limiter()));
subject.next('test1');

expect(spy1).toBeCalled();
expect(spy2).toBeCalled();
expect(spy3).not.toBeCalled();
expect(spy4).not.toBeCalled();

observer1.unsubscribe();
expect(spy3).toBeCalled();
expect(spy4).not.toBeCalled();

observer2.unsubscribe();
expect(spy4).toBeCalled();

observer3.unsubscribe();
observer4.unsubscribe();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -43,49 +43,32 @@ export const toPromiseAbortable = <T>(
}
});

export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerInterval: number) {
function createCurrentInterval() {
return {
startedAt: Rx.asyncScheduler.now(),
numRequests: 0,
};
}

let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval();
export function createSubscriberConcurrencyLimiter(maxConcurrency: number) {
let observers: Array<[Rx.Subscriber<any>, any]> = [];
let timerSubscription: Rx.Subscription | undefined;
let activeObservers: Array<Rx.Subscriber<any>> = [];

function createTimeout() {
if (timerSubscription) {
function processNext() {
if (activeObservers.length >= maxConcurrency) {
return;
}
timerSubscription = Rx.asyncScheduler.schedule(() => {
timerSubscription = undefined;
currentInterval = createCurrentInterval();
for (const [waitingObserver, value] of observers) {
if (currentInterval.numRequests >= ratelimitRequestPerInterval) {
createTimeout();
continue;
}
currentInterval.numRequests++;
waitingObserver.next(value);
}
}, ratelimitIntervalMs);
const observerValuePair = observers.shift();

if (!observerValuePair) {
return;
}

const [observer, value] = observerValuePair;
activeObservers.push(observer);
observer.next(value);
}

return function limit<T>(): Rx.MonoTypeOperatorFunction<T> {
return (observable) =>
new Rx.Observable<T>((observer) => {
const subscription = observable.subscribe({
next(value) {
if (currentInterval.numRequests < ratelimitRequestPerInterval) {
currentInterval.numRequests++;
observer.next(value);
return;
}

observers = [...observers, [observer, value]];
createTimeout();
processNext();
},
error(err) {
observer.error(err);
Expand All @@ -96,8 +79,10 @@ export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerIn
});

return () => {
activeObservers = activeObservers.filter((o) => o !== observer);
observers = observers.filter((o) => o[0] !== observer);
subscription.unsubscribe();
processNext();
};
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import { AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS } from '../../../constants';

function agentCheckinStateFactory() {
const agentConnected = agentCheckinStateConnectedAgentsFactory();
const newActions = agentCheckinStateNewActionsFactory();
let newActions: ReturnType<typeof agentCheckinStateNewActionsFactory>;
let interval: NodeJS.Timeout;

function start() {
newActions = agentCheckinStateNewActionsFactory();
interval = setInterval(async () => {
try {
await agentConnected.updateLastCheckinAt();
Expand All @@ -31,15 +33,20 @@ function agentCheckinStateFactory() {
}
}
return {
subscribeToNewActions: (
subscribeToNewActions: async (
soClient: SavedObjectsClientContract,
agent: Agent,
options?: { signal: AbortSignal }
) =>
agentConnected.wrapPromise(
) => {
if (!newActions) {
throw new Error('Agent checkin state not initialized');
}

return agentConnected.wrapPromise(
agent.id,
newActions.subscribeToNewActions(soClient, agent, options)
),
);
},
start,
stop,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import * as APIKeysService from '../../api_keys';
import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants';
import { createAgentAction, getNewActionsSince } from '../actions';
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError, createLimiter } from './rxjs_utils';
import { toPromiseAbortable, AbortError, createSubscriberConcurrencyLimiter } from './rxjs_utils';

function getInternalUserSOClient() {
const fakeRequest = ({
Expand Down Expand Up @@ -134,9 +134,8 @@ export function agentCheckinStateNewActionsFactory() {
const agentConfigs$ = new Map<string, Observable<FullAgentConfig | null>>();
const newActions$ = createNewActionsSharedObservable();
// Rx operators
const rateLimiter = createLimiter(
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitIntervalMs || 5000,
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitRequestPerInterval || 50
const concurrencyLimiter = createSubscriberConcurrencyLimiter(
appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 10
);

async function subscribeToNewActions(
Expand All @@ -155,10 +154,11 @@ export function agentCheckinStateNewActionsFactory() {
if (!agentConfig$) {
throw new Error(`Invalid state no observable for config ${configId}`);
}

const stream$ = agentConfig$.pipe(
timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0),
filter((config) => shouldCreateAgentConfigAction(agent, config)),
rateLimiter(),
concurrencyLimiter(),
mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)),
merge(newActions$),
mergeMap(async (data) => {
Expand Down