Skip to content

Commit

Permalink
feat: add position prepare bin logic (#970)
Browse files Browse the repository at this point in the history
* feat: add position prepare bin logic

* edit

* lint

* dep

* chore: changes

* chore: commet

* lint

* chore: refactor

* chore: unit tests

* chore: change

* chore: comment

* chore: test

* chore: more test

* chore: test?

* chore: remove

* chore: rework domain logic

* chore: test

* fix: position calculation

* fix: position change data

* fix; prepare domain function

* chore: fix

---------

Co-authored-by: Vijay <[email protected]>
  • Loading branch information
kleyow and vijayg10 authored Sep 14, 2023
1 parent 98c48bc commit 6318ce1
Show file tree
Hide file tree
Showing 7 changed files with 16,076 additions and 1,069 deletions.
15,849 changes: 14,797 additions & 1,052 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@
"@mojaloop/central-services-health": "14.0.2",
"@mojaloop/central-services-logger": "11.2.2",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.1.0",
"@mojaloop/central-services-shared": "18.1.1",
"@mojaloop/central-services-stream": "11.1.1",
"@mojaloop/event-sdk": "12.0.2",
"@mojaloop/event-sdk": "13.0.0",
"@mojaloop/ml-number": "11.2.3",
"@mojaloop/object-store-lib": "12.0.1",
"@now-ims/hapi-now-auth": "2.1.0",
Expand Down Expand Up @@ -129,12 +129,12 @@
"jsdoc": "4.0.2",
"jsonpath": "1.1.1",
"nodemon": "3.0.1",
"npm-check-updates": "16.13.3",
"npm-check-updates": "16.14.2",
"nyc": "15.1.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
"replace": "^1.2.2",
"sinon": "15.2.0",
"sinon": "16.0.0",
"standard": "17.1.0",
"standard-version": "^9.5.0",
"tap-spec": "^5.0.0",
Expand Down
5 changes: 3 additions & 2 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ const participantFacade = require('../../models/participant/facade')
const processBins = async (bins, trx) => {
const transferIdList = []
await iterateThroughBins(bins, async (item) => {
if (item.message.value.id) {
transferIdList.push(item.message.value.id)
if (item.message.value.content?.payload?.transferId) {
transferIdList.push(item.message.value.content.payload.transferId)
}
})
// Pre fetch latest transferStates for all the transferIds in the account-bin
Expand Down Expand Up @@ -172,6 +172,7 @@ const processBins = async (bins, trx) => {
// Bulk insert accumulated positionChanges by calling a facade function
await BatchPositionModel.bulkInsertParticipantPositionChanges(trx, accumulatedPositionChanges)

// testing: await trx.rollback()
limitAlarms = limitAlarms.concat(prepareActionResult.limitAlarms)
}

Expand Down
245 changes: 234 additions & 11 deletions src/domain/position/prepare.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,244 @@
const { Enum } = require('@mojaloop/central-services-shared')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Config = require('../../lib/config')
const Utility = require('@mojaloop/central-services-shared').Util
const resourceVersions = require('@mojaloop/central-services-shared').Util.resourceVersions
const MLNumber = require('@mojaloop/ml-number')
const Logger = require('@mojaloop/central-services-logger')
const decodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodePayload

/**
* @function processPositionPrepareBin
*
* @async
* @description This is the domain function to process a bin of position-prepare messages of a single participant account.
*
* @param {array} messages - a list of messages to consume for the relevant topic
* @param {number} accumulatedPositionValue - value of position accumulated so far
* @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far
* @param {number} settlementPositionValue - value of settlement position to be used for liquidity check
* @param {number} settlementModelDelay - settlement model delay (IMMEDIATE or DEFERRED)
* @param {number} participantLimitValue - NDC limit of participant
* @param {array} accumulatedTransferStateChanges - list of accumulated transfer state changes
*
* @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges or throws an error if failed
* @param {array} binItems - an array of objects that contain a position prepare message and its span. {message, span}
* @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing
* @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency
* @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output.
* @param {number} settlementParticipantPosition - position value of the participants settlement account
* @param {object} settlementModel - settlement model object for the currency
* @param {object} participantLimit - participant limit object for the currency
* @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed
*/
const processPositionPrepareBin = async (messages, accumulatedPositionValue, accumulatedPositionReservedValue, settlementPositionValue, settlementModelDelay, participantLimitValue, accumulatedTransferStateChanges) => {
// TODO: Implement processPositionPrepareBin
const processPositionPrepareBin = async (
binItems,
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
settlementParticipantPosition,
settlementModel,
participantLimit
) => {
const transferStateChanges = []
const participantPositionChanges = []
const resultMessages = []
const limitAlarms = []
const accumulatedTransferStatesCopy = Object.assign({}, accumulatedTransferStates)

let currentPosition = new MLNumber(accumulatedPositionValue)
const reservedPosition = new MLNumber(accumulatedPositionReservedValue)
const effectivePosition = new MLNumber(currentPosition.add(reservedPosition).toFixed(Config.AMOUNT.SCALE))
const liquidityCover = new MLNumber(settlementParticipantPosition).multiply(-1)
const payerLimit = new MLNumber(participantLimit.value)
let availablePositionBasedOnLiquidityCover = new MLNumber(liquidityCover.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE))
Logger.isInfoEnabled && Logger.info(`processPositionPrepareBin::availablePositionBasedOnLiquidityCover: ${availablePositionBasedOnLiquidityCover}`)
let availablePositionBasedOnPayerLimit = new MLNumber(payerLimit.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE))
Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::availablePositionBasedOnPayerLimit: ${availablePositionBasedOnPayerLimit}`)

for (const binItem of binItems) {
let transferStateId
let reason
let resultMessage
const transfer = decodePayload(binItem.message.value.content.payload)
Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::transfer:processingMessage: ${JSON.stringify(transfer)}`)

// Check if transfer is in correct state for processing, produce an internal error message
if (accumulatedTransferStates[transfer.transferId] !== Enum.Transfers.TransferInternalState.RECEIVED_PREPARE) {
Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::transferState: ${accumulatedTransferStates[transfer.transferId]} !== ${Enum.Transfers.TransferInternalState.RECEIVED_PREPARE}`)

transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
reason = 'Transfer in incorrect state'

const headers = Utility.Http.SwitchDefaultHeaders(
transfer.payerFsp,
Enum.Http.HeaderResources.TRANSFERS,
Enum.Http.Headers.FSPIOP.SWITCH.value,
resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion
)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR
).toApiErrorObject(Config.ERROR_HANDLING)
const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
fspiopError.errorInformation.errorCode,
fspiopError.errorInformation.errorDescription
)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transfer.transferId,
Enum.Kafka.Topics.NOTIFICATION,
Enum.Events.Event.Action.PREPARE,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payeeFsp,
transfer.payerFsp,
metadata,
headers,
fspiopError,
{ id: transfer.transferId },
'application/json'
)
// Check if payer has insufficient liquidity, produce an error message and abort transfer
} else if (availablePositionBasedOnLiquidityCover.toNumber() < transfer.amount.amount) {
transferStateId = Enum.Transfers.TransferState.ABORTED
reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY.message

const headers = Utility.Http.SwitchDefaultHeaders(
transfer.payerFsp,
Enum.Http.HeaderResources.TRANSFERS,
Enum.Http.Headers.FSPIOP.SWITCH.value,
resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion
)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY
).toApiErrorObject(Config.ERROR_HANDLING)
const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
fspiopError.errorInformation.errorCode,
fspiopError.errorInformation.errorDescription
)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transfer.transferId,
Enum.Kafka.Topics.NOTIFICATION,
Enum.Events.Event.Action.PREPARE,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payeeFsp,
transfer.payerFsp,
metadata,
headers,
fspiopError,
{ id: transfer.transferId },
'application/json'
)
// Check if payer has surpassed their limit, produce an error message and abort transfer
} else if (availablePositionBasedOnPayerLimit.toNumber() < transfer.amount.amount) {
transferStateId = Enum.Transfers.TransferState.ABORTED
reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_LIMIT_ERROR.message

const headers = Utility.Http.SwitchDefaultHeaders(
transfer.payerFsp,
Enum.Http.HeaderResources.TRANSFERS,
Enum.Http.Headers.FSPIOP.SWITCH.value,
resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion
)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_LIMIT_ERROR
).toApiErrorObject(Config.ERROR_HANDLING)
const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
fspiopError.errorInformation.errorCode,
fspiopError.errorInformation.errorDescription
)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transfer.transferId,
Enum.Kafka.Topics.NOTIFICATION,
Enum.Events.Event.Action.PREPARE,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payeeFsp,
transfer.payerFsp,
metadata,
headers,
fspiopError,
{ id: transfer.transferId },
'application/json'
)
// Payer has sufficient liquidity and limit
} else {
transferStateId = Enum.Transfers.TransferState.RESERVED
currentPosition = currentPosition.add(transfer.amount.amount)
availablePositionBasedOnLiquidityCover = availablePositionBasedOnLiquidityCover.add(transfer.amount.amount)
availablePositionBasedOnPayerLimit = availablePositionBasedOnPayerLimit.add(transfer.amount.amount)

// Are these the right headers?
const headers = Utility.Http.SwitchDefaultHeaders(
transfer.payeeFsp,
Enum.Http.HeaderResources.TRANSFERS,
transfer.payerFsp,
resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion
)
const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.SUCCESS.status,
null,
null
)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transfer.transferId,
Enum.Kafka.Topics.TRANSFER,
Enum.Events.Event.Action.PREPARE,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payeeFsp,
transfer.payerFsp,
metadata,
headers,
transfer,
{},
'application/json'
)

const participantPositionChange = {
transferId: transfer.transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: currentPosition.toNumber(),
reservedValue: accumulatedPositionReservedValue
}
participantPositionChanges.push(participantPositionChange)
Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::participantPositionChange: ${JSON.stringify(participantPositionChange)}`)
}

resultMessages.push({ binItem, message: resultMessage })

const transferStateChange = {
transferId: transfer.transferId,
transferStateId,
reason
}
transferStateChanges.push(transferStateChange)
Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::transferStateChange: ${JSON.stringify(transferStateChange)}`)

Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::limitAlarm: ${currentPosition.toNumber()} > ${liquidityCover.multiply(participantLimit.thresholdAlarmPercentage)}`)
if (currentPosition.toNumber() > liquidityCover.multiply(participantLimit.thresholdAlarmPercentage).toNumber()) {
limitAlarms.push(participantLimit)
}

accumulatedTransferStatesCopy[transfer.transferId] = transferStateId
Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::accumulatedTransferStatesCopy:finalizedTransferState ${JSON.stringify(transferStateId)}`)
}

return {
accumulatedPositionValue: currentPosition.toNumber(),
accumulatedTransferStates: accumulatedTransferStatesCopy, // finalized transfer state after prepare processing
accumulatedPositionReservedValue, // not used but kept for consistency
accumulatedTransferStateChanges: transferStateChanges, // transfer state changes to be persisted in order
limitAlarms, // array of participant limits that have been breached
accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order
notifyMessages: resultMessages // array of objects containing bin item and result message. {binItem, message}
}
}

module.exports = {
Expand Down
13 changes: 13 additions & 0 deletions src/models/position/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ const getPositionsByAccountIdsForUpdate = async (trx, accountIds) => {
return positions
}

const getPositionsByAccountIdsNonTrx = async (accountIds) => {
_initKnex()
const participantPositions = await knex('participantPosition')
.whereIn('participantCurrencyId', accountIds)
.select('*')
const positions = {}
participantPositions.forEach((position) => {
positions[position.participantCurrencyId] = position
})
return positions
}

const updateParticipantPosition = async (trx, participantPositionId, participantPositionValue, participantPositionReservedValue = null) => {
const optionalValues = {}
if (participantPositionReservedValue !== null) {
Expand All @@ -123,6 +135,7 @@ module.exports = {
startDbTransaction,
getLatestTransferStateChangesByTransferIdList,
getPositionsByAccountIdsForUpdate,
getPositionsByAccountIdsNonTrx,
getParticipantCurrencyIds,
getParticipantCurrencyIdsByParticipantIds,
updateParticipantPosition,
Expand Down
Loading

0 comments on commit 6318ce1

Please sign in to comment.