Skip to content

Commit

Permalink
creates a sqs queue for retry failed scale-up requests that are retry…
Browse files Browse the repository at this point in the history
…able (such as lack of capacity) (#1947)
  • Loading branch information
jeanschmidt authored Jan 31, 2023
1 parent 61d67bf commit 9afcf35
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 16 deletions.
23 changes: 22 additions & 1 deletion terraform-aws-github-runner/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ resource "aws_sqs_queue" "queued_builds" {
visibility_timeout_seconds = var.runners_scale_up_sqs_visibility_timeout
fifo_queue = true
content_based_deduplication = true
max_message_size = 1024
max_message_size = 2048
message_retention_seconds = var.runners_scale_up_sqs_max_retry * var.runners_scale_up_sqs_visibility_timeout + 100
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.queued_builds_dead_letter.arn
Expand All @@ -45,6 +45,26 @@ resource "aws_sqs_queue" "queued_builds" {
tags = var.tags
}

resource "aws_sqs_queue" "queued_builds_retry_dead_letter" {
name = "${var.environment}-queued-builds-retry-dead-letter"
redrive_allow_policy = jsonencode({
redrivePermission = "allowAll",
})
tags = var.tags
}

resource "aws_sqs_queue" "queued_builds_retry" {
name = "${var.environment}-queued-builds-retry"
visibility_timeout_seconds = var.runners_scale_up_sqs_visibility_timeout
max_message_size = 2048
message_retention_seconds = var.runners_scale_up_sqs_max_retry * var.runners_scale_up_sqs_visibility_timeout + 100
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.queued_builds_retry_dead_letter.arn
maxReceiveCount = var.runners_scale_up_sqs_max_retry
})
tags = var.tags
}

module "webhook" {
source = "./modules/webhook"

Expand Down Expand Up @@ -104,6 +124,7 @@ module "runners" {
ami_filter_windows = var.ami_filter_windows

sqs_build_queue = aws_sqs_queue.queued_builds
sqs_build_queue_retry = aws_sqs_queue.queued_builds_retry
github_app = var.github_app
enable_organization_runners = var.enable_organization_runners
scale_down_schedule_expression = var.scale_down_schedule_expression
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import { scaleDown as scaleDownL, scaleUp as scaleUpL } from './lambda';

import nock from 'nock';
import { Config } from './scale-runners/config';
import { Context, SQSEvent, ScheduledEvent } from 'aws-lambda';
import { mocked } from 'ts-jest/utils';
import nock from 'nock';
import { scaleDown } from './scale-runners/scale-down';
import { scaleUp } from './scale-runners/scale-up';
import { scaleUp, RetryableScalingError } from './scale-runners/scale-up';

const mockSQS = {
sendMessage: jest.fn().mockReturnValue({ promise: jest.fn() }),
};
jest.mock('aws-sdk', () => ({
SQS: jest.fn().mockImplementation(() => mockSQS),
}));

jest.mock('./scale-runners/scale-down');
jest.mock('./scale-runners/scale-up');
Expand All @@ -17,6 +25,14 @@ beforeEach(() => {
});

describe('scaleUp', () => {
beforeEach(() => {
jest.spyOn(global.Math, 'random').mockReturnValue(1.0);
});

afterEach(() => {
jest.spyOn(global.Math, 'random').mockRestore();
});

it('succeeds', async () => {
const mockedScaleUp = mocked(scaleUp).mockResolvedValue(undefined);
const callback = jest.fn();
Expand Down Expand Up @@ -55,6 +71,44 @@ describe('scaleUp', () => {
expect(callback).toBeCalledTimes(1);
expect(callback).toBeCalledWith('Failed handling SQS event');
});

it('RetryableScalingError', async () => {
const config = {
maxRetryScaleUpRecord: 12,
retryScaleUpRecordDelayS: 20,
retryScaleUpRecordJitterPct: 0.2,
retryScaleUpRecordQueueUrl: 'asdf',
};
jest.spyOn(Config, 'Instance', 'get').mockImplementation(() => config as unknown as Config);
const records = [
{ eventSource: 'aws:sqs', body: '{"id":1}' },
{ eventSource: 'aws:sqs', body: '{"id":2}' },
{ eventSource: 'aws:sqs', body: '{"id":3}' },
{ eventSource: 'aws:sqs', body: '{"id":4,"retryCount":3}' },
{ eventSource: 'aws:sqs', body: '{"id":5,"retryCount":12}' },
];
const mockedScaleUp = mocked(scaleUp)
.mockResolvedValueOnce(undefined)
.mockRejectedValueOnce(new RetryableScalingError('whatever'))
.mockResolvedValueOnce(undefined)
.mockRejectedValueOnce(new RetryableScalingError('whatever'))
.mockRejectedValueOnce(new RetryableScalingError('whatever'));
const callback = jest.fn();
await scaleUpL({ Records: records } as unknown as SQSEvent, {} as unknown as Context, callback);
expect(mockedScaleUp).toBeCalledTimes(5);

expect(mockSQS.sendMessage).toBeCalledTimes(2);
expect(mockSQS.sendMessage).toBeCalledWith({
DelaySeconds: 24,
MessageBody: '{"id":2,"retryCount":1,"delaySeconds":24}',
QueueUrl: 'asdf',
});
expect(mockSQS.sendMessage).toBeCalledWith({
DelaySeconds: 192,
MessageBody: '{"id":4,"retryCount":4,"delaySeconds":192}',
QueueUrl: 'asdf',
});
});
});

describe('scaleDown', () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,68 @@
import { Context, SQSEvent, ScheduledEvent } from 'aws-lambda';
import { SQS } from 'aws-sdk';
import { Context, SQSEvent, SQSRecord, ScheduledEvent } from 'aws-lambda';

import { Config } from './scale-runners/config';
import { scaleDown as scaleDownR } from './scale-runners/scale-down';
import { scaleUp as scaleUpR, RetryableScalingError } from './scale-runners/scale-up';
import { scaleUp as scaleUpR, RetryableScalingError, ActionRequestMessage } from './scale-runners/scale-up';
import { getDelayWithJitterRetryCount } from './scale-runners/utils';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export async function scaleUp(event: SQSEvent, context: Context, callback: any) {
console.dir(event, { depth: 5 });
try {
for (const e of event.Records) {
await scaleUpR(e.eventSource, JSON.parse(e.body));
const evtFailed: Array<SQSRecord> = [];

for (const evt of event.Records) {
try {
await scaleUpR(evt.eventSource, JSON.parse(evt.body));
} catch (e) {
if (e instanceof RetryableScalingError) {
console.error(`Retryable error thrown: "${e.message}"`);
evtFailed.push(evt);
} else {
throw e;
}
}
}

if (evtFailed.length > 0) {
console.error(`Detected ${evtFailed.length} errors when processing messages, will retry relevant messages.`);

const sqs: SQS = new SQS();

for (const evt of evtFailed) {
const body: ActionRequestMessage = JSON.parse(evt.body);
const retryCount = body?.retryCount ?? 0;

if (
retryCount < Config.Instance.maxRetryScaleUpRecord &&
(Config.Instance.retryScaleUpRecordQueueUrl?.length ?? 0) > 0
) {
body.retryCount = retryCount + 1;
body.delaySeconds = getDelayWithJitterRetryCount(
retryCount,
Math.max(Config.Instance.retryScaleUpRecordDelayS, 20),
Config.Instance.retryScaleUpRecordJitterPct,
);

const sqsPayload: SQS.SendMessageRequest = {
DelaySeconds: body.delaySeconds,
MessageBody: JSON.stringify(body),
QueueUrl: Config.Instance.retryScaleUpRecordQueueUrl as string,
};

await sqs.sendMessage(sqsPayload).promise();
console.warn(`Sent message: ${evt.body}`);
} else {
console.error(`Permanently abandoning message: ${evt.body}`);
}
}
}

return callback(null);
} catch (e) {
console.error(e);
if (e instanceof RetryableScalingError) {
console.error('Received a RetryableScalingError, will callback with failure');
return callback(e);
} else {
return callback('Failed handling SQS event');
}
return callback('Failed handling SQS event');
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ export class Config {
readonly launchTemplateVersionLinux: string | undefined;
readonly launchTemplateVersionLinuxNvidia: string | undefined;
readonly launchTemplateVersionWindows: string | undefined;
readonly maxRetryScaleUpRecord: number;
readonly minAvailableRunners: number;
readonly minimumRunningTimeInMinutes: number;
readonly mustHaveIssuesLabels: string[];
readonly retryScaleUpRecordDelayS: number;
readonly retryScaleUpRecordJitterPct: number;
readonly retryScaleUpRecordQueueUrl: string | undefined;
readonly runnerGroupName: string | undefined;
readonly runnersExtraLabels: undefined | string;
readonly scaleConfigRepo: string;
Expand All @@ -50,6 +54,7 @@ export class Config {
this.githubAppClientSecret = process.env.GITHUB_APP_CLIENT_SECRET;
this.githubAppId = process.env.GITHUB_APP_ID;
this.kmsKeyId = process.env.KMS_KEY_ID;
/* istanbul ignore next */
this.lambdaTimeout = Number(process.env.LAMBDA_TIMEOUT || '600');
this.launchTemplateNameLinux = process.env.LAUNCH_TEMPLATE_NAME_LINUX;
this.launchTemplateNameLinuxNvidia = process.env.LAUNCH_TEMPLATE_NAME_LINUX_NVIDIA;
Expand All @@ -58,6 +63,8 @@ export class Config {
this.launchTemplateVersionLinuxNvidia = process.env.LAUNCH_TEMPLATE_VERSION_LINUX_NVIDIA;
this.launchTemplateVersionWindows = process.env.LAUNCH_TEMPLATE_VERSION_WINDOWS;
/* istanbul ignore next */
this.maxRetryScaleUpRecord = Number(process.env.MAX_RETRY_SCALEUP_RECORD || '0');
/* istanbul ignore next */
const mnAvalRuns = Number(process.env.MIN_AVAILABLE_RUNNERS || '10');
/* istanbul ignore next */
this.minAvailableRunners = mnAvalRuns > 0 ? mnAvalRuns : 1;
Expand All @@ -67,6 +74,11 @@ export class Config {
this.minimumRunningTimeInMinutes = mnRunMin > 0 ? mnRunMin : 1;
/* istanbul ignore next */
this.mustHaveIssuesLabels = process.env.MUST_HAVE_ISSUES_LABELS?.split(',').filter((w) => w.length > 0) || [];
/* istanbul ignore next */
this.retryScaleUpRecordDelayS = Number(process.env.RETRY_SCALE_UP_RECORD_DELAY_S || '0');
/* istanbul ignore next */
this.retryScaleUpRecordJitterPct = Number(process.env.RETRY_SCALE_UP_RECORD_JITTER_PCT || '0');
this.retryScaleUpRecordQueueUrl = process.env.RETRY_SCALE_UP_RECORD_QUEUE_URL;
this.runnerGroupName = process.env.RUNNER_GROUP_NAME;
this.runnersExtraLabels = process.env.RUNNER_EXTRA_LABELS;
/* istanbul ignore next */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ export interface ActionRequestMessage {
repositoryOwner: string;
installationId?: number;
runnerLabels?: string[];
retryCount?: number;
delaySeconds?: number;
}

export class RetryableScalingError extends Error {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
import { getBoolean, getRepoKey, expBackOff, getRepo, groupBy } from './utils';
import {
getBoolean,
getRepoKey,
expBackOff,
getRepo,
groupBy,
getDelayWithJitter,
getDelayWithJitterRetryCount,
} from './utils';
import nock from 'nock';

beforeEach(() => {
Expand Down Expand Up @@ -130,4 +138,109 @@ describe('./utils', () => {
).rejects.toThrow(msg);
});
});

describe('getDelayWithJitter', () => {
it('have jitter == 0', () => {
expect(getDelayWithJitter(20, 0.0)).toEqual(20);
expect(getDelayWithJitter(0, 0.0)).toEqual(0);
expect(getDelayWithJitter(100, 0.0)).toEqual(100);
expect(getDelayWithJitter(100, -0.5)).toEqual(100);
expect(getDelayWithJitter(-100, 0.0)).toEqual(0);
});

it('jitter is in between bounds', () => {
const checks = 10000;
for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitter(20, 0.1);
expect(r).toBeLessThanOrEqual(22);
expect(r).toBeGreaterThanOrEqual(18);
}

for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitter(100, 0.5);
expect(r).toBeLessThanOrEqual(150);
expect(r).toBeGreaterThanOrEqual(50);
}

for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitter(1000, 1.0);
expect(r).toBeLessThanOrEqual(2000);
expect(r).toBeGreaterThanOrEqual(0);
}

for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitter(1000, 2.0);
expect(r).toBeLessThanOrEqual(3000);
expect(r).toBeGreaterThanOrEqual(0);
}
});
});

describe('getDelayWithJitter', () => {
it('have jitter == 0', () => {
expect(getDelayWithJitterRetryCount(0, 20, 0.0)).toEqual(20);
expect(getDelayWithJitterRetryCount(1, 20, 0.0)).toEqual(40);
expect(getDelayWithJitterRetryCount(2, 20, 0.0)).toEqual(80);
expect(getDelayWithJitterRetryCount(-1, 20, 0.0)).toEqual(20);

expect(getDelayWithJitterRetryCount(0, 0, 0.0)).toEqual(0);
expect(getDelayWithJitterRetryCount(1, 0, 0.0)).toEqual(0);
expect(getDelayWithJitterRetryCount(2, 0, 0.0)).toEqual(0);
expect(getDelayWithJitterRetryCount(-1, 0, 0.0)).toEqual(0);

expect(getDelayWithJitterRetryCount(0, 100, 0.0)).toEqual(100);
expect(getDelayWithJitterRetryCount(1, 100, 0.0)).toEqual(200);
expect(getDelayWithJitterRetryCount(2, 100, 0.0)).toEqual(400);
expect(getDelayWithJitterRetryCount(-1, 100, 0.0)).toEqual(100);

expect(getDelayWithJitterRetryCount(0, 100, -0.5)).toEqual(100);
expect(getDelayWithJitterRetryCount(1, 100, -0.5)).toEqual(200);
expect(getDelayWithJitterRetryCount(2, 100, -0.5)).toEqual(400);
expect(getDelayWithJitterRetryCount(-1, 100, -0.5)).toEqual(100);

expect(getDelayWithJitterRetryCount(0, -100, 0.0)).toEqual(0);
expect(getDelayWithJitterRetryCount(1, -100, 0.0)).toEqual(0);
expect(getDelayWithJitterRetryCount(2, -100, 0.0)).toEqual(0);
expect(getDelayWithJitterRetryCount(-1, -100, 0.0)).toEqual(0);
});

it('jitter is in between bounds', () => {
const checks = 10000;
for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitterRetryCount(0, 20, 0.1);
expect(r).toBeLessThanOrEqual(22);
expect(r).toBeGreaterThanOrEqual(20);
}

for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitterRetryCount(1, 20, 0.1);
expect(r).toBeLessThanOrEqual(44);
expect(r).toBeGreaterThanOrEqual(40);
}

for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitterRetryCount(2, 20, 0.1);
expect(r).toBeLessThanOrEqual(88);
expect(r).toBeGreaterThanOrEqual(80);
}

for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitterRetryCount(0, 100, 0.5);
expect(r).toBeLessThanOrEqual(150);
expect(r).toBeGreaterThanOrEqual(100);
}

for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitterRetryCount(1, 1000, 1.0);
expect(r).toBeLessThanOrEqual(4000);
expect(r).toBeGreaterThanOrEqual(1000);
}

for (let i = 0; i < checks; i += 1) {
const r = getDelayWithJitterRetryCount(0, 1000, 2.0);
expect(r).toBeLessThanOrEqual(3000);
expect(r).toBeGreaterThanOrEqual(1000);
}
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,11 @@ export function groupBy<T, V>(lst: T[], keyGetter: (itm: T) => V): Map<V, Array<
}
return map;
}

export function getDelayWithJitter(delayBase: number, jitter: number) {
return Math.max(0, delayBase) * (1 + Math.random() * Math.max(0, jitter));
}

export function getDelayWithJitterRetryCount(retryCount: number, delayBase: number, jitter: number) {
return getDelayWithJitter(Math.max(0, delayBase) * Math.pow(2, Math.max(0, retryCount)), jitter);
}
Loading

0 comments on commit 9afcf35

Please sign in to comment.