Skip to content

Commit

Permalink
[Ingest Manager] Revert fleet config concurrency rollout to rate limit (
Browse files Browse the repository at this point in the history
  • Loading branch information
nchaulet committed Jul 31, 2020
1 parent 1f22d16 commit 8434df9
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 67 deletions.
3 changes: 2 additions & 1 deletion x-pack/plugins/ingest_manager/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ export interface IngestManagerConfigType {
host?: string;
ca_sha256?: string;
};
agentConfigRolloutConcurrency: number;
agentConfigRolloutRateLimitIntervalMs: number;
agentConfigRolloutRateLimitRequestPerInterval: number;
};
}

Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/ingest_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ export const config = {
host: schema.maybe(schema.string()),
ca_sha256: schema.maybe(schema.string()),
}),
agentConfigRolloutConcurrency: schema.number({ defaultValue: 10 }),
agentConfigRolloutRateLimitIntervalMs: schema.number({ defaultValue: 5000 }),
agentConfigRolloutRateLimitRequestPerInterval: schema.number({ defaultValue: 5 }),
}),
}),
};
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,52 @@ export const toPromiseAbortable = <T>(
}
});

export function createSubscriberConcurrencyLimiter(maxConcurrency: number) {
let observers: Array<[Rx.Subscriber<any>, any]> = [];
let activeObservers: Array<Rx.Subscriber<any>> = [];
export function createRateLimiter(
ratelimitIntervalMs: number,
ratelimitRequestPerInterval: number
) {
function createCurrentInterval() {
return {
startedAt: Rx.asyncScheduler.now(),
numRequests: 0,
};
}

function processNext() {
if (activeObservers.length >= maxConcurrency) {
return;
}
const observerValuePair = observers.shift();
let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval();
let observers: Array<[Rx.Subscriber<any>, any]> = [];
let timerSubscription: Rx.Subscription | undefined;

if (!observerValuePair) {
function createTimeout() {
if (timerSubscription) {
return;
}

const [observer, value] = observerValuePair;
activeObservers.push(observer);
observer.next(value);
timerSubscription = Rx.asyncScheduler.schedule(() => {
timerSubscription = undefined;
currentInterval = createCurrentInterval();
for (const [waitingObserver, value] of observers) {
if (currentInterval.numRequests >= ratelimitRequestPerInterval) {
createTimeout();
continue;
}
currentInterval.numRequests++;
waitingObserver.next(value);
}
}, ratelimitIntervalMs);
}

return function limit<T>(): Rx.MonoTypeOperatorFunction<T> {
return (observable) =>
new Rx.Observable<T>((observer) => {
const subscription = observable.subscribe({
next(value) {
if (currentInterval.numRequests < ratelimitRequestPerInterval) {
currentInterval.numRequests++;
observer.next(value);
return;
}

observers = [...observers, [observer, value]];
processNext();
createTimeout();
},
error(err) {
observer.error(err);
Expand All @@ -79,10 +99,8 @@ export function createSubscriberConcurrencyLimiter(maxConcurrency: number) {
});

return () => {
activeObservers = activeObservers.filter((o) => o !== observer);
observers = observers.filter((o) => o[0] !== observer);
subscription.unsubscribe();
processNext();
};
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import * as APIKeysService from '../../api_keys';
import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants';
import { createAgentAction, getNewActionsSince } from '../actions';
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError, createSubscriberConcurrencyLimiter } from './rxjs_utils';
import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils';

function getInternalUserSOClient() {
const fakeRequest = ({
Expand Down Expand Up @@ -134,8 +134,9 @@ export function agentCheckinStateNewActionsFactory() {
const agentConfigs$ = new Map<string, Observable<FullAgentConfig | null>>();
const newActions$ = createNewActionsSharedObservable();
// Rx operators
const concurrencyLimiter = createSubscriberConcurrencyLimiter(
appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 10
const rateLimiter = createRateLimiter(
appContextService.getConfig()?.fleet.agentConfigRolloutRateLimitIntervalMs ?? 5000,
appContextService.getConfig()?.fleet.agentConfigRolloutRateLimitRequestPerInterval ?? 50
);

async function subscribeToNewActions(
Expand All @@ -158,7 +159,7 @@ export function agentCheckinStateNewActionsFactory() {
const stream$ = agentConfig$.pipe(
timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0),
filter((config) => shouldCreateAgentConfigAction(agent, config)),
concurrencyLimiter(),
rateLimiter(),
mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)),
merge(newActions$),
mergeMap(async (data) => {
Expand Down

0 comments on commit 8434df9

Please sign in to comment.