Skip to content

Commit

Permalink
[Ingest Manager] Rate limit agent config update (elastic#70871)
Browse files Browse the repository at this point in the history
  • Loading branch information
nchaulet committed Jul 7, 2020
1 parent 4bba510 commit c45b8ca
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 11 deletions.
3 changes: 3 additions & 0 deletions x-pack/plugins/ingest_manager/common/constants/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ export const AGENT_POLLING_THRESHOLD_MS = 30000;
export const AGENT_POLLING_INTERVAL = 1000;
export const AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS = 30000;
export const AGENT_UPDATE_ACTIONS_INTERVAL_MS = 5000;

export const AGENT_CONFIG_ROLLUP_RATE_LIMIT_INTERVAL_MS = 5000;
export const AGENT_CONFIG_ROLLUP_RATE_LIMIT_REQUEST_PER_INTERVAL = 60;
2 changes: 2 additions & 0 deletions x-pack/plugins/ingest_manager/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export interface IngestManagerConfigType {
host?: string;
ca_sha256?: string;
};
agentConfigRollupRateLimitIntervalMs: number;
agentConfigRollupRateLimitRequestPerInterval: number;
};
}

Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/ingest_manager/server/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export {
AGENT_POLLING_THRESHOLD_MS,
AGENT_POLLING_INTERVAL,
AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS,
AGENT_CONFIG_ROLLUP_RATE_LIMIT_REQUEST_PER_INTERVAL,
AGENT_CONFIG_ROLLUP_RATE_LIMIT_INTERVAL_MS,
AGENT_UPDATE_ACTIONS_INTERVAL_MS,
INDEX_PATTERN_PLACEHOLDER_SUFFIX,
// Routes
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/ingest_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export const config = {
host: schema.maybe(schema.string()),
ca_sha256: schema.maybe(schema.string()),
}),
agentConfigRollupRateLimitIntervalMs: schema.number({ defaultValue: 5000 }),
agentConfigRollupRateLimitRequestPerInterval: schema.number({ defaultValue: 50 }),
}),
}),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Observable } from 'rxjs';

import * as Rx from 'rxjs';

export class AbortError extends Error {}

export const toPromiseAbortable = <T>(
observable: Observable<T>,
observable: Rx.Observable<T>,
signal?: AbortSignal
): Promise<T> =>
new Promise((resolve, reject) => {
Expand Down Expand Up @@ -41,3 +42,63 @@ export const toPromiseAbortable = <T>(
signal.addEventListener('abort', listener, { once: true });
}
});

export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerInterval: number) {
function createCurrentInterval() {
return {
startedAt: Rx.asyncScheduler.now(),
numRequests: 0,
};
}

let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval();
let observers: Array<[Rx.Subscriber<any>, any]> = [];
let timerSubscription: Rx.Subscription | undefined;

function createTimeout() {
if (timerSubscription) {
return;
}
timerSubscription = Rx.asyncScheduler.schedule(() => {
timerSubscription = undefined;
currentInterval = createCurrentInterval();
for (const [waitingObserver, value] of observers) {
if (currentInterval.numRequests >= ratelimitRequestPerInterval) {
createTimeout();
continue;
}
currentInterval.numRequests++;
waitingObserver.next(value);
}
}, ratelimitIntervalMs);
}

return function limit<T>(): Rx.MonoTypeOperatorFunction<T> {
return (observable) =>
new Rx.Observable<T>((observer) => {
const subscription = observable.subscribe({
next(value) {
if (currentInterval.numRequests < ratelimitRequestPerInterval) {
currentInterval.numRequests++;
observer.next(value);
return;
}

observers = [...observers, [observer, value]];
createTimeout();
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
});

return () => {
observers = observers.filter((o) => o[0] !== observer);
subscription.unsubscribe();
};
});
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import * as APIKeysService from '../../api_keys';
import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants';
import { createAgentAction, getNewActionsSince } from '../actions';
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError } from './rxjs_utils';
import { toPromiseAbortable, AbortError, createLimiter } from './rxjs_utils';

function getInternalUserSOClient() {
const fakeRequest = ({
Expand Down Expand Up @@ -95,19 +95,23 @@ async function getOrCreateAgentDefaultOutputAPIKey(
return outputAPIKey.key;
}

async function createAgentActionFromConfigIfOutdated(
soClient: SavedObjectsClientContract,
agent: Agent,
config: FullAgentConfig | null
) {
function shouldCreateAgentConfigAction(agent: Agent, config: FullAgentConfig | null): boolean {
if (!config || !config.revision) {
return;
return false;
}
const isAgentConfigOutdated = !agent.config_revision || agent.config_revision < config.revision;
if (!isAgentConfigOutdated) {
return;
return false;
}

return true;
}

async function createAgentActionFromConfig(
soClient: SavedObjectsClientContract,
agent: Agent,
config: FullAgentConfig | null
) {
// Deep clone !not supporting Date, and undefined value.
const newConfig = JSON.parse(JSON.stringify(config));

Expand All @@ -129,6 +133,11 @@ export function agentCheckinStateNewActionsFactory() {
// Shared Observables
const agentConfigs$ = new Map<string, Observable<FullAgentConfig | null>>();
const newActions$ = createNewActionsSharedObservable();
// Rx operators
const rateLimiter = createLimiter(
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitIntervalMs || 5000,
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitRequestPerInterval || 50
);

async function subscribeToNewActions(
soClient: SavedObjectsClientContract,
Expand All @@ -148,7 +157,9 @@ export function agentCheckinStateNewActionsFactory() {
}
const stream$ = agentConfig$.pipe(
timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0),
mergeMap((config) => createAgentActionFromConfigIfOutdated(soClient, agent, config)),
filter((config) => shouldCreateAgentConfigAction(agent, config)),
rateLimiter(),
mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)),
merge(newActions$),
mergeMap(async (data) => {
if (!data) {
Expand Down

0 comments on commit c45b8ca

Please sign in to comment.