Skip to content

Commit

Permalink
chore: standardise position prepare handler (#1005)
Browse files Browse the repository at this point in the history
* feat: added fx-position-prepare capability to batch handler

* chore: reverted fx implementation in non batch mode

* chore: added unit tests

* chore: dep, audit and lint

---------

Co-authored-by: Kevin Leyow <[email protected]>
  • Loading branch information
vijayg10 and kleyow authored Apr 23, 2024
1 parent d917f00 commit cf80376
Show file tree
Hide file tree
Showing 8 changed files with 1,222 additions and 353 deletions.
26 changes: 13 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.3.0",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.2.1-snapshot.1",
"@mojaloop/central-services-shared": "18.4.0-snapshot.10",
"@mojaloop/central-services-stream": "11.2.4",
"@mojaloop/event-sdk": "14.0.2",
"@mojaloop/ml-number": "11.2.3",
Expand Down
200 changes: 134 additions & 66 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const Logger = require('@mojaloop/central-services-logger')
const BatchPositionModel = require('../../models/position/batch')
const BatchPositionModelCached = require('../../models/position/batchCached')
const PositionPrepareDomain = require('./prepare')
const PositionFxPrepareDomain = require('./fx-prepare')
const PositionFulfilDomain = require('./fulfil')
const SettlementModelCached = require('../../models/settlement/settlementModelCached')
const Enum = require('@mojaloop/central-services-shared').Enum
Expand All @@ -52,75 +53,25 @@ const participantFacade = require('../../models/participant/facade')
* @returns {results} - Returns a list of bins with results or throws an error if failed
*/
const processBins = async (bins, trx) => {
const transferIdList = []
const reservedActionTransferIdList = []
await iterateThroughBins(bins, (_accountID, action, item) => {
if (item.decodedPayload?.transferId) {
transferIdList.push(item.decodedPayload.transferId)
// get transferId from uriParams for fulfil messages
} else if (item.message?.value?.content?.uriParams?.id) {
transferIdList.push(item.message.value.content.uriParams.id)
if (action === Enum.Events.Event.Action.RESERVE) {
reservedActionTransferIdList.push(item.message.value.content.uriParams.id)
}
}
})
// Get transferIdList, reservedActionTransferIdList and commitRequestId for actions PREPARE, FX_PREPARE, FX_RESERVE, COMMIT and RESERVE
const { transferIdList, reservedActionTransferIdList, commitRequestIdList } = await _getTransferIdList(bins)

// Pre fetch latest transferStates for all the transferIds in the account-bin
const latestTransferStateChanges = await BatchPositionModel.getLatestTransferStateChangesByTransferIdList(trx, transferIdList)
const latestTransferStates = {}
for (const key in latestTransferStateChanges) {
latestTransferStates[key] = latestTransferStateChanges[key].transferStateId
}
const latestTransferStates = await _fetchLatestTransferStates(trx, transferIdList)

// Pre fetch latest fxTransferStates for all the commitRequestIds in the account-bin
const latestFxTransferStates = await _fetchLatestFxTransferStates(trx, commitRequestIdList)

const accountIds = Object.keys(bins)

// Pre fetch all settlement accounts corresponding to the position accounts
// Get all participantIdMap for the accountIds
const participantCurrencyIds = await BatchPositionModelCached.getParticipantCurrencyByIds(trx, accountIds)

// Validate that participantCurrencyIds exist for each of the accountIds
// i.e every unique accountId has a corresponding entry in participantCurrencyIds
const participantIdsHavingCurrencyIdsList = [...new Set(participantCurrencyIds.map(item => item.participantCurrencyId))]
const allAccountIdsHaveParticipantCurrencyIds = accountIds.every(accountId => {
return participantIdsHavingCurrencyIdsList.includes(Number(accountId))
})
if (!allAccountIdsHaveParticipantCurrencyIds) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, 'Not all accountIds have corresponding participantCurrencyIds')
}
const participantCurrencyIds = await _getParticipantCurrencyIds(trx, accountIds)

// Pre fetch all settlement accounts corresponding to the position accounts
const allSettlementModels = await SettlementModelCached.getAll()

// Construct objects participantIdMap, accountIdMap and currencyIdMap
const participantIdMap = {}
const accountIdMap = {}
const currencyIdMap = {}
for (const item of participantCurrencyIds) {
const { participantId, currencyId, participantCurrencyId } = item
if (!participantIdMap[participantId]) {
participantIdMap[participantId] = {}
}
if (!currencyIdMap[currencyId]) {
currencyIdMap[currencyId] = {
settlementModel: _getSettlementModelForCurrency(currencyId, allSettlementModels)
}
}
participantIdMap[participantId][currencyId] = participantCurrencyId
accountIdMap[participantCurrencyId] = { participantId, currencyId }
}

// Get all participantCurrencyIds for the participantIdMap
const allParticipantCurrencyIds = await BatchPositionModelCached.getParticipantCurrencyByParticipantIds(trx, Object.keys(participantIdMap))
const settlementCurrencyIds = []
for (const pc of allParticipantCurrencyIds) {
const correspondingParticipantCurrencyId = participantIdMap[pc.participantId][pc.currencyId]
if (correspondingParticipantCurrencyId) {
const settlementModel = currencyIdMap[pc.currencyId].settlementModel
if (pc.ledgerAccountTypeId === settlementModel.settlementAccountTypeId) {
settlementCurrencyIds.push(pc)
accountIdMap[correspondingParticipantCurrencyId].settlementCurrencyId = pc.participantCurrencyId
}
}
}
const { settlementCurrencyIds, accountIdMap, currencyIdMap } = await _constructRequiredMaps(participantCurrencyIds, allSettlementModels, trx)

// Pre fetch all position account balances for the account-bin and acquire lock on position
const positions = await BatchPositionModel.getPositionsByAccountIdsForUpdate(trx, [
Expand Down Expand Up @@ -152,9 +103,8 @@ const processBins = async (bins, trx) => {
array2.every((element) => array1.includes(element))
// If non-prepare/non-commit action found, log error
// We need to remove this once we implement all the actions
if (!isSubset(['prepare', 'commit', 'reserve'], actions)) {
Logger.isErrorEnabled && Logger.error('Only prepare/commit actions are allowed in a batch')
// throw new Error('Only prepare action is allowed in a batch')
if (!isSubset([Enum.Events.Event.Action.PREPARE, Enum.Events.Event.Action.FX_PREPARE, Enum.Events.Event.Action.COMMIT, Enum.Events.Event.Action.RESERVE], actions)) {
Logger.isErrorEnabled && Logger.error('Only prepare/fx-prepare/commit actions are allowed in a batch')
}

const settlementParticipantPosition = positions[accountIdMap[accountID].settlementCurrencyId].value
Expand All @@ -172,7 +122,9 @@ const processBins = async (bins, trx) => {
let accumulatedPositionValue = positions[accountID].value
let accumulatedPositionReservedValue = positions[accountID].reservedValue
let accumulatedTransferStates = latestTransferStates
const accumulatedFxTransferStates = latestFxTransferStates
let accumulatedTransferStateChanges = []
let accumulatedFxTransferStateChanges = []
let accumulatedPositionChanges = []

// If fulfil action found then call processPositionPrepareBin function
Expand Down Expand Up @@ -214,19 +166,47 @@ const processBins = async (bins, trx) => {
accumulatedPositionChanges = accumulatedPositionChanges.concat(prepareActionResult.accumulatedPositionChanges)
notifyMessages = notifyMessages.concat(prepareActionResult.notifyMessages)

// If fx-prepare action found then call processPositionFxPrepareBin function
const fxPrepareActionResult = await PositionFxPrepareDomain.processFxPositionPrepareBin(
accountBin[Enum.Events.Event.Action.FX_PREPARE],
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedFxTransferStates,
settlementParticipantPosition,
participantLimit
)

// Update accumulated values
accumulatedPositionValue = fxPrepareActionResult.accumulatedPositionValue
accumulatedPositionReservedValue = fxPrepareActionResult.accumulatedPositionReservedValue
accumulatedTransferStates = fxPrepareActionResult.accumulatedTransferStates
// Append accumulated arrays
accumulatedFxTransferStateChanges = accumulatedFxTransferStateChanges.concat(fxPrepareActionResult.accumulatedFxTransferStateChanges)
accumulatedPositionChanges = accumulatedPositionChanges.concat(fxPrepareActionResult.accumulatedPositionChanges)
notifyMessages = notifyMessages.concat(fxPrepareActionResult.notifyMessages)

// Update accumulated position values by calling a facade function
await BatchPositionModel.updateParticipantPosition(trx, positions[accountID].participantPositionId, accumulatedPositionValue, accumulatedPositionReservedValue)

// Bulk insert accumulated transferStateChanges by calling a facade function
await BatchPositionModel.bulkInsertTransferStateChanges(trx, accumulatedTransferStateChanges)
// Bulk insert accumulated fxTransferStateChanges by calling a facade function
await BatchPositionModel.bulkInsertFxTransferStateChanges(trx, accumulatedFxTransferStateChanges)

// Bulk get the transferStateChangeIds for transferids using select whereIn
const fetchedTransferStateChanges = await BatchPositionModel.getLatestTransferStateChangesByTransferIdList(trx, accumulatedTransferStateChanges.map(item => item.transferId))
// Mutate accumulated positionChanges with transferStateChangeIds
// Bulk get the fxTransferStateChangeIds for commitRequestId using select whereIn
const fetchedFxTransferStateChanges = await BatchPositionModel.getLatestFxTransferStateChangesByCommitRequestIdList(trx, accumulatedFxTransferStateChanges.map(item => item.commitRequestId))
// Mutate accumulated positionChanges with transferStateChangeIds and fxTransferStateChangeIds
for (const positionChange of accumulatedPositionChanges) {
positionChange.transferStateChangeId = fetchedTransferStateChanges[positionChange.transferId].transferStateChangeId
if (positionChange.transferId) {
positionChange.transferStateChangeId = fetchedTransferStateChanges[positionChange.transferId].transferStateChangeId
delete positionChange.transferId
} else if (positionChange.commitRequestId) {
positionChange.fxTransferStateChangeId = fetchedFxTransferStateChanges[positionChange.commitRequestId].fxTransferStateChangeId
delete positionChange.commitRequestId
}
positionChange.participantPositionId = positions[accountID].participantPositionId
delete positionChange.transferId
}
// Bulk insert accumulated positionChanges by calling a facade function
await BatchPositionModel.bulkInsertParticipantPositionChanges(trx, accumulatedPositionChanges)
Expand Down Expand Up @@ -285,6 +265,94 @@ const _getSettlementModelForCurrency = (currencyId, allSettlementModels) => {
return settlementModels.find(sm => sm.ledgerAccountTypeId === Enum.Accounts.LedgerAccountType.POSITION)
}

const _getTransferIdList = async (bins) => {
const transferIdList = []
const reservedActionTransferIdList = []
const commitRequestIdList = []
await iterateThroughBins(bins, (_accountID, action, item) => {
if (action === Enum.Events.Event.Action.PREPARE) {
transferIdList.push(item.decodedPayload.transferId)
} else if (action === Enum.Events.Event.Action.FULFIL) {
transferIdList.push(item.message.value.content.uriParams.id)
} else if (action === Enum.Events.Event.Action.RESERVE) {
transferIdList.push(item.message.value.content.uriParams.id)
reservedActionTransferIdList.push(item.message.value.content.uriParams.id)
} else if (action === Enum.Events.Event.Action.FX_PREPARE) {
commitRequestIdList.push(item.decodedPayload.commitRequestId)
} else if (action === Enum.Events.Event.Action.FX_RESERVE) {
commitRequestIdList.push(item.message.value.content.uriParams.id)
}
})
return { transferIdList, reservedActionTransferIdList, commitRequestIdList }
}

const _fetchLatestTransferStates = async (trx, transferIdList) => {
const latestTransferStateChanges = await BatchPositionModel.getLatestTransferStateChangesByTransferIdList(trx, transferIdList)
const latestTransferStates = {}
for (const key in latestTransferStateChanges) {
latestTransferStates[key] = latestTransferStateChanges[key].transferStateId
}
return latestTransferStates
}

const _fetchLatestFxTransferStates = async (trx, commitRequestIdList) => {
const latestFxTransferStateChanges = await BatchPositionModel.getLatestFxTransferStateChangesByCommitRequestIdList(trx, commitRequestIdList)
const latestFxTransferStates = {}
for (const key in latestFxTransferStateChanges) {
latestFxTransferStates[key] = latestFxTransferStateChanges[key].transferStateId
}
return latestFxTransferStates
}

const _getParticipantCurrencyIds = async (trx, accountIds) => {
const participantCurrencyIds = await BatchPositionModelCached.getParticipantCurrencyByIds(trx, accountIds)

// Validate that participantCurrencyIds exist for each of the accountIds
// i.e every unique accountId has a corresponding entry in participantCurrencyIds
const participantIdsHavingCurrencyIdsList = [...new Set(participantCurrencyIds.map(item => item.participantCurrencyId))]
const allAccountIdsHaveParticipantCurrencyIds = accountIds.every(accountId => {
return participantIdsHavingCurrencyIdsList.includes(Number(accountId))
})
if (!allAccountIdsHaveParticipantCurrencyIds) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, 'Not all accountIds have corresponding participantCurrencyIds')
}
return participantCurrencyIds
}

const _constructRequiredMaps = async (participantCurrencyIds, allSettlementModels, trx) => {
const participantIdMap = {}
const accountIdMap = {}
const currencyIdMap = {}
for (const item of participantCurrencyIds) {
const { participantId, currencyId, participantCurrencyId } = item
if (!participantIdMap[participantId]) {
participantIdMap[participantId] = {}
}
if (!currencyIdMap[currencyId]) {
currencyIdMap[currencyId] = {
settlementModel: _getSettlementModelForCurrency(currencyId, allSettlementModels)
}
}
participantIdMap[participantId][currencyId] = participantCurrencyId
accountIdMap[participantCurrencyId] = { participantId, currencyId }
}

// Get all participantCurrencyIds for the participantIdMap
const allParticipantCurrencyIds = await BatchPositionModelCached.getParticipantCurrencyByParticipantIds(trx, Object.keys(participantIdMap))
const settlementCurrencyIds = []
for (const pc of allParticipantCurrencyIds) {
const correspondingParticipantCurrencyId = participantIdMap[pc.participantId][pc.currencyId]
if (correspondingParticipantCurrencyId) {
const settlementModel = currencyIdMap[pc.currencyId].settlementModel
if (pc.ledgerAccountTypeId === settlementModel.settlementAccountTypeId) {
settlementCurrencyIds.push(pc)
accountIdMap[correspondingParticipantCurrencyId].settlementCurrencyId = pc.participantCurrencyId
}
}
}
return { settlementCurrencyIds, accountIdMap, currencyIdMap }
}

module.exports = {
processBins,
iterateThroughBins
Expand Down
Loading

0 comments on commit cf80376

Please sign in to comment.