Skip to content

Commit

Permalink
WIP add update market value
Browse files Browse the repository at this point in the history
  • Loading branch information
ericbach committed Dec 30, 2024
1 parent 01652a8 commit 92cde42
Show file tree
Hide file tree
Showing 12 changed files with 2,372 additions and 62 deletions.
1 change: 1 addition & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ X Switch transactionsResolver to use AppSync JS pipeline resolvers
##### Current Task

- Rearchitect updatePositions/updateBalances to use AppSync JS Resolvers
- Add Python Lambda function to updatePositionMakretPrice
- Ensure updatePositions updates on investment transactions create/update - FAILING, add tests to Lambdas
- Create updateBalances updates on bank transactions create/update

Expand Down
59 changes: 59 additions & 0 deletions backend/lib/api-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,47 @@ export class ApiStack extends Stack {
})
);

// TODO: Switch to python and use yfinance
const updatePositionMarketValueFunction = new NodejsFunction(this, 'UpdatePositionMarketValue', {
runtime: Runtime.NODEJS_22_X,
functionName: `${props.appName}-${props.envName}-UpdatePositionMarketValue`,
handler: 'handler',
entry: path.resolve(__dirname, '../src/lambda/updateMarketValue/main.ts'),
memorySize: 1024,
timeout: Duration.seconds(10),
tracing: Tracing.ACTIVE,
layers: [adotLayer],
environment: {
REGION: Stack.of(this).region,
DATA_TABLE_NAME: dataTable.tableName,
AWS_LAMBDA_EXEC_WRAPPER: '/opt/otel-handler',
},
deadLetterQueue: eventHandlerQueue,
});
// Add permissions to call DynamoDB
updatePositionMarketValueFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['dynamodb:Query'],
resources: [dataTable.tableArn, dataTable.tableArn + '/index/accountId-gsi'],
})
);
updatePositionMarketValueFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['dynamodb:GetItem', 'dynamodb:PutItem', 'dynamodb:UpdateItem'],
resources: [dataTable.tableArn],
})
);
// Add permission send message to SQS
updatePositionMarketValueFunction.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['SQS:SendMessage', 'SNS:Publish'],
resources: [eventHandlerQueue.queueArn],
})
);

/***
*** AWS EventBridge - Event Bus Rules
***/
Expand All @@ -618,6 +659,24 @@ export class ApiStack extends Stack {
})
);

// EventBus Rule - PositionUpdatedEventRule
const positionUpdatedEventRule = new Rule(this, 'PositionUpdatedEventRule', {
ruleName: `${props.appName}-PositionUpdatedEventRule-${props.envName}`,
description: 'PositionUpdatedEvent',
eventBus: eventBus,
eventPattern: {
source: ['custom.pecuniary'],
detailType: ['PositionUpdatedEvent'],
},
});
positionUpdatedEventRule.addTarget(
new LambdaFunction(updatePositionMarketValueFunction, {
//deadLetterQueue: SqsQueue,
maxEventAge: Duration.hours(2),
retryAttempts: 2,
})
);

/***
*** Outputs
***/
Expand Down
18 changes: 8 additions & 10 deletions backend/src/lambda/types/Position.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
export type PositionReadModel = {
pk: string;
userId: string;
sk: string;
createdAt: Date;
updatedAt: Date;
aggregateId: string;
createdAt: string;
updatedAt: string;
accountId: string;
entity: string;
type: string;
description: string;
symbol: string;
exchange: string;
currency: string;
shares: number;
acb: number;
bookValue: number;

description: string;
exchange: string;
currency: string;
marketValue: number;
acb: number;
lastTransactionDate: Date;
};

export type PositionAppSyncEvent = {
Expand Down
21 changes: 21 additions & 0 deletions backend/src/lambda/updateMarketValue/helpers/dynamoDbCommand.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');

async function dynamoDbCommand(command: any) {
var result;

try {
console.debug(`ℹ️ Initializing DynamoDB client in ${process.env.REGION}`);
var client = new DynamoDBClient({ region: process.env.REGION });

console.debug(`ℹ️ Executing DynamoDB command:\n${JSON.stringify(command)}`);
result = await client.send(command);

console.log(`🔔 DynamoDB result:\n${JSON.stringify(result)}`);
} catch (error) {
console.error(`🛑 Error with DynamoDB command:\n`, error);
}

return result;
}

export default dynamoDbCommand;
166 changes: 166 additions & 0 deletions backend/src/lambda/updateMarketValue/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import { EventBridgeEvent } from 'aws-lambda';
import { QueryCommand, QueryCommandInput, PutItemCommand, PutItemCommandInput } from '@aws-sdk/client-dynamodb';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';

import { getQuoteSummary } from './yahooFinance';
import { PositionReadModel } from '../types/Position';
import { CreateInvestmentTransactionInput, InvestmentTransaction } from '../../appsync/api/codegen/appsync';
import dynamoDbCommand from './helpers/dynamoDbCommand';

type CreateTransactionInputV2 = {
userId: string;
symbol: string;
} & CreateInvestmentTransactionInput;

exports.handler = async (event: EventBridgeEvent<string, CreateTransactionInputV2>) => {
const detail = parseEvent(event);

// Get all transactions
const transactions = await getTransactions(detail);

// Calculate ACB
const { shares, acb, bookValue } = calculateAdjustedCostBase(transactions);

// Save Position - create if not exists, update if exists
return await savePosition(detail, shares, acb, bookValue);
};

// Returns all transactions for the symbol sorted in ascending order
async function getTransactions(detail: CreateTransactionInputV2): Promise<InvestmentTransaction[]> {
const params: QueryCommandInput = {
TableName: process.env.DATA_TABLE_NAME,
IndexName: 'transaction-gsi',
ScanIndexForward: true,
KeyConditionExpression: 'accountId = :v1',
FilterExpression: 'userId = :v2 AND entity = :v3 AND symbol = :v4',
ExpressionAttributeValues: {
':v1': { S: detail.accountId },
':v2': { S: detail.userId },
':v3': { S: 'investment-transaction' },
':v4': { S: detail.symbol },
},
};
const result = await dynamoDbCommand(new QueryCommand(params));

if (result.$metadata.httpStatusCode === 200) {
const transactions = result.Items?.map((Item: Record<string, any>) => unmarshall(Item));

console.log(`🔔 Found ${transactions.length} Transactions: ${JSON.stringify(transactions)}`);

return transactions;
}

console.log(`🛑 Could not find transactions: ${result}`);
return [] as InvestmentTransaction[];
}

async function getPosition(detail: CreateTransactionInputV2): Promise<PositionReadModel> {
const params: QueryCommandInput = {
TableName: process.env.DATA_TABLE_NAME,
IndexName: 'accountId-gsi',
KeyConditionExpression: 'accountId = :v1',
FilterExpression: 'userId = :v2 AND entity = :v3 AND symbol = :v4',
ExpressionAttributeValues: {
':v1': { S: detail.accountId },
':v2': { S: detail.userId },
':v3': { S: 'position' },
':v4': { S: detail.symbol },
},
};
const result = await dynamoDbCommand(new QueryCommand(params));

if (result.$metadata.httpStatusCode === 200) {
const position = result.Items?.map((Item: Record<string, any>) => unmarshall(Item))[0];
console.log(`🔔 Found Position: ${JSON.stringify(position)}`);

return position;
}

console.log(`🛑 Could not find Position: ${result}`);
return {} as PositionReadModel;
}

function calculateAdjustedCostBase(transactions: InvestmentTransaction[]) {
let acb = 0;
let shares = 0;
let bookValue = 0;

for (const t of transactions) {
const transactionType = t.type.toUpperCase();

console.debug(`START-${transactionType} acb: $${acb} positions: ${shares}`);

if (transactionType.localeCompare('buy', undefined, { sensitivity: 'base' }) === 0) {
// BUY: acb = prevAcb + (shares * price + commission)
acb = acb + (t.shares * t.price + t.commission);

shares = shares + t.shares;
bookValue = bookValue + (t.shares * t.price - t.commission);
} else if (transactionType.localeCompare('sell', undefined, { sensitivity: 'base' }) === 0) {
// SELL: acb = prevAcb * ((prevPositions - shares) / prevPositions)
acb = acb * ((shares - t.shares) / shares);

shares = shares - t.shares;
bookValue = bookValue - (t.shares * t.price - t.commission);
}

console.debug(`END-${transactionType} acb: $${acb} positions: ${shares}`);
}

return { shares, acb, bookValue };
}

async function savePosition(detail: CreateTransactionInputV2, shares: number, acb: number, bookValue: number) {
// Get current position (if exists)
const position = await getPosition(detail);

// Get quote for symbol
const quote = await getQuoteSummary(detail.symbol);

let result;

if (quote && quote.close) {
// Calculate market value
const marketValue = quote.close * shares;

const item = {
pk: position ? position.pk : 'pos#' + detail.accountId,
userId: detail.userId,
//sk: position ? position.sk : "ACCPOS#" + new Date().toISOString(),
createdAt: position ? position.createdAt : new Date().toISOString(),
updatedAt: new Date().toISOString(),
accountId: detail.accountId,
entity: 'position',
symbol: quote.symbol,
description: quote.description,
exchange: quote.exchange,
currency: quote.currency,
shares: shares,
acb: acb,
bookValue: bookValue,
marketValue: marketValue,
};

const putItemCommandInput: PutItemCommandInput = {
TableName: process.env.DATA_TABLE_NAME,
Item: marshall(item),
};
result = await dynamoDbCommand(new PutItemCommand(putItemCommandInput));

if (result.$metadata.httpStatusCode === 200) {
console.log(`✅ Saved Position: { result: ${JSON.stringify(result)}, items: ${JSON.stringify(item)} }`);
return result.Items;
}
}

console.log(`🛑 Could not save Position ${detail.symbol}:`, result);
return {};
}

function parseEvent(event: EventBridgeEvent<string, CreateTransactionInputV2>): CreateTransactionInputV2 {
const eventString: string = JSON.stringify(event);

console.debug(`🕧 Received event: ${eventString}`);

return JSON.parse(eventString).detail;
}
Loading

0 comments on commit 92cde42

Please sign in to comment.