Skip to content

Commit

Permalink
chore: add Lambda fn to write BacklogPerTask CW metric (#2804)
Browse files Browse the repository at this point in the history
We need to write this metric for Worker Service autoscaling.

Related #2796

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
  • Loading branch information
efekarakus authored Sep 8, 2021
1 parent 0b1d401 commit 1c6e51b
Show file tree
Hide file tree
Showing 2 changed files with 301 additions and 0 deletions.
129 changes: 129 additions & 0 deletions cf-custom-resources/lib/backlog-per-task-calculator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
"use strict";

const aws = require("aws-sdk");

// AWS Clients that are overriden in tests.
let ecs, sqs;

/**
* This lambda function calculates the backlog of SQS messages per running ECS tasks,
* and writes the metric to CloudWatch.
*/
exports.handler = async (event, context) => {
setupClients();
try {
const runningCount = await getRunningTaskCount(process.env.CLUSTER_NAME, process.env.SERVICE_NAME);
const backlogs = await Promise.all(
convertQueueNames(process.env.QUEUE_NAMES).map(async (queueName) => {
const queueUrl = await getQueueURL(queueName);
return {
queueName: queueName,
backlogPerTask: await getBacklogPerTask(queueUrl, runningCount),
};
})
);
const timestamp = Date.now();
for (const {queueName, backlogPerTask} of backlogs) {
emitBacklogPerTaskMetric(process.env.NAMESPACE, timestamp, queueName, backlogPerTask);
}
} catch(err) {
// If there is any issue we won't log a metric.
// This is okay because autoscaling will maintain the current number of running tasks if a data point is missing.
// See https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-autoscaling-targettracking.html#targettracking-considerations
console.error(`Unexpected error ${err}`);
}
};

/**
* Returns the backlog per task. The backlog per task is calculated by dividing the number of messages in the queue with
* the number of running tasks.
* If there are no running task, we return the total number of messages in the queue so that we can start scaling up.
* @param queueUrl The url of the queue.
* @param runningTaskCount The number of running tasks part of the ECS service.
* @return int The expected number of messages each running task will consume.
*/
const getBacklogPerTask = async (queueUrl, runningTaskCount) => {
const adjustedRunningTasks = runningTaskCount === 0 ? 1 : runningTaskCount;
const totalNumberOfMessages = await getQueueDepth(queueUrl);
return Math.ceil(totalNumberOfMessages/adjustedRunningTasks);
}

/**
* Writes the backlogPerTask metric for the given queue to stdout following the CloudWatch embedded metric format.
* @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation.html
* @param namespace The namespace for the metric.
* @param timestamp The number of milliseconds after Jan 1, 1970 00:00:00 UTC used to emit the metric.
* @param queueName The name of the queue.
* @param backlogPerTask The number of messages in the queue divided by the number of running tasks.
*/
const emitBacklogPerTaskMetric = (namespace, timestamp, queueName, backlogPerTask) => {
console.log(JSON.stringify({
"_aws": {
"Timestamp": timestamp,
"CloudWatchMetrics": [{
"Namespace": namespace,
"Dimensions": [["QueueName"]],
"Metrics": [{"Name":"BacklogPerTask", "Unit": "Count"}]
}],
},
"QueueName": queueName,
"BacklogPerTask": backlogPerTask,
}));
}

/**
* Returns the URL for the SQS queue.
* @param queueName The name of the queue.
* @returns string The URL of the queue.
*/
const getQueueURL = async (queueName) => {
const out = await sqs.getQueueUrl({
QueueName: queueName,
}).promise();
return out.QueueUrl;
}

/**
* Returns the total number of messages in the SQS queue.
* @param queueUrl The URL of the SQS queue.
* @return int The ApproximateNumberOfMessages in the queue.
*/
const getQueueDepth = async (queueUrl) => {
const out = await sqs.getQueueAttributes({
QueueUrl: queueUrl,
AttributeNames: ['ApproximateNumberOfMessages'],
}).promise();
return out.Attributes.ApproximateNumberOfMessages;
}

/**
* Returns the number of running tasks part of the service.
* @param clusterId The short name or full Amazon Resource Name (ARN) of the cluster.
* @param serviceName The service name or full Amazon Resource Name (ARN) of the service.
* @returns int The number of tasks running part of the service.
*/
const getRunningTaskCount = async (clusterId, serviceName) => {
const out = await ecs.describeServices({
cluster: clusterId,
services: [serviceName],
}).promise();
if (out.services.length === 0) {
throw new Error(`service ${serviceName} of cluster ${clusterId} does not exist`);
}
return out.services[0].runningCount;
}

/**
* Create new clients.
*/
const setupClients = () => {
ecs = new aws.ECS();
sqs = new aws.SQS();
}

// convertQueueNames takes a comma separated string of SQS queue names and returns it as an array of strings.
const convertQueueNames = (stringToSplit) => {
return stringToSplit.split(',')
}
172 changes: 172 additions & 0 deletions cf-custom-resources/test/backlog-per-task-calculator-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
"use strict";
const aws = require("aws-sdk-mock");
const lambdaTester = require("lambda-tester").noVersionCheck();
const sinon = require("sinon");
const calculatorLambda = require("../lib/backlog-per-task-calculator");


describe("BacklogPerTask metric calculator", () => {
const origConsole = console;
const origEnvVars = process.env;

beforeAll(() => {
jest
.spyOn(global.Date, 'now')
.mockImplementation(() =>
new Date('2021-09-02').valueOf(), // maps to 1630540800000.
);
});

afterEach(() => {
process.env = origEnvVars;
aws.restore();
});

afterAll(() => {
console = origConsole;
jest.spyOn(global.Date, 'now').mockClear();
});

test("should write the error to console on unexpected failure", async () => {
// GIVEN
console.error = sinon.stub();
console.log = sinon.stub()
aws.mock("ECS", "describeServices", sinon.fake.rejects("some message"));


// WHEN
const tester = lambdaTester(calculatorLambda.handler)
.event({});

// THEN
await tester.expectResolve(() => {
sinon.assert.called(console.error);
sinon.assert.calledWith(console.error, "Unexpected error Error: some message");
sinon.assert.notCalled(console.log);
});
});

test("should write the total number of messages to console.log if there are no tasks running", async () => {
// GIVEN
process.env = {
...process.env,
NAMESPACE: "app-env-service",
CLUSTER_NAME: "cluster",
SERVICE_NAME: "service",
QUEUE_NAMES: "queue1",
}
console.error = sinon.stub();
console.log = sinon.stub();

aws.mock("ECS", "describeServices", sinon.fake.resolves({
services: [
{
runningCount: 0,
},
],
}));
aws.mock("SQS", "getQueueUrl", sinon.fake.resolves({
QueueUrl: "url",
}));
aws.mock("SQS", "getQueueAttributes", sinon.fake.resolves({
Attributes: {
ApproximateNumberOfMessages: 100,
},
}));


// WHEN
const tester = lambdaTester(calculatorLambda.handler)
.event({});

// THEN
await tester.expectResolve(() => {
sinon.assert.called(console.log);
sinon.assert.calledWith(console.log, JSON.stringify({
"_aws": {
"Timestamp": 1630540800000,
"CloudWatchMetrics": [{
"Namespace": "app-env-service",
"Dimensions": [["QueueName"]],
"Metrics": [{"Name":"BacklogPerTask", "Unit": "Count"}]
}],
},
"QueueName": "queue1",
"BacklogPerTask": 100,
}));
sinon.assert.notCalled(console.error);
});
});

test("should write the backlog per task for each queue", async () => {
// GIVEN
process.env = {
...process.env,
NAMESPACE: "app-env-service",
CLUSTER_NAME: "cluster",
SERVICE_NAME: "service",
QUEUE_NAMES: "queue1,queue2",
}
console.error = sinon.stub();
console.log = sinon.stub();

aws.mock("ECS", "describeServices", sinon.fake.resolves({
services: [
{
runningCount: 3,
},
],
}));
aws.mock("SQS", "getQueueUrl", sinon.fake.resolves({
QueueUrl: "url",
}));
aws.mock("SQS", "getQueueAttributes", sinon.stub()
.onFirstCall().resolves({
Attributes: {
ApproximateNumberOfMessages: 100,
},
})
.onSecondCall().resolves({
Attributes: {
ApproximateNumberOfMessages: 495,
},
}));


// WHEN
const tester = lambdaTester(calculatorLambda.handler)
.event({});

// THEN
await tester.expectResolve(() => {
sinon.assert.called(console.log);
sinon.assert.calledWith(console.log.firstCall, JSON.stringify({
"_aws": {
"Timestamp": 1630540800000,
"CloudWatchMetrics": [{
"Namespace": "app-env-service",
"Dimensions": [["QueueName"]],
"Metrics": [{"Name":"BacklogPerTask", "Unit": "Count"}]
}],
},
"QueueName": "queue1",
"BacklogPerTask": 34,
}));
sinon.assert.calledWith(console.log.secondCall, JSON.stringify({
"_aws": {
"Timestamp": 1630540800000,
"CloudWatchMetrics": [{
"Namespace": "app-env-service",
"Dimensions": [["QueueName"]],
"Metrics": [{"Name":"BacklogPerTask", "Unit": "Count"}]
}],
},
"QueueName": "queue2",
"BacklogPerTask": 165,
}));
sinon.assert.notCalled(console.error);
});
});
});

0 comments on commit 1c6e51b

Please sign in to comment.