Skip to content

Commit

Permalink
chore: added first surgical operation
Browse files Browse the repository at this point in the history
  • Loading branch information
froid1911 committed Mar 15, 2024
1 parent 62eb5d9 commit 8126702
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 116 deletions.
214 changes: 100 additions & 114 deletions api/src/modules/real-world-assets/listener.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Prisma } from "@prisma/client";
import { Prisma, RWAPortfolio } from "@prisma/client";
import { InternalTransmitterUpdate, OperationUpdate } from "document-drive";
import { AddFileInput, DeleteNodeInput, DocumentDriveDocument, DocumentDriveState, ListenerFilter, actions } from "document-model-libs/document-drive";
import { CashGroupTransactionType, CreateFixedIncomeAssetInput, EditFixedIncomeAssetInput, RealWorldAssetsDocument, RealWorldAssetsState, utils } from "document-model-libs/real-world-assets"
Expand Down Expand Up @@ -41,9 +41,74 @@ export async function transmit(strands: InternalTransmitterUpdate<RealWorldAsset
}
}

async function updateEntireState(strand: InternalTransmitterUpdate<RealWorldAssetsDocument, "global">, prisma: Prisma.TransactionClient) {
const { transactions, principalLenderAccountId, fixedIncomeTypes, spvs, accounts, feeTypes, portfolio } = strand.state;
const { driveId, documentId } = strand;


async function handleDriveStrand(strand: InternalTransmitterUpdate<DocumentDriveDocument, "global">, prisma: Prisma.TransactionClient) {
logger.info("Received strand for drive");
if (strandStartsFromOpZero(strand)) {
await deleteDriveState(strand.state, prisma);
}

await doSurgicalDriveUpdate(strand, prisma);
}

function strandStartsFromOpZero(strand: InternalTransmitterUpdate<DocumentDriveDocument | RealWorldAssetsDocument, "global">) {
const resetNeeded = strand.operations.length > 0 && strand.operations[0].index === 0;
logger.info(`Reset needed: ${resetNeeded}`);
return resetNeeded;
}
async function doSurgicalDriveUpdate(strand: InternalTransmitterUpdate<DocumentDriveDocument, "global">, prisma: Prisma.TransactionClient) {
logger.info("Doing surgical drive update");
for (const operation of strand.operations) {
logger.info(`Operation: ${operation.type}`);
switch (operation.type) {
case "ADD_FILE":
const addFileInput = operation.input as AddFileInput;
if (addFileInput.documentType === "makerdao/rwa-portfolio") {
const result = await prisma.rWAPortfolio.create({
data: {
driveId: strand.driveId,
documentId: addFileInput.id,
principalLenderAccountId: "",
}
})
logger.info({ PortfolioID: result.id })
logger.info({ msg: "Adding file", operation });
}
break;
case "DELETE_NODE":
const deleteNodeInput = operation.input as DeleteNodeInput;
const driveId = strand.driveId;
logger.info(`Removing file ${deleteNodeInput.id} from ${driveId}`);
const result = await prisma.rWAPortfolio.deleteMany({
where: {
AND: {
documentId: deleteNodeInput.id,
driveId
}
}
})
logger.info(`Removed ${result.count} portfolios`);
logger.info({ msg: "Removing file", operation });
break;
default:
logger.info(`Ignoring operation ${operation.type}`);
break;
}
}
}

async function deleteDriveState(state: DocumentDriveState, prisma: Prisma.TransactionClient) {
logger.info("Deleting rwa read model");
await prisma.rWAPortfolio.deleteMany({
where: {
driveId: state.id
}
});
}

async function rebuildRwaPortfolio(driveId: string, documentId: string, state: RealWorldAssetsState, prisma: Prisma.TransactionClient) {
const { transactions, principalLenderAccountId, fixedIncomeTypes, spvs, accounts, feeTypes, portfolio } = state;
// create portfolio document
const portfolioEntity = await prisma.rWAPortfolio.upsert({
where: {
Expand Down Expand Up @@ -137,90 +202,6 @@ async function updateEntireState(strand: InternalTransmitterUpdate<RealWorldAsse
data: accounts.map((account) => ({ portfolioId: portfolioEntity.id, accountId: account.id })),
skipDuplicates: true,
});
}

async function analytics(prisma: Prisma.TransactionClient) {
// const data: Record<string, number> = {};
// for (const asset of portfolioEntity!.portfolio) {
// const type = asset.fixedIncomeType?.name;
// if (!type || asset.purchasePrice === null) {
// continue;
// }
// if (!data[type]) {
// data[type] = 0;
// }

// data[type] = data[type] + asset.purchasePrice;
// }

// console.log(data);
}
async function handleDriveStrand(strand: InternalTransmitterUpdate<DocumentDriveDocument, "global">, prisma: Prisma.TransactionClient) {
logger.info("Received strand for drive");
if (strandStartsFromOpZero(strand)) {
await deleteDriveState(strand.state, prisma);
}

await doSurgicalDriveUpdate(strand, prisma);
}

function strandStartsFromOpZero(strand: InternalTransmitterUpdate<DocumentDriveDocument | RealWorldAssetsDocument, "global">) {
const resetNeeded = strand.operations.length > 0 && strand.operations[0].index === 0;
logger.info(`Reset needed: ${resetNeeded}`);
return resetNeeded;
}
async function doSurgicalDriveUpdate(strand: InternalTransmitterUpdate<DocumentDriveDocument, "global">, prisma: Prisma.TransactionClient) {
logger.info("Doing surgical drive update");
for (const operation of strand.operations) {
logger.info(`Operation: ${operation.type}`);
switch (operation.type) {
case "ADD_FILE":
const addFileInput = operation.input as AddFileInput;
if (addFileInput.documentType === "makerdao/rwa-portfolio") {
const result = await prisma.rWAPortfolio.create({
data: {
driveId: strand.driveId,
documentId: addFileInput.id,
principalLenderAccountId: "",
}
})
logger.info({ PortfolioID: result.id })
logger.info({ msg: "Adding file", operation });
}
break;
case "DELETE_NODE":
const deleteNodeInput = operation.input as DeleteNodeInput;
const driveId = strand.driveId;
logger.info(`Removing file ${deleteNodeInput.id} from ${driveId}`);
const result = await prisma.rWAPortfolio.deleteMany({
where: {
AND: {
documentId: deleteNodeInput.id,
driveId
}
}
})
logger.info(`Removed ${result.count} portfolios`);
logger.info({ msg: "Removing file", operation });
break;
default:
logger.info(`Ignoring operation ${operation.type}`);
break;
}
}
}

async function deleteDriveState(state: DocumentDriveState, prisma: Prisma.TransactionClient) {
logger.info("Deleting rwa read model");
await prisma.rWAPortfolio.deleteMany({
where: {
driveId: state.id
}
});
}

async function rebuildRwaPortfolio(driveId: string, documentId: string, state: RealWorldAssetsState) {
logger.info("Rebuilding rwa portfolio");

}

Expand All @@ -235,53 +216,58 @@ async function rwaPortfolioExists(driveId: string, documentId: string, prisma: P
return !!portfolio;
}

const surgicalOperations: Record<string, (input: any, portfolio: RWAPortfolio, prisma: Prisma.TransactionClient) => Promise<void>> = {
"CREATE_FIXED_INCOME_ASSET": async (input: CreateFixedIncomeAssetInput, portfolio: RWAPortfolio, prisma: Prisma.TransactionClient) => {
await prisma.rWAPortfolioAsset.create({
data: {
...input,
assetRefId: input.id,
portfolioId: portfolio.id,
assetType: "FixedIncome"
}

});

logger.info({ msg: "Creating fixed income asset", input });
},
}

async function handleRwaDocumentStrand(strand: InternalTransmitterUpdate<RealWorldAssetsDocument, "global">, prisma: Prisma.TransactionClient) {
logger.info(`Received strand for document ${strand.documentId} with operations: ${strand.operations.map(op => op.type).join(", ")}`);
if (!await rwaPortfolioExists(strand.driveId, strand.documentId, prisma)) {
logger.info(`Skipping strand for document ${strand.documentId} as it doesn't exist in the read model`);
return;
}

const surgicalOperations: Record<string, (input: any) => void> = {
"CREATE_FIXED_INCOME_ASSET": (input: CreateFixedIncomeAssetInput) => {
logger.info({ msg: "Creating fixed income asset", input });
},
}



if (strandStartsFromOpZero(strand) || !allOperationsAreSurgical(strand, surgicalOperations)) {
// await deleteDriveState(strand.state, prisma);
await rebuildRwaPortfolio(strand.driveId, strand.documentId, strand.state);
await rebuildRwaPortfolio(strand.driveId, strand.documentId, strand.state, prisma);
} else {
const portfolio = await prisma.rWAPortfolio.findFirst({
where: {
driveId: strand.driveId,
documentId: strand.documentId
}
})
if (!portfolio) {
logger.error({ msg: "Portfolio not found", driveId: strand.driveId, documentId: strand.documentId });
return;
}
for (const operation of strand.operations) {
await doSurgicalRwaPortfolioUpdate(operation, prisma);
await doSurgicalRwaPortfolioUpdate(operation, portfolio, prisma);
}

}

// TODO: come up with a better idea to check whether strandupdate is part of real world assets
//TODO: check whether all operations can be applied individually otherwise delete database and insert everything new
let operationsCanbeAppliedIndividually = false;
for (const operation of strand.operations) {
// if undo/redo operation is present then delete database and insert everything new

}

// // update entire state if operations can't be applied individually
// if (!operationsCanbeAppliedIndividually) {
// await updateEntireState(strand, prisma);
// }

// await analytics(prisma);
}

function doSurgicalRwaPortfolioUpdate(operation: OperationUpdate, prisma: Prisma.TransactionClient) {
function doSurgicalRwaPortfolioUpdate(operation: OperationUpdate, portfolio: RWAPortfolio, prisma: Prisma.TransactionClient) {
logger.info({ msg: "Doing surgical rwa portfolio update", name: operation.type });

return;
}

function allOperationsAreSurgical(strand: InternalTransmitterUpdate<RealWorldAssetsDocument, "global">, surgicalOperations: Record<string, (input: any) => void>) {
function allOperationsAreSurgical(strand: InternalTransmitterUpdate<RealWorldAssetsDocument, "global">, surgicalOperations: Record<string, (input: any, portfolio: RWAPortfolio, prisma: Prisma.TransactionClient) => void>) {
const allOperationsAreSurgical = strand.operations.filter(op => surgicalOperations[op.type] === undefined).length === 0;
logger.info(`All operations are surgical: ${allOperationsAreSurgical}`);
return allOperationsAreSurgical
Expand Down
2 changes: 1 addition & 1 deletion api/src/modules/real-world-assets/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export function getRWACRUD(prisma: Prisma.TransactionClient) {

try {
const portfolios = await prisma.rWAPortfolio.findMany({
where: newWhere, include: {
include: {
spvs: {
include: { spv: true }
},
Expand Down
5 changes: 4 additions & 1 deletion api/src/modules/real-world-assets/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
export function transformPortfolioToState(portfolios) {
import { RWAPortfolio } from "@prisma/client";

export function transformPortfolioToState(portfolios: RWAPortfolio[]) {
console.log(portfolios);
return portfolios.map(portfolio => ({
id: portfolio.id,
name: portfolio.name,
Expand Down

0 comments on commit 8126702

Please sign in to comment.