Skip to content

Commit

Permalink
Update positions on investment-transaction changes (#214)
Browse files Browse the repository at this point in the history
* Update yahoo finance

* Fix updatePosition

* Fix

* Update queue

* Update logs

* Add tests

* Update schema

* Update gitignore

---------

Co-authored-by: Eric Bach <[email protected]>
  • Loading branch information
eric-bach and ericbach authored Jan 1, 2025
1 parent 92cde42 commit 2ff08c1
Show file tree
Hide file tree
Showing 30 changed files with 813 additions and 3,016 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,6 @@ yarn-error.log*
!tailwind.config.js
!next.config.js
!postcss.config.js

# reference
~*.xlsx
49 changes: 49 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,55 @@ Ensure to configure the GitHub Secrets to include:
CYPRESS_PASSWORD - The Cognito password for Cypress integration testing
```

# Troubleshooting

## Lambda DLQ

To retry the Lambda function

1. Get the message from the SQS DLQ

```
aws sqs receive-message --queue-url https://sqs.us-east-1.amazonaws.com/524849261220/pecuniary-dev-updatePosition-DLQ --profile bach-dev
```

2. Save the DLQ message event to a json file

```
{
"version": "0",
"id": "26b8f7d1-b141-9baf-8a01-f625b7c3b0bc",
"detail-type": "InvestmentTransactionSavedEvent",
"source": "custom.pecuniary",
"account": "524849261220",
"time": "2024-12-31T20:29:49Z",
"region": "us-east-1",
"resources": [],
"detail": {
"accountId": "c8df14ba-0bf9-43dc-a92f-0185714336a7",
"transactionDate": "2024-12-31",
"type": "Buy",
"symbol": "FNMA",
"shares": 123.0,
"price": 12.0,
"commission": 12.0,
"userId": "a4e824d8-2021-7008-6807-ec5d9400debb"
}
}
```

3. Run the CLI command to invoke lambda

```
aws lambda invoke --function-name pecuniary-dev-UpdatePosition --payload fileb://request.json response.json --profile bach-dev
```

4. Run the CLI command to remove the message from the SQS DLQ

```
aws sqs delete-message --queue-url https://sqs.us-east-1.amazonaws.com/524849261220/pecuniary-dev-updatePosition-DLQ --receipt-handle <receipt-handle> --profile bach-dev
```

# Event Sourcing and CQRS Architecture

For more detailed information about the event-driven nature of the Pecuniary application and it's architecture, please see the [Architecture.md](ARCHITECTURE.md)
Expand Down
10 changes: 7 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ X Switch transactionsResolver to use AppSync JS pipeline resolvers

##### Current Task

- Rearchitect updatePositions/updateBalances to use AppSync JS Resolvers
- Add Python Lambda function to updatePositionMakretPrice
- Ensure updatePositions updates on investment transactions create/update - FAILING, add tests to Lambdas
X Rearchitect updatePositions/updateBalances to use AppSync JS Resolvers
X BUG: Creates multiple symbols in drop down
X Ensure updatePositions updates on investment transactions create/update

- Create updateBalances updates on bank transactions create/update
- simulate failure in updatePosition Lambda to ensure it goes to DLQ (i.e. set the pk to undefined L133)
- add tests to updatePosition Lambda
- Update dashboard to pull positions and totals - requires new API getPositions(userId)

##### Future Task

Expand Down
2 changes: 1 addition & 1 deletion backend/jest.config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module.exports = {
testEnvironment: 'node',
roots: ['<rootDir>/test'],
roots: ['<rootDir>/test', '<rootDir>/src/lambda'],
testMatch: ['**/*.test.ts'],
transform: {
'^.+\\.tsx?$': 'ts-jest',
Expand Down
140 changes: 41 additions & 99 deletions backend/lib/api-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { UserPool } from 'aws-cdk-lib/aws-cognito';
import { PolicyStatement, Effect, Role, ServicePrincipal, PolicyDocument } from 'aws-cdk-lib/aws-iam';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { EmailSubscription } from 'aws-cdk-lib/aws-sns-subscriptions';
import { Alarm, Metric, ComparisonOperator } from 'aws-cdk-lib/aws-cloudwatch';
import { Alarm, Metric, ComparisonOperator, TreatMissingData } from 'aws-cdk-lib/aws-cloudwatch';
import { SnsAction } from 'aws-cdk-lib/aws-cloudwatch-actions';
import {
Code,
Expand All @@ -25,8 +25,8 @@ import { LambdaFunction } from 'aws-cdk-lib/aws-events-targets';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { Table } from 'aws-cdk-lib/aws-dynamodb';
import { PecuniaryApiStackProps } from './types/PecuniaryStackProps';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { LayerVersion, Runtime, Tracing } from 'aws-cdk-lib/aws-lambda';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';

const dotenv = require('dotenv');
dotenv.config();
Expand All @@ -43,47 +43,48 @@ export class ApiStack extends Stack {
***/

// Event handler DLQ
const eventHandlerQueue = new Queue(this, 'EventHandlerQueue', {
queueName: `${props.appName}-eventHandler-DeadLetterQueue-${props.envName}`,
const updatePositionDLQ = new Queue(this, 'UpdatePositionDLQ', {
queueName: `${props.appName}-${props.envName}-updatePosition-DLQ`,
});

/***
*** AWS SNS - Topics
***/

const eventHandlerTopic = new Topic(this, 'EventHandlerTopic', {
topicName: `${props.appName}-eventHandler-Topic-${props.envName}`,
displayName: 'Event Handler Topic',
const updatePositionNotification = new Topic(this, 'UpdatePositionNotification', {
topicName: `${props.appName}-${props.envName}-updatePosition-Notification`,
displayName: 'Update Postion DLQ Notification',
});
if (props.params.dlqNotifications) {
eventHandlerTopic.addSubscription(new EmailSubscription(props.params.dlqNotifications));
updatePositionNotification.addSubscription(new EmailSubscription(props.params.dlqNotifications));
}

/***
*** AWS CloudWatch - Alarms
***/

// Generic metric
const metric = new Metric({
const dlqMetric = new Metric({
namespace: 'AWS/SQS',
metricName: 'NumberOfMessagesSent',
});
// TODO Doesn't seem to work
metric.with({
metricName: 'ApproximateNumberOfMessagesVisible',
dimensionsMap: {
QueueName: updatePositionDLQ.queueName,
},
period: Duration.minutes(1),
statistic: 'Sum',
period: Duration.seconds(300),
});

const eventHandlerAlarm = new Alarm(this, 'EventHandlerAlarm', {
alarmName: `${props.appName}-eventHandler-Alarm-${props.envName}`,
alarmDescription: 'One or more failed EventHandler messages',
metric: metric,
const updatePositionAlarm = new Alarm(this, 'UpdatePositionAlarm', {
alarmName: `${props.appName}-${props.envName}-updatePosition-Alarm`,
alarmDescription: 'Unable to update one or more positions',
metric: dlqMetric,
datapointsToAlarm: 1,
evaluationPeriods: 2,
evaluationPeriods: 1,
threshold: 1,
comparisonOperator: ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
treatMissingData: TreatMissingData.NOT_BREACHING,
});
eventHandlerAlarm.addAlarmAction(new SnsAction(eventHandlerTopic));
updatePositionAlarm.addAlarmAction(new SnsAction(updatePositionNotification));

/***
*** AWS EventBridge - Event Bus
Expand Down Expand Up @@ -541,99 +542,58 @@ export class ApiStack extends Stack {
***/

// AWS ADOT Lambda layer
const adotLayer = LayerVersion.fromLayerVersionArn(
const adotJavscriptLayer = LayerVersion.fromLayerVersionArn(
this,
'adotLayer',
'adotJavascriptLayer',
`arn:aws:lambda:${Stack.of(this).region}:901920570463:layer:aws-otel-nodejs-amd64-ver-1-18-1:4`
);

const updatePositionsFunction = new NodejsFunction(this, 'UpdatePositions', {
runtime: Runtime.NODEJS_18_X,
functionName: `${props.appName}-${props.envName}-UpdatePositions`,
const updatePositionFunction = new NodejsFunction(this, 'UpdatePosition', {
runtime: Runtime.NODEJS_22_X,
functionName: `${props.appName}-${props.envName}-UpdatePosition`,
handler: 'handler',
entry: path.resolve(__dirname, '../src/lambda/updatePositions/main.ts'),
entry: path.resolve(__dirname, '../src/lambda/updatePosition/main.ts'),
memorySize: 1024,
timeout: Duration.seconds(10),
tracing: Tracing.ACTIVE,
layers: [adotLayer],
layers: [adotJavscriptLayer],
environment: {
REGION: Stack.of(this).region,
DATA_TABLE_NAME: dataTable.tableName,
EVENTBUS_PECUNIARY_NAME: eventBus.eventBusName,
AWS_LAMBDA_EXEC_WRAPPER: '/opt/otel-handler',
},
deadLetterQueue: eventHandlerQueue,
deadLetterQueue: updatePositionDLQ,
});
// Add permissions to call DynamoDB
updatePositionsFunction.addToRolePolicy(
updatePositionFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['dynamodb:Query'],
resources: [dataTable.tableArn, dataTable.tableArn + '/index/accountId-gsi', dataTable.tableArn + '/index/transaction-gsi'],
})
);
updatePositionsFunction.addToRolePolicy(
updatePositionFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['dynamodb:GetItem', 'dynamodb:PutItem', 'dynamodb:UpdateItem'],
resources: [dataTable.tableArn],
})
);
// Add permission to send to EventBridge
updatePositionsFunction.addToRolePolicy(
updatePositionFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['events:PutEvents'],
resources: [eventBus.eventBusArn],
})
);
// Add permission send message to SQS
updatePositionsFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['SQS:SendMessage', 'SNS:Publish'],
resources: [eventHandlerQueue.queueArn],
})
);

// TODO: Switch to python and use yfinance
const updatePositionMarketValueFunction = new NodejsFunction(this, 'UpdatePositionMarketValue', {
runtime: Runtime.NODEJS_22_X,
functionName: `${props.appName}-${props.envName}-UpdatePositionMarketValue`,
handler: 'handler',
entry: path.resolve(__dirname, '../src/lambda/updateMarketValue/main.ts'),
memorySize: 1024,
timeout: Duration.seconds(10),
tracing: Tracing.ACTIVE,
layers: [adotLayer],
environment: {
REGION: Stack.of(this).region,
DATA_TABLE_NAME: dataTable.tableName,
AWS_LAMBDA_EXEC_WRAPPER: '/opt/otel-handler',
},
deadLetterQueue: eventHandlerQueue,
});
// Add permissions to call DynamoDB
updatePositionMarketValueFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['dynamodb:Query'],
resources: [dataTable.tableArn, dataTable.tableArn + '/index/accountId-gsi'],
})
);
updatePositionMarketValueFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['dynamodb:GetItem', 'dynamodb:PutItem', 'dynamodb:UpdateItem'],
resources: [dataTable.tableArn],
})
);
// Add permission send message to SQS
updatePositionMarketValueFunction.addToRolePolicy(
updatePositionFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['SQS:SendMessage', 'SNS:Publish'],
resources: [eventHandlerQueue.queueArn],
resources: [updatePositionDLQ.queueArn],
})
);

Expand All @@ -652,25 +612,7 @@ export class ApiStack extends Stack {
},
});
investmentTransactionSavedRule.addTarget(
new LambdaFunction(updatePositionsFunction, {
//deadLetterQueue: SqsQueue,
maxEventAge: Duration.hours(2),
retryAttempts: 2,
})
);

// EventBus Rule - PositionUpdatedEventRule
const positionUpdatedEventRule = new Rule(this, 'PositionUpdatedEventRule', {
ruleName: `${props.appName}-PositionUpdatedEventRule-${props.envName}`,
description: 'PositionUpdatedEvent',
eventBus: eventBus,
eventPattern: {
source: ['custom.pecuniary'],
detailType: ['PositionUpdatedEvent'],
},
});
positionUpdatedEventRule.addTarget(
new LambdaFunction(updatePositionMarketValueFunction, {
new LambdaFunction(updatePositionFunction, {
//deadLetterQueue: SqsQueue,
maxEventAge: Duration.hours(2),
retryAttempts: 2,
Expand All @@ -682,13 +624,13 @@ export class ApiStack extends Stack {
***/

// Dead Letter Queues
new CfnOutput(this, 'EventHandlerQueueArn', {
value: eventHandlerQueue.queueArn,
exportName: `${props.appName}-${props.envName}-eventHandlerQueueArn`,
new CfnOutput(this, 'UpdatePositionDLQArn', {
value: updatePositionDLQ.queueArn,
exportName: `${props.appName}-${props.envName}-updatePositionDLQArn`,
});

// SNS Topics
new CfnOutput(this, 'EventHandlerTopicArn', { value: eventHandlerTopic.topicArn });
new CfnOutput(this, 'UpdatePositionNotificationArn', { value: updatePositionNotification.topicArn });

// AppSync API
new CfnOutput(this, 'GraphQLApiUrl', { value: api.graphqlUrl });
Expand All @@ -700,8 +642,8 @@ export class ApiStack extends Stack {
});

// Lambda functions
new CfnOutput(this, 'UpdatePositionsFunctionArn', {
value: updatePositionsFunction.functionArn,
new CfnOutput(this, 'UpdatePositionFunctionArn', {
value: updatePositionFunction.functionArn,
});
}
}
Loading

0 comments on commit 2ff08c1

Please sign in to comment.