Skip to content

Commit

Permalink
Define failure modes for Lambdas (#29)
Browse files Browse the repository at this point in the history
* Defined failure modes for monitoring lambdas
* Fixed lambdas implementation and tests accordingly to failure modes
* Defined failure mode for aggregate worker
* Created sqs queues and eventbridge rules for event delivery
* Added policies to allow EventBridge to push to SQS
  • Loading branch information
Rocco Zanni authored Nov 3, 2023
1 parent decbc24 commit 233cd64
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 66 deletions.
21 changes: 20 additions & 1 deletion analytics/deployment/resources/events.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,23 @@ Resources:
MainEventBus:
Type: AWS::Events::EventBus
Properties:
Name: ${self:custom.appPrefix}-${opt:stage}-main
Name: ${self:custom.appPrefix}-${opt:stage}-main
SendToSQSArchiveEventRule:
Type: AWS::Events::Rule
Properties:
EventBusName: !Ref MainEventBus
EventPattern:
source:
- monitor
State: ENABLED
Targets:
- Id: send-to-sqs-archive-raw
Arn:
Fn::GetAtt:
- ArchiveRawQueue
- Arn
- Id: send-to-sqs-archive-database
Arn:
Fn::GetAtt:
- ArchiveDatabaseQueue
- Arn
70 changes: 69 additions & 1 deletion analytics/deployment/resources/sqs.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,74 @@
Resources:
BlackholeDeadletterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-blackhole
MessageRetentionPeriod: 60
ArchiveDeadletterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-archive-deadletter
MessageRetentionPeriod: 604800
ArchiveRawQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-archive-raw
VisibilityTimeout: 60
RedrivePolicy:
maxReceiveCount: 5
deadLetterTargetArn:
Fn::GetAtt:
- ArchiveDeadletterQueue
- Arn
ArchiveRawQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Statement:
- Effect: Allow
Principal:
Service: events.amazonaws.com
Action: SQS:SendMessage
Resource:
Fn::GetAtt:
- ArchiveRawQueue
- Arn
Queues:
- Ref: ArchiveRawQueue
ArchiveDatabaseQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-archive-database
VisibilityTimeout: 60
RedrivePolicy:
maxReceiveCount: 5
deadLetterTargetArn:
Fn::GetAtt:
- ArchiveDeadletterQueue
- Arn
ArchiveDatabaseQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Statement:
- Effect: Allow
Principal:
Service: events.amazonaws.com
Action: SQS:SendMessage
Resource:
Fn::GetAtt:
- ArchiveDatabaseQueue
- Arn
Queues:
- Ref: ArchiveDatabaseQueue
AggregateQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-aggregate
VisibilityTimeout: 60
VisibilityTimeout: 60
RedrivePolicy:
maxReceiveCount: 1
deadLetterTargetArn:
Fn::GetAtt:
- BlackholeDeadletterQueue
- Arn
12 changes: 4 additions & 8 deletions analytics/functions/aggregate-daily/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@ export const aggregateDaily = async function (event, context) {
})).map(buildAggregationMessage);

for (const batch of chunk(messages, 10)) {
try {
await client.send(new SendMessageBatchCommand({
QueueUrl: process.env.AGGREGATE_QUEUE_URL,
Entries: batch
}));
} catch (err) {
console.error(`Cannot schedule aggregation batch`, batch, err);
}
await client.send(new SendMessageBatchCommand({
QueueUrl: process.env.AGGREGATE_QUEUE_URL,
Entries: batch
}));
}
}
8 changes: 3 additions & 5 deletions analytics/functions/aggregate-daily/handler_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { aggregateDaily } from "./handler.js"
import config from "../../conf/config.js";
import { flattenDeep } from "lodash-es";
import { use } from "../../common/fixtures.js";
import { throwsAsync } from '../../common/assert.js'

const expectedMessages = flattenDeep(config.regions.map((region) => {
return config.endpoints.map((endpoint) => {
Expand Down Expand Up @@ -32,13 +33,10 @@ describe('analytics - aggregateDaily', () => {
assert.deepStrictEqual(expectedMessages, actualMessages);
});

it('should handle SQS errors', async (t) => {
it('should throw on SQS errors', async (t) => {

const errorLogger = t.mock.method(console, 'error', () => { });
sqs.rejects('simulated error');

await aggregateDaily();

assert.equal(errorLogger.mock.calls.length, Math.ceil(expectedMessages.length / 10));
await throwsAsync(aggregateDaily());
});
});
12 changes: 4 additions & 8 deletions analytics/functions/aggregate-minutely/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@ export const aggregateMinutely = async function (event, context) {
})).map(buildAggregationMessage);

for (const batch of chunk(messages, 10)) {
try {
await client.send(new SendMessageBatchCommand({
QueueUrl: process.env.AGGREGATE_QUEUE_URL,
Entries: batch
}));
} catch (err) {
console.error(`Cannot schedule aggregation batch`, batch, err);
}
await client.send(new SendMessageBatchCommand({
QueueUrl: process.env.AGGREGATE_QUEUE_URL,
Entries: batch
}));
}
}
8 changes: 3 additions & 5 deletions analytics/functions/aggregate-minutely/handler_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { aggregateMinutely } from "./handler.js"
import config from "../../conf/config.js";
import { flattenDeep } from "lodash-es";
import { use } from "../../common/fixtures.js";
import { throwsAsync } from '../../common/assert.js'

const expectedMessages = flattenDeep(config.regions.map((region) => {
return [
Expand Down Expand Up @@ -38,13 +39,10 @@ describe('analytics - aggregateMinutely', () => {
assert.deepStrictEqual(expectedMessages, actualMessages);
});

it('should handle SQS errors', async (t) => {
it('should throw SQS errors', async (t) => {

const errorLogger = t.mock.method(console, 'error', () => { });
sqs.rejects('simulated error');

await aggregateMinutely();

assert.equal(errorLogger.mock.calls.length, Math.ceil(expectedMessages.length / 10));
await throwsAsync(aggregateMinutely());
});
});
69 changes: 63 additions & 6 deletions analytics/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,16 @@ resources:
- ${file(deployment/resources/sqs.yml)}

functions:

#
# Function: archiveRaw
# Purpose: archive a raw copy of the incoming measurement event
# on S3 for troubleshooting purposes and re-ingestion
# Failure mode: This function is invoked by the Lambda/SQS poller
# so its retry policy is managed by the SQS queue configuration. In this
# specific case, it's set up to allow for 5 deliveries and then move the
# message to a DLQ that holds the messages for a week to allow for a redrive
#
# TODO switch to SQS in a separate PR to avoid data loss
archiveRaw:
name: ${self:custom.appPrefix}-archiveRaw
handler: functions/archive-raw/handler.archiveRaw
Expand All @@ -124,7 +133,16 @@ functions:
pattern:
source:
- monitor

#
# Function: archiveDatabase
# Purpose: archive the measurement event in Aurora to be used by
# the aggregation tasks
# Failure mode: This function is invoked by the Lambda/SQS poller
# so its retry policy is managed by the SQS queue configuration. In this
# specific case, it's set up to allow for 5 deliveries and then move the
# message to a DLQ that holds the messages for a week to allow for a redrive
#
# TODO switch to SQS in a separate PR to avoid data loss
archiveDatabase:
name: ${self:custom.appPrefix}-archiveDatabase
handler: functions/archive-database/handler.archiveDatabase
Expand All @@ -135,7 +153,17 @@ functions:
pattern:
source:
- monitor

#
# Function: aggregateMinutely
# Purpose: kickstart aggregation jobs that are supposed to run
# on a minutely schedule
# Failure mode: This function is invoked async from EventBridge so its retry
# policy is managed with maximumRetryAttempts/onFailure. In this
# specific case being this function re-triggered every minute from
# a scheduled event, we're NOT going to perform any retry in case
# this function fails and we don't need the initial event to be
# delivered to a DLQ
#
aggregateMinutely:
name: ${self:custom.appPrefix}-aggregateMinutely
handler: functions/aggregate-minutely/handler.aggregateMinutely
Expand All @@ -147,7 +175,17 @@ functions:
- QueueUrl
events:
- schedule: rate(1 minute)

maximumRetryAttempts: 0
#
# Function: aggregateDaily
# Purpose: kickstart aggregation jobs that are supposed to run
# on a daily schedule
# Failure mode: This function is invoked async from EventBridge so its retry
# policy is managed with maximumRetryAttempts/onFailure. In this
# specific case to avoid waiting until the next day to have it triggered
# again we allow for a few retries but we don't need the initial event to be
# delivered to a DLQ
#
aggregateDaily:
name: ${self:custom.appPrefix}-aggregateDaily
handler: functions/aggregate-daily/handler.aggregateDaily
Expand All @@ -159,7 +197,16 @@ functions:
- QueueUrl
events:
- schedule: cron(5 0 * * ? *)

maximumRetryAttempts: 2
#
# Function: aggregateWorker
# Purpose: perform a single aggregation job and save the results
# on S3
# Failure mode: This function is invoked by the Lambda/SQS poller
# so its retry policy is managed by the SQS queue configuration. In this
# specific case, it's set up to allow for a single delivery and then move the
# message to a blackhole DLQ that evicts them fast and we'd never redrive.
#
aggregateWorker:
name: ${self:custom.appPrefix}-aggregateWorker
handler: functions/aggregate-worker/handler.aggregateWorker
Expand All @@ -174,9 +221,19 @@ functions:
Fn::GetAtt:
- AggregateQueue
- Arn

#
# Function: cleanupDatabase
# Purpose: implement database retention period by deleting rows falling
# out of the retention window.
# Failure mode: This function is invoked async from EventBridge so its retry
# policy is managed with maximumRetryAttempts/onFailure. In this
# specific case to avoid waiting until the next day to have it triggered
# again we allow for a few retries but we don't need the initial event to be
# delivered to a DLQ
#
cleanupDatabase:
name: ${self:custom.appPrefix}-cleanupDatabase
handler: functions/cleanup-database/handler.cleanupDatabase
events:
- schedule: rate(1 day)
maximumRetryAttempts: 2
12 changes: 12 additions & 0 deletions monitor/common/assert.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import assert from 'node:assert/strict';

/**
 * Assert that a promise rejects, optionally matching the rejection against
 * an expected error (constructor, regexp, validation function or object) —
 * the async counterpart of `assert.throws`, used by the handler tests.
 *
 * @param {Promise} promise - the promise expected to reject
 * @param {*} [error] - optional matcher, forwarded to `assert.rejects`
 * @returns {Promise<void>} resolves when the promise rejected as expected
 * @throws {AssertionError} when the promise resolves, or rejects with an
 *   error that does not match `error`
 */
export const throwsAsync = async function (promise, error) {
  // Node's built-in assert.rejects implements exactly this contract;
  // prefer it over the hand-rolled try/catch/finally re-throw pattern.
  // Promise.resolve() keeps non-promise inputs behaving like the original
  // (a plain value simply fails the "must reject" assertion).
  await assert.rejects(Promise.resolve(promise), error);
}
13 changes: 12 additions & 1 deletion monitor/deployment/resources/sqs.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
Resources:
BlackholeDeadletterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-blackhole
MessageRetentionPeriod: 60
CheckEndpointQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-check-endpoint
QueueName: ${self:custom.appPrefix}-${opt:stage}-check-endpoint
RedrivePolicy:
maxReceiveCount: 1
deadLetterTargetArn:
Fn::GetAtt:
- BlackholeDeadletterQueue
- Arn
20 changes: 8 additions & 12 deletions monitor/functions/check-endpoint/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,12 @@ export const checkEndpoint = async function (event, context) {
const service = JSON.parse(event.Records[0].body);
const result = await performCheck(service, context?.agentFactory ?? createAgent);

try {
await client.send(new PutEventsCommand({
Entries: [{
EventBusName: process.env.EVENT_BUS,
Source: 'monitor',
DetailType: "measurement",
Detail: JSON.stringify(result)
}]
}));
} catch (e) {
console.error("Put event error", result, e);
}
await client.send(new PutEventsCommand({
Entries: [{
EventBusName: process.env.EVENT_BUS,
Source: 'monitor',
DetailType: "measurement",
Detail: JSON.stringify(result)
}]
}));
}
Loading

0 comments on commit 233cd64

Please sign in to comment.