diff --git a/x-pack/plugins/ingest_manager/common/types/index.ts b/x-pack/plugins/ingest_manager/common/types/index.ts index d7edc04a35799..7acef263f973a 100644 --- a/x-pack/plugins/ingest_manager/common/types/index.ts +++ b/x-pack/plugins/ingest_manager/common/types/index.ts @@ -22,8 +22,7 @@ export interface IngestManagerConfigType { host?: string; ca_sha256?: string; }; - agentConfigRollupRateLimitIntervalMs: number; - agentConfigRollupRateLimitRequestPerInterval: number; + agentConfigRolloutConcurrency: number; }; } diff --git a/x-pack/plugins/ingest_manager/server/index.ts b/x-pack/plugins/ingest_manager/server/index.ts index 6c72218abc531..40e0153a26581 100644 --- a/x-pack/plugins/ingest_manager/server/index.ts +++ b/x-pack/plugins/ingest_manager/server/index.ts @@ -35,8 +35,7 @@ export const config = { host: schema.maybe(schema.string()), ca_sha256: schema.maybe(schema.string()), }), - agentConfigRollupRateLimitIntervalMs: schema.number({ defaultValue: 5000 }), - agentConfigRollupRateLimitRequestPerInterval: schema.number({ defaultValue: 50 }), + agentConfigRolloutConcurrency: schema.number({ defaultValue: 10 }), }), }), }; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts new file mode 100644 index 0000000000000..70207dcf325c4 --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import * as Rx from 'rxjs'; +import { share } from 'rxjs/operators'; +import { createSubscriberConcurrencyLimiter } from './rxjs_utils'; + +function createSpyObserver(o: Rx.Observable): [Rx.Subscription, jest.Mock] { + const spy = jest.fn(); + const observer = o.subscribe(spy); + return [observer, spy]; +} + +describe('createSubscriberConcurrencyLimiter', () => { + it('should not publish to more than n concurrent subscriber', async () => { + const subject = new Rx.Subject(); + const sharedObservable = subject.pipe(share()); + + const limiter = createSubscriberConcurrencyLimiter(2); + + const [observer1, spy1] = createSpyObserver(sharedObservable.pipe(limiter())); + const [observer2, spy2] = createSpyObserver(sharedObservable.pipe(limiter())); + const [observer3, spy3] = createSpyObserver(sharedObservable.pipe(limiter())); + const [observer4, spy4] = createSpyObserver(sharedObservable.pipe(limiter())); + subject.next('test1'); + + expect(spy1).toBeCalled(); + expect(spy2).toBeCalled(); + expect(spy3).not.toBeCalled(); + expect(spy4).not.toBeCalled(); + + observer1.unsubscribe(); + expect(spy3).toBeCalled(); + expect(spy4).not.toBeCalled(); + + observer2.unsubscribe(); + expect(spy4).toBeCalled(); + + observer3.unsubscribe(); + observer4.unsubscribe(); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts index a806169019a1e..dc0ed35207e46 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts @@ -43,34 +43,23 @@ export const toPromiseAbortable = ( } }); -export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerInterval: number) { - function createCurrentInterval() { - return { - startedAt: Rx.asyncScheduler.now(), - numRequests: 0, - }; - } - - let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval(); +export function createSubscriberConcurrencyLimiter(maxConcurrency: number) { let observers: Array<[Rx.Subscriber, any]> = []; - let timerSubscription: Rx.Subscription | undefined; + let activeObservers: Array> = []; - function createTimeout() { - if (timerSubscription) { + function processNext() { + if (activeObservers.length >= maxConcurrency) { return; } - timerSubscription = Rx.asyncScheduler.schedule(() => { - timerSubscription = undefined; - currentInterval = createCurrentInterval(); - for (const [waitingObserver, value] of observers) { - if (currentInterval.numRequests >= ratelimitRequestPerInterval) { - createTimeout(); - continue; - } - currentInterval.numRequests++; - waitingObserver.next(value); - } - }, ratelimitIntervalMs); + const observerValuePair = observers.shift(); + + if (!observerValuePair) { + return; + } + + const [observer, value] = observerValuePair; + activeObservers.push(observer); + observer.next(value); } return function limit(): Rx.MonoTypeOperatorFunction { @@ -78,14 +67,8 @@ export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerIn new Rx.Observable((observer) => { const subscription = observable.subscribe({ next(value) { - if (currentInterval.numRequests < ratelimitRequestPerInterval) { - currentInterval.numRequests++; - observer.next(value); - return; - } - observers = [...observers, [observer, value]]; - createTimeout(); + processNext(); }, error(err) { observer.error(err); @@ -96,8 +79,10 @@ export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerIn }); return () => { + activeObservers = activeObservers.filter((o) => o !== observer); observers = observers.filter((o) => o[0] !== observer); subscription.unsubscribe(); + processNext(); }; }); }; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts index 69d61171b21fc..63f22b82611c2 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts @@ -13,9 +13,11 @@ import { AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS } from '../../../constants'; function agentCheckinStateFactory() { const agentConnected = agentCheckinStateConnectedAgentsFactory(); - const newActions = agentCheckinStateNewActionsFactory(); + let newActions: ReturnType; let interval: NodeJS.Timeout; + function start() { + newActions = agentCheckinStateNewActionsFactory(); interval = setInterval(async () => { try { await agentConnected.updateLastCheckinAt(); @@ -31,15 +33,20 @@ function agentCheckinStateFactory() { } } return { - subscribeToNewActions: ( + subscribeToNewActions: async ( soClient: SavedObjectsClientContract, agent: Agent, options?: { signal: AbortSignal } - ) => - agentConnected.wrapPromise( + ) => { + if (!newActions) { + throw new Error('Agent checkin state not initialized'); + } + + return agentConnected.wrapPromise( agent.id, newActions.subscribeToNewActions(soClient, agent, options) - ), + ); + }, start, stop, }; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index 5ceb774a1946c..53270afe453c4 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -28,7 +28,7 @@ import * as APIKeysService from '../../api_keys'; import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants'; import { createAgentAction, getNewActionsSince } from '../actions'; import { appContextService } from '../../app_context'; -import { toPromiseAbortable, AbortError, createLimiter } from './rxjs_utils'; +import { toPromiseAbortable, AbortError, createSubscriberConcurrencyLimiter } from './rxjs_utils'; function getInternalUserSOClient() { const fakeRequest = ({ @@ -134,9 +134,8 @@ export function agentCheckinStateNewActionsFactory() { const agentConfigs$ = new Map>(); const newActions$ = createNewActionsSharedObservable(); // Rx operators - const rateLimiter = createLimiter( - appContextService.getConfig()?.fleet.agentConfigRollupRateLimitIntervalMs || 5000, - appContextService.getConfig()?.fleet.agentConfigRollupRateLimitRequestPerInterval || 50 + const concurrencyLimiter = createSubscriberConcurrencyLimiter( + appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 10 ); async function subscribeToNewActions( @@ -155,10 +154,11 @@ export function agentCheckinStateNewActionsFactory() { if (!agentConfig$) { throw new Error(`Invalid state no observable for config ${configId}`); } + const stream$ = agentConfig$.pipe( timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0), filter((config) => shouldCreateAgentConfigAction(agent, config)), - rateLimiter(), + concurrencyLimiter(), mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)), merge(newActions$), mergeMap(async (data) => {