diff --git a/api/src/modules/real-world-assets/listener.ts b/api/src/modules/real-world-assets/listener.ts index 14ea6bbe..e24a1e85 100644 --- a/api/src/modules/real-world-assets/listener.ts +++ b/api/src/modules/real-world-assets/listener.ts @@ -1,4 +1,4 @@ -import { Prisma } from "@prisma/client"; +import { Prisma, RWAPortfolio } from "@prisma/client"; import { InternalTransmitterUpdate, OperationUpdate } from "document-drive"; import { AddFileInput, DeleteNodeInput, DocumentDriveDocument, DocumentDriveState, ListenerFilter, actions } from "document-model-libs/document-drive"; import { CashGroupTransactionType, CreateFixedIncomeAssetInput, EditFixedIncomeAssetInput, RealWorldAssetsDocument, RealWorldAssetsState, utils } from "document-model-libs/real-world-assets" @@ -41,9 +41,74 @@ export async function transmit(strands: InternalTransmitterUpdate, prisma: Prisma.TransactionClient) { - const { transactions, principalLenderAccountId, fixedIncomeTypes, spvs, accounts, feeTypes, portfolio } = strand.state; - const { driveId, documentId } = strand; + + +async function handleDriveStrand(strand: InternalTransmitterUpdate, prisma: Prisma.TransactionClient) { + logger.info("Received strand for drive"); + if (strandStartsFromOpZero(strand)) { + await deleteDriveState(strand.state, prisma); + } + + await doSurgicalDriveUpdate(strand, prisma); +} + +function strandStartsFromOpZero(strand: InternalTransmitterUpdate) { + const resetNeeded = strand.operations.length > 0 && strand.operations[0].index === 0; + logger.info(`Reset needed: ${resetNeeded}`); + return resetNeeded; +} +async function doSurgicalDriveUpdate(strand: InternalTransmitterUpdate, prisma: Prisma.TransactionClient) { + logger.info("Doing surgical drive update"); + for (const operation of strand.operations) { + logger.info(`Operation: ${operation.type}`); + switch (operation.type) { + case "ADD_FILE": + const addFileInput = operation.input as AddFileInput; + if (addFileInput.documentType === "makerdao/rwa-portfolio") { + const result = await prisma.rWAPortfolio.create({ + data: { + driveId: strand.driveId, + documentId: addFileInput.id, + principalLenderAccountId: "", + } + }) + logger.info({ PortfolioID: result.id }) + logger.info({ msg: "Adding file", operation }); + } + break; + case "DELETE_NODE": + const deleteNodeInput = operation.input as DeleteNodeInput; + const driveId = strand.driveId; + logger.info(`Removing file ${deleteNodeInput.id} from ${driveId}`); + const result = await prisma.rWAPortfolio.deleteMany({ + where: { + AND: { + documentId: deleteNodeInput.id, + driveId + } + } + }) + logger.info(`Removed ${result.count} portfolios`); + logger.info({ msg: "Removing file", operation }); + break; + default: + logger.info(`Ignoring operation ${operation.type}`); + break; + } + } +} + +async function deleteDriveState(state: DocumentDriveState, prisma: Prisma.TransactionClient) { + logger.info("Deleting rwa read model"); + await prisma.rWAPortfolio.deleteMany({ + where: { + driveId: state.id + } + }); +} + +async function rebuildRwaPortfolio(driveId: string, documentId: string, state: RealWorldAssetsState, prisma: Prisma.TransactionClient) { + const { transactions, principalLenderAccountId, fixedIncomeTypes, spvs, accounts, feeTypes, portfolio } = state; // create portfolio document const portfolioEntity = await prisma.rWAPortfolio.upsert({ where: { @@ -137,90 +202,6 @@ async function updateEntireState(strand: InternalTransmitterUpdate ({ portfolioId: portfolioEntity.id, accountId: account.id })), skipDuplicates: true, }); -} - -async function analytics(prisma: Prisma.TransactionClient) { - // const data: Record = {}; - // for (const asset of portfolioEntity!.portfolio) { - // const type = asset.fixedIncomeType?.name; - // if (!type || asset.purchasePrice === null) { - // continue; - // } - // if (!data[type]) { - // data[type] = 0; - // } - - // data[type] = data[type] + asset.purchasePrice; - // } - - // console.log(data); -} -async function handleDriveStrand(strand: InternalTransmitterUpdate, prisma: Prisma.TransactionClient) { - logger.info("Received strand for drive"); - if (strandStartsFromOpZero(strand)) { - await deleteDriveState(strand.state, prisma); - } - - await doSurgicalDriveUpdate(strand, prisma); -} - -function strandStartsFromOpZero(strand: InternalTransmitterUpdate) { - const resetNeeded = strand.operations.length > 0 && strand.operations[0].index === 0; - logger.info(`Reset needed: ${resetNeeded}`); - return resetNeeded; -} -async function doSurgicalDriveUpdate(strand: InternalTransmitterUpdate, prisma: Prisma.TransactionClient) { - logger.info("Doing surgical drive update"); - for (const operation of strand.operations) { - logger.info(`Operation: ${operation.type}`); - switch (operation.type) { - case "ADD_FILE": - const addFileInput = operation.input as AddFileInput; - if (addFileInput.documentType === "makerdao/rwa-portfolio") { - const result = await prisma.rWAPortfolio.create({ - data: { - driveId: strand.driveId, - documentId: addFileInput.id, - principalLenderAccountId: "", - } - }) - logger.info({ PortfolioID: result.id }) - logger.info({ msg: "Adding file", operation }); - } - break; - case "DELETE_NODE": - const deleteNodeInput = operation.input as DeleteNodeInput; - const driveId = strand.driveId; - logger.info(`Removing file ${deleteNodeInput.id} from ${driveId}`); - const result = await prisma.rWAPortfolio.deleteMany({ - where: { - AND: { - documentId: deleteNodeInput.id, - driveId - } - } - }) - logger.info(`Removed ${result.count} portfolios`); - logger.info({ msg: "Removing file", operation }); - break; - default: - logger.info(`Ignoring operation ${operation.type}`); - break; - } - } -} - -async function deleteDriveState(state: DocumentDriveState, prisma: Prisma.TransactionClient) { - logger.info("Deleting rwa read model"); - await prisma.rWAPortfolio.deleteMany({ - where: { - driveId: state.id - } - }); -} - -async function rebuildRwaPortfolio(driveId: string, documentId: string, state: RealWorldAssetsState) { - logger.info("Rebuilding rwa portfolio"); } @@ -235,6 +216,22 @@ async function rwaPortfolioExists(driveId: string, documentId: string, prisma: P return !!portfolio; } +const surgicalOperations: Record Promise> = { + "CREATE_FIXED_INCOME_ASSET": async (input: CreateFixedIncomeAssetInput, portfolio: RWAPortfolio, prisma: Prisma.TransactionClient) => { + await prisma.rWAPortfolioAsset.create({ + data: { + ...input, + assetRefId: input.id, + portfolioId: portfolio.id, + assetType: "FixedIncome" + } + + }); + + logger.info({ msg: "Creating fixed income asset", input }); + }, +} + async function handleRwaDocumentStrand(strand: InternalTransmitterUpdate, prisma: Prisma.TransactionClient) { logger.info(`Received strand for document ${strand.documentId} with operations: ${strand.operations.map(op => op.type).join(", ")}`); if (!await rwaPortfolioExists(strand.driveId, strand.documentId, prisma)) { @@ -242,46 +239,35 @@ async function handleRwaDocumentStrand(strand: InternalTransmitterUpdate void> = { - "CREATE_FIXED_INCOME_ASSET": (input: CreateFixedIncomeAssetInput) => { - logger.info({ msg: "Creating fixed income asset", input }); - }, - } - - if (strandStartsFromOpZero(strand) || !allOperationsAreSurgical(strand, surgicalOperations)) { // await deleteDriveState(strand.state, prisma); - await rebuildRwaPortfolio(strand.driveId, strand.documentId, strand.state); + await rebuildRwaPortfolio(strand.driveId, strand.documentId, strand.state, prisma); } else { + const portfolio = await prisma.rWAPortfolio.findFirst({ + where: { + driveId: strand.driveId, + documentId: strand.documentId + } + }) + if (!portfolio) { + logger.error({ msg: "Portfolio not found", driveId: strand.driveId, documentId: strand.documentId }); + return; + } for (const operation of strand.operations) { - await doSurgicalRwaPortfolioUpdate(operation, prisma); + await doSurgicalRwaPortfolioUpdate(operation, portfolio, prisma); } } - - // TODO: come up with a better idea to check whether strandupdate is part of real world assets - //TODO: check whether all operations can be applied individually otherwise delete database and insert everything new - let operationsCanbeAppliedIndividually = false; - for (const operation of strand.operations) { - // if undo/redo operation is present then delete database and insert everything new - - } - - // // update entire state if operations can't be applied individually - // if (!operationsCanbeAppliedIndividually) { - // await updateEntireState(strand, prisma); - // } - - // await analytics(prisma); } -function doSurgicalRwaPortfolioUpdate(operation: OperationUpdate, prisma: Prisma.TransactionClient) { +function doSurgicalRwaPortfolioUpdate(operation: OperationUpdate, portfolio: RWAPortfolio, prisma: Prisma.TransactionClient) { logger.info({ msg: "Doing surgical rwa portfolio update", name: operation.type }); + return; } -function allOperationsAreSurgical(strand: InternalTransmitterUpdate, surgicalOperations: Record void>) { +function allOperationsAreSurgical(strand: InternalTransmitterUpdate, surgicalOperations: Record void>) { const allOperationsAreSurgical = strand.operations.filter(op => surgicalOperations[op.type] === undefined).length === 0; logger.info(`All operations are surgical: ${allOperationsAreSurgical}`); return allOperationsAreSurgical diff --git a/api/src/modules/real-world-assets/model.ts b/api/src/modules/real-world-assets/model.ts index 2cdc6213..e1483e34 100644 --- a/api/src/modules/real-world-assets/model.ts +++ b/api/src/modules/real-world-assets/model.ts @@ -10,7 +10,7 @@ export function getRWACRUD(prisma: Prisma.TransactionClient) { try { const portfolios = await prisma.rWAPortfolio.findMany({ - where: newWhere, include: { + include: { spvs: { include: { spv: true } }, diff --git a/api/src/modules/real-world-assets/utils.ts b/api/src/modules/real-world-assets/utils.ts index 2e5f8ae8..e6438854 100644 --- a/api/src/modules/real-world-assets/utils.ts +++ b/api/src/modules/real-world-assets/utils.ts @@ -1,4 +1,7 @@ -export function transformPortfolioToState(portfolios) { +import { RWAPortfolio } from "@prisma/client"; + +export function transformPortfolioToState(portfolios: RWAPortfolio[]) { + console.log(portfolios); return portfolios.map(portfolio => ({ id: portfolio.id, name: portfolio.name,