diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts new file mode 100644 index 0000000000000..70207dcf325c4 --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import * as Rx from 'rxjs'; +import { share } from 'rxjs/operators'; +import { createSubscriberConcurrencyLimiter } from './rxjs_utils'; + +function createSpyObserver(o: Rx.Observable): [Rx.Subscription, jest.Mock] { + const spy = jest.fn(); + const observer = o.subscribe(spy); + return [observer, spy]; +} + +describe('createSubscriberConcurrencyLimiter', () => { + it('should not publish to more than n concurrent subscriber', async () => { + const subject = new Rx.Subject(); + const sharedObservable = subject.pipe(share()); + + const limiter = createSubscriberConcurrencyLimiter(2); + + const [observer1, spy1] = createSpyObserver(sharedObservable.pipe(limiter())); + const [observer2, spy2] = createSpyObserver(sharedObservable.pipe(limiter())); + const [observer3, spy3] = createSpyObserver(sharedObservable.pipe(limiter())); + const [observer4, spy4] = createSpyObserver(sharedObservable.pipe(limiter())); + subject.next('test1'); + + expect(spy1).toBeCalled(); + expect(spy2).toBeCalled(); + expect(spy3).not.toBeCalled(); + expect(spy4).not.toBeCalled(); + + observer1.unsubscribe(); + expect(spy3).toBeCalled(); + expect(spy4).not.toBeCalled(); + + observer2.unsubscribe(); + expect(spy4).toBeCalled(); + + observer3.unsubscribe(); + observer4.unsubscribe(); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts index dabd280a55346..dc0ed35207e46 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts @@ -43,11 +43,11 @@ export const toPromiseAbortable = ( } }); -export function createSubscriberConcurrencyLimiter() { +export function createSubscriberConcurrencyLimiter(maxConcurrency: number) { let observers: Array<[Rx.Subscriber, any]> = []; let activeObservers: Array> = []; - function processNext(maxConcurrency: number) { + function processNext() { if (activeObservers.length >= maxConcurrency) { return; } @@ -62,14 +62,13 @@ export function createSubscriberConcurrencyLimiter() { observer.next(value); } - return function limit(maxConcurrency: number): Rx.MonoTypeOperatorFunction { + return function limit(): Rx.MonoTypeOperatorFunction { return (observable) => new Rx.Observable((observer) => { const subscription = observable.subscribe({ next(value) { observers = [...observers, [observer, value]]; - - processNext(maxConcurrency); + processNext(); }, error(err) { observer.error(err); @@ -83,7 +82,7 @@ export function createSubscriberConcurrencyLimiter() { activeObservers = activeObservers.filter((o) => o !== observer); observers = observers.filter((o) => o[0] !== observer); subscription.unsubscribe(); - processNext(maxConcurrency); + processNext(); }; }); }; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts index 69d61171b21fc..63f22b82611c2 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts @@ -13,9 +13,11 @@ import { AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS } from '../../../constants'; function agentCheckinStateFactory() { const agentConnected = agentCheckinStateConnectedAgentsFactory(); - const newActions = agentCheckinStateNewActionsFactory(); + let newActions: ReturnType; let interval: NodeJS.Timeout; + function start() { + newActions = agentCheckinStateNewActionsFactory(); interval = setInterval(async () => { try { await agentConnected.updateLastCheckinAt(); @@ -31,15 +33,20 @@ function agentCheckinStateFactory() { } } return { - subscribeToNewActions: ( + subscribeToNewActions: async ( soClient: SavedObjectsClientContract, agent: Agent, options?: { signal: AbortSignal } - ) => - agentConnected.wrapPromise( + ) => { + if (!newActions) { + throw new Error('Agent checkin state not initialized'); + } + + return agentConnected.wrapPromise( agent.id, newActions.subscribeToNewActions(soClient, agent, options) - ), + ); + }, start, stop, }; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index 2de8051748def..53270afe453c4 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -134,7 +134,9 @@ export function agentCheckinStateNewActionsFactory() { const agentConfigs$ = new Map>(); const newActions$ = createNewActionsSharedObservable(); // Rx operators - const conccurencyLimiter = createSubscriberConcurrencyLimiter(); + const concurrencyLimiter = createSubscriberConcurrencyLimiter( + appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 10 + ); async function subscribeToNewActions( soClient: SavedObjectsClientContract, @@ -156,7 +158,7 @@ export function agentCheckinStateNewActionsFactory() { const stream$ = agentConfig$.pipe( timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0), filter((config) => shouldCreateAgentConfigAction(agent, config)), - conccurencyLimiter(appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 1), + concurrencyLimiter(), mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)), merge(newActions$), mergeMap(async (data) => {