Skip to content

Commit

Permalink
Fix after code review and add a unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
nchaulet committed Jul 22, 2020
1 parent 7b73177 commit 68f9a6c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import * as Rx from 'rxjs';
import { share } from 'rxjs/operators';
import { createSubscriberConcurrencyLimiter } from './rxjs_utils';

function createSpyObserver(o: Rx.Observable<any>): [Rx.Subscription, jest.Mock] {
const spy = jest.fn();
const observer = o.subscribe(spy);
return [observer, spy];
}

describe('createSubscriberConcurrencyLimiter', () => {
it('should not publish to more than n concurrent subscriber', async () => {
const subject = new Rx.Subject<any>();
const sharedObservable = subject.pipe(share());

const limiter = createSubscriberConcurrencyLimiter(2);

const [observer1, spy1] = createSpyObserver(sharedObservable.pipe(limiter()));
const [observer2, spy2] = createSpyObserver(sharedObservable.pipe(limiter()));
const [observer3, spy3] = createSpyObserver(sharedObservable.pipe(limiter()));
const [observer4, spy4] = createSpyObserver(sharedObservable.pipe(limiter()));
subject.next('test1');

expect(spy1).toBeCalled();
expect(spy2).toBeCalled();
expect(spy3).not.toBeCalled();
expect(spy4).not.toBeCalled();

observer1.unsubscribe();
expect(spy3).toBeCalled();
expect(spy4).not.toBeCalled();

observer2.unsubscribe();
expect(spy4).toBeCalled();

observer3.unsubscribe();
observer4.unsubscribe();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ export const toPromiseAbortable = <T>(
}
});

export function createSubscriberConcurrencyLimiter() {
export function createSubscriberConcurrencyLimiter(maxConcurrency: number) {
let observers: Array<[Rx.Subscriber<any>, any]> = [];
let activeObservers: Array<Rx.Subscriber<any>> = [];

function processNext(maxConcurrency: number) {
function processNext() {
if (activeObservers.length >= maxConcurrency) {
return;
}
Expand All @@ -62,14 +62,13 @@ export function createSubscriberConcurrencyLimiter() {
observer.next(value);
}

return function limit<T>(maxConcurrency: number): Rx.MonoTypeOperatorFunction<T> {
return function limit<T>(): Rx.MonoTypeOperatorFunction<T> {
return (observable) =>
new Rx.Observable<T>((observer) => {
const subscription = observable.subscribe({
next(value) {
observers = [...observers, [observer, value]];

processNext(maxConcurrency);
processNext();
},
error(err) {
observer.error(err);
Expand All @@ -83,7 +82,7 @@ export function createSubscriberConcurrencyLimiter() {
activeObservers = activeObservers.filter((o) => o !== observer);
observers = observers.filter((o) => o[0] !== observer);
subscription.unsubscribe();
processNext(maxConcurrency);
processNext();
};
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import { AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS } from '../../../constants';

function agentCheckinStateFactory() {
const agentConnected = agentCheckinStateConnectedAgentsFactory();
const newActions = agentCheckinStateNewActionsFactory();
let newActions: ReturnType<typeof agentCheckinStateNewActionsFactory>;
let interval: NodeJS.Timeout;

function start() {
newActions = agentCheckinStateNewActionsFactory();
interval = setInterval(async () => {
try {
await agentConnected.updateLastCheckinAt();
Expand All @@ -31,15 +33,20 @@ function agentCheckinStateFactory() {
}
}
return {
subscribeToNewActions: (
subscribeToNewActions: async (
soClient: SavedObjectsClientContract,
agent: Agent,
options?: { signal: AbortSignal }
) =>
agentConnected.wrapPromise(
) => {
if (!newActions) {
throw new Error('Agent checkin state not initialized');
}

return agentConnected.wrapPromise(
agent.id,
newActions.subscribeToNewActions(soClient, agent, options)
),
);
},
start,
stop,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ export function agentCheckinStateNewActionsFactory() {
const agentConfigs$ = new Map<string, Observable<FullAgentConfig | null>>();
const newActions$ = createNewActionsSharedObservable();
// Rx operators
const conccurencyLimiter = createSubscriberConcurrencyLimiter();
const concurrencyLimiter = createSubscriberConcurrencyLimiter(
appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 10
);

async function subscribeToNewActions(
soClient: SavedObjectsClientContract,
Expand All @@ -156,7 +158,7 @@ export function agentCheckinStateNewActionsFactory() {
const stream$ = agentConfig$.pipe(
timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0),
filter((config) => shouldCreateAgentConfigAction(agent, config)),
conccurencyLimiter(appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 1),
concurrencyLimiter(),
mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)),
merge(newActions$),
mergeMap(async (data) => {
Expand Down

0 comments on commit 68f9a6c

Please sign in to comment.