Skip to content

Commit

Permalink
feat(mojaloop/#3524): add reserve action to fulfil logic (#992)
Browse files Browse the repository at this point in the history
* feat: added skeleton and comments for prepare bin implementation

* chore: some changes

* feat: added new position handler v2

* feat: added some functionality to new position handler v2

* feat: added some todos

* feat: changed new handler name

* fix: lint

* feat: add some changes

* fix: lint

* chore: added copyright header

* feat: added some logic

* fix: lint

* feat: added unit tests

* feat: some improvements

* feat: added bin processor logic

* fix: lint

* feat; refactor

* feat: added limit

* fix: unit tests

* feat: added integration test

* feat: refactor

* fix: unit tests

* fix: integration tests

* feat: add position prepare bin logic (#970)

* feat: add position prepare bin logic

* edit

* lint

* dep

* chore: changes

* chore: commet

* lint

* chore: refactor

* chore: unit tests

* chore: change

* chore: comment

* chore: test

* chore: more test

* chore: test?

* chore: remove

* chore: rework domain logic

* chore: test

* fix: position calculation

* fix: position change data

* fix; prepare domain function

* chore: fix

---------

Co-authored-by: Vijay <[email protected]>

* chore: add tests (#972)

* chore: add tests

* chore: unit

* sleep

* chore: clear

* chore: test

* enum

* fix: deadlock in tests

---------

Co-authored-by: Vijay <[email protected]>

* fix: metrics

* fix: some issue with bin processor

* feat(mojaloop/#3489): add cached calls for participant currency, update tests (#973)

* chore: add tests

* chore: unit

* sleep

* chore: clear

* chore: test

* enum

* feat: add cached calls for participant currency

* chore: change

* chore: unit

* chore: unit

* lock

* test

* fix: unit test

---------

Co-authored-by: Vijay <[email protected]>

* chore: improve coverage and edit metric and query name (#974)

* chore: coverage

* lint

* chore: coverage

* more coverage

* chore: test

* chore: coverage

* chore: metric name

* chore: metric name

* chore: update query name

* chore: remove log

* chore(snapshot): 17.3.0-snapshot.0

* fix: test downgrade stream lib

* chore(snapshot): 17.3.0-snapshot.1

* fix: notification event action

* chore(snapshot): 17.3.0-snapshot.2

* fix: notification event

* chore(snapshot): 17.3.0-snapshot.3

* fix: notification messages

* chore(snapshot): 17.3.0-snapshot.4

* chore: updated conf

* chore: update deps

* chore(snapshot): 17.3.0-snapshot.5

* fix(mojaloop/#3529): fix high latency  (#981)

* fix: upgrade central-services-stream library

* chore(snapshot): v17.2.1-snapshot.0

* chore(snapshot): 17.2.1-snapshot.1

* chore(snapshot): 17.2.1-snapshot.2

* chore(release): 17.2.1 [skip ci]

* feat(mojaloop/#3489): add negative integration tests (#975)

* chore: add support for unique transfers and dual currencies in batch

* check in for a negative test case

* uncommented tests

* added test for mixed currencies

* updated test for mixed currencies

* chore: dep and lint

---------

Co-authored-by: Kevin Leyow <[email protected]>

* Merge branch main of https://github.com/mojaloop/central-ledger into feature/position-prepare-binning

* chore: foreach to for loop

* chore: cleanup

* Update src/handlers/index.js

Co-authored-by: Miguel de Barros <[email protected]>

* chore: added readme doc

* fix: lint

* fix: unit tests

* fix: audit

* chore: cleanup

* fix(mojaloop/#3533): helm v15.2.0-rc fixes (#982)

fix(mojaloop/#3533): helm v15.2.0-rc fixes - mojaloop/project#3533

List of fixes:
- fix(mojaloop/#3580): missing toDestination on handling the fspiop source/destiation headers failing match validation on fulfil - regression on #2697 - v17.0.0...v17.2.0#diff-3a2d4aabbde0cd9517dd372f6ae6001ad607d005b5316785c8698fe25160aa92L393 - mojaloop/project#3580
    Fixes currently resolve regression failures on these tests:
        - p2p_money_transfer_put_notifications - payee receives no Notification with ABORTED status after sending invalid FSPIOP-Destination header with transferStatus=COMMITTED, file path: golden_path/bug fixes /Test for Bugfix #2697 - Central-Ledger Fulfil Handler does not correctly invalidate requests with an incorrect-non-existent FSP-ID in the FSPIOP-Destination header.json
        - p2p_money_transfer_patch_notifications - payee receives PATCH Notification with ABORTED status after sending invalid FSPIOP-Destination header with transferStatus=RESERVED, file path: golden_path/bug fixes/Test for Bugfix #2697 - Central-Ledger Fulfil Handler does not correctly invalidate requests with an incorrect-non-existent FSP-ID in the FSPIOP-Destination header.json

* chore(release): 17.3.0 [skip ci]

* chore: remove uuid4 (#976)

chore: remove uuid4
 - Remove uuid4 and use native randomUUID.
 - Bump some deps are quested by commit hook

* chore(release): 17.3.1 [skip ci]

* fix(mojaloop/#3615): upgrade dependencies (#985)

* chore(release): 17.3.2 [skip ci]

* fix: remove unneeded async/await

* feat: add validation for participantCurrencyIds / accountIds mapping

* chore: remove resolved TODOs

* test: Add test coverage

* chore: remove whitespaces

* fix: fixrebase  merge conflict

* chore: update dependencies

* doc: fix typo

* chore(snapshot): 17.4.0-snapshot.0

* chore(snapshot): 17.4.0-snapshot.1

* test: update unit tests

* chore(snapshot): 17.4.0-snapshot.2

* fix: route error callback for batch correctly

* chore(snapshot): 17.4.0-snapshot.3

* fix(batch): fix error callback message routing. update tests

* chore(snapshot): 17.4.0-snapshot.4

* fix(batch): fix message routing / properties for more error cases

* chore(snapshot): 17.4.0-snapshot.5

* test: update test coverage

* chore(snapshot): 17.4.0-snapshot.6

* fix: route bulk-prepare messages to non-batch prepare handler

* chore(snapshot): 17.4.0-snapshot.7

* doc: update comment

* chore: update deps. update README for batch.

* doc: Address TODO regarding participanLimits query optimization

* stash

* chore: domain

* chore: test

* test

* chore: update bin processor tests

* chore: coverage

* log

* chore: update

* chore: up retries

* chore: test

* ci

* test

* install

* test

* test

* test

* script

* chore: fix order

* time

* test

* time

* time

* state

* time

* chore: check transfer state

* test

* lint

* chore: await

* address comments

* chore: dep

* chore: address comments

* coverage

* chore(snapshot): 17.5.0-snapshot.0

* feat: add reserve functionality

* int test

* remove

* name

* dep:check

---------

Co-authored-by: Vijay <[email protected]>
Co-authored-by: Aarón Reynoza <[email protected]>
Co-authored-by: mojaloopci <[email protected]>
Co-authored-by: Sridevi Miriyala <[email protected]>
Co-authored-by: vijayg10 <[email protected]>
Co-authored-by: Miguel de Barros <[email protected]>
Co-authored-by: Marco Ippolito <[email protected]>
Co-authored-by: Steven Oderayi <[email protected]>
  • Loading branch information
9 people authored Dec 20, 2023
1 parent 6f656a8 commit eb82c33
Show file tree
Hide file tree
Showing 11 changed files with 1,179 additions and 138 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
"jsdoc": "4.0.2",
"jsonpath": "1.1.1",
"nodemon": "3.0.2",
"npm-check-updates": "16.14.11",
"npm-check-updates": "16.14.12",
"nyc": "15.1.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
Expand Down
19 changes: 15 additions & 4 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,16 @@ const participantFacade = require('../../models/participant/facade')
*/
const processBins = async (bins, trx) => {
const transferIdList = []
await iterateThroughBins(bins, (_accountID, _action, item) => {
const reservedActionTransferIdList = []
await iterateThroughBins(bins, (_accountID, action, item) => {
if (item.decodedPayload?.transferId) {
transferIdList.push(item.decodedPayload.transferId)
// get transferId from uriParams for fulfil messages
} else if (item.message?.value?.content?.uriParams?.id) {
transferIdList.push(item.message.value.content.uriParams.id)
if (action === Enum.Events.Event.Action.RESERVE) {
reservedActionTransferIdList.push(item.message.value.content.uriParams.id)
}
}
})
// Pre fetch latest transferStates for all the transferIds in the account-bin
Expand Down Expand Up @@ -131,6 +135,12 @@ const processBins = async (bins, trx) => {
Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE
)

// Pre fetch transfers for all reserve action fulfils
const reservedActionTransfers = await BatchPositionModel.getTransferByIdsForReserve(
trx,
reservedActionTransferIdList
)

let notifyMessages = []
let limitAlarms = []

Expand All @@ -142,7 +152,7 @@ const processBins = async (bins, trx) => {
array2.every((element) => array1.includes(element))
// If non-prepare/non-commit action found, log error
// We need to remove this once we implement all the actions
if (!isSubset(['prepare', 'commit'], actions)) {
if (!isSubset(['prepare', 'commit', 'reserve'], actions)) {
Logger.isErrorEnabled && Logger.error('Only prepare/commit actions are allowed in a batch')
// throw new Error('Only prepare action is allowed in a batch')
}
Expand All @@ -167,11 +177,12 @@ const processBins = async (bins, trx) => {

// If fulfil action found then call processPositionPrepareBin function
const fulfilActionResult = await PositionFulfilDomain.processPositionFulfilBin(
accountBin.commit,
[accountBin.commit, accountBin.reserve],
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
latestTransferInfoByTransferId
latestTransferInfoByTransferId,
reservedActionTransfers
)

// Update accumulated values
Expand Down
214 changes: 112 additions & 102 deletions src/domain/position/fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,136 +4,146 @@ const Config = require('../../lib/config')
const Utility = require('@mojaloop/central-services-shared').Util
const MLNumber = require('@mojaloop/ml-number')
const Logger = require('@mojaloop/central-services-logger')
const TransferObjectTransform = require('../../domain/transfer/transform')

/**
* @function processPositionFulfilBin
*
* @async
* @description This is the domain function to process a bin of position-fulfil messages of a single participant account.
*
* @param {array} binItems - an array of objects that contain a position fulfil message and its span. {message, span}
* @param {array} commitReserveFulfilBins - an array containing commit and reserve action bins
* @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing
* @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency
* @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output.
* @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function.
* @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed
*/
const processPositionFulfilBin = async (
binItems,
commitReserveFulfilBins,
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
transferInfoList
transferInfoList,
reservedActionTransfers
) => {
const transferStateChanges = []
const participantPositionChanges = []
const resultMessages = []
const accumulatedTransferStatesCopy = Object.assign({}, accumulatedTransferStates)
let runningPosition = new MLNumber(accumulatedPositionValue)

if (binItems && binItems.length > 0) {
for (const binItem of binItems) {
let transferStateId
let reason
let resultMessage
const transferId = binItem.message.value.content.uriParams.id
const payeeFsp = binItem.message.value.from
const payerFsp = binItem.message.value.to
const transfer = binItem.decodedPayload
Logger.isDebugEnabled && Logger.debug(`processPositionFulfilBin::transfer:processingMessage: ${JSON.stringify(transfer)}`)
Logger.isDebugEnabled && Logger.debug(`accumulatedTransferStates: ${JSON.stringify(accumulatedTransferStates)}`)
// Inform payee dfsp if transfer is not in RECEIVED_FULFIL state, skip making any transfer state changes
if (accumulatedTransferStates[transferId] !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL) {
// forward same headers from the prepare message, except the content-length header
// set destination to payeefsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = payeeFsp
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(
`Invalid State: ${accumulatedTransferStates[transferId]} - expected: ${Enum.Transfers.TransferInternalState.RECEIVED_FULFIL}`
).toApiErrorObject(Config.ERROR_HANDLING)
const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
fspiopError.errorInformation.errorCode,
fspiopError.errorInformation.errorDescription
)

const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transferId,
Enum.Kafka.Topics.NOTIFICATION,
Enum.Events.Event.Action.FULFIL,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
transferId,
payeeFsp,
Enum.Http.Headers.FSPIOP.SWITCH.value,
metadata,
headers,
fspiopError,
{ id: transferId },
'application/json'
)
} else {
const transferInfo = transferInfoList[transferId]

// forward same headers from the prepare message, except the content-length header
const headers = { ...binItem.message.value.content.headers }
delete headers['content-length']

const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.SUCCESS.status,
null,
null
)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transferId,
Enum.Kafka.Topics.TRANSFER,
Enum.Events.Event.Action.COMMIT,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
transferId,
payerFsp,
payeeFsp,
metadata,
headers,
transfer,
{ id: transferId },
'application/json'
)

transferStateId = Enum.Transfers.TransferState.COMMITTED
// Amounts in `transferParticipant` for the payee are stored as negative values
runningPosition = new MLNumber(runningPosition.add(transferInfo.amount).toFixed(Config.AMOUNT.SCALE))

const participantPositionChange = {
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: runningPosition.toNumber(),
reservedValue: accumulatedPositionReservedValue
}
participantPositionChanges.push(participantPositionChange)
binItem.result = { success: true }
}
for (const binItems of commitReserveFulfilBins) {
if (binItems && binItems.length > 0) {
for (const binItem of binItems) {
let transferStateId
let reason
let resultMessage
const transferId = binItem.message.value.content.uriParams.id
const payeeFsp = binItem.message.value.from
const payerFsp = binItem.message.value.to
const transfer = binItem.decodedPayload
Logger.isDebugEnabled && Logger.debug(`processPositionFulfilBin::transfer:processingMessage: ${JSON.stringify(transfer)}`)
Logger.isDebugEnabled && Logger.debug(`accumulatedTransferStates: ${JSON.stringify(accumulatedTransferStates)}`)
// Inform payee dfsp if transfer is not in RECEIVED_FULFIL state, skip making any transfer state changes
if (accumulatedTransferStates[transferId] !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL) {
// forward same headers from the prepare message, except the content-length header
// set destination to payeefsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = payeeFsp
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(
`Invalid State: ${accumulatedTransferStates[transferId]} - expected: ${Enum.Transfers.TransferInternalState.RECEIVED_FULFIL}`
).toApiErrorObject(Config.ERROR_HANDLING)
const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
fspiopError.errorInformation.errorCode,
fspiopError.errorInformation.errorDescription
)

const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transferId,
Enum.Kafka.Topics.NOTIFICATION,
Enum.Events.Event.Action.FULFIL,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
transferId,
payeeFsp,
Enum.Http.Headers.FSPIOP.SWITCH.value,
metadata,
headers,
fspiopError,
{ id: transferId },
'application/json'
)
} else {
const transferInfo = transferInfoList[transferId]

// forward same headers from the prepare message, except the content-length header
const headers = { ...binItem.message.value.content.headers }
delete headers['content-length']

resultMessages.push({ binItem, message: resultMessage })
const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.SUCCESS.status,
null,
null
)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transferId,
Enum.Kafka.Topics.TRANSFER,
Enum.Events.Event.Action.COMMIT,
state
)

if (transferStateId) {
const transferStateChange = {
transferId,
transferStateId,
reason
resultMessage = Utility.StreamingProtocol.createMessage(
transferId,
payerFsp,
payeeFsp,
metadata,
headers,
transfer,
{ id: transferId },
'application/json'
)

if (binItem.message.value.metadata.event.action === Enum.Events.Event.Action.RESERVE) {
resultMessage.content.payload = TransferObjectTransform.toFulfil(
reservedActionTransfers[transferId]
)
}

transferStateId = Enum.Transfers.TransferState.COMMITTED
// Amounts in `transferParticipant` for the payee are stored as negative values
runningPosition = new MLNumber(runningPosition.add(transferInfo.amount).toFixed(Config.AMOUNT.SCALE))

const participantPositionChange = {
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: runningPosition.toNumber(),
reservedValue: accumulatedPositionReservedValue
}
participantPositionChanges.push(participantPositionChange)
binItem.result = { success: true }
}
transferStateChanges.push(transferStateChange)
Logger.isDebugEnabled && Logger.debug(`processPositionFulfilBin::transferStateChange: ${JSON.stringify(transferStateChange)}`)

accumulatedTransferStatesCopy[transferId] = transferStateId
Logger.isDebugEnabled && Logger.debug(`processPositionFulfilBin::accumulatedTransferStatesCopy:finalizedTransferState ${JSON.stringify(transferStateId)}`)
resultMessages.push({ binItem, message: resultMessage })

if (transferStateId) {
const transferStateChange = {
transferId,
transferStateId,
reason
}
transferStateChanges.push(transferStateChange)
Logger.isDebugEnabled && Logger.debug(`processPositionFulfilBin::transferStateChange: ${JSON.stringify(transferStateChange)}`)

accumulatedTransferStatesCopy[transferId] = transferStateId
Logger.isDebugEnabled && Logger.debug(`processPositionFulfilBin::accumulatedTransferStatesCopy:finalizedTransferState ${JSON.stringify(transferStateId)}`)
}
}
}
}
Expand Down
46 changes: 45 additions & 1 deletion src/models/position/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

const Db = require('../../lib/db')
const Logger = require('@mojaloop/central-services-logger')
const TransferExtensionModel = require('../transfer/transferExtension')
const { Enum } = require('@mojaloop/central-services-shared')

const startDbTransaction = async () => {
const knex = await Db.getKnex()
Expand Down Expand Up @@ -141,6 +143,47 @@ const bulkInsertParticipantPositionChanges = async (trx, participantPositionChan
return await knex.batchInsert('participantPositionChange', participantPositionChangeList).transacting(trx)
}

const getTransferByIdsForReserve = async (trx, transferIds) => {
if (transferIds && transferIds.length > 0) {
try {
const knex = await Db.getKnex()
const query = await knex('transfer')
.transacting(trx)
.leftJoin('transferStateChange AS tsc', 'tsc.transferId', 'transfer.transferId')
.leftJoin('transferState AS ts', 'ts.transferStateId', 'tsc.transferStateId')
.leftJoin('transferFulfilment AS tf', 'tf.transferId', 'transfer.transferId')
.leftJoin('transferError as te', 'te.transferId', 'transfer.transferId') // currently transferError.transferId is PK ensuring one error per transferId
.whereIn('transfer.transferId', transferIds)
.select(
'transfer.*',
'tsc.createdDate AS completedTimestamp',
'ts.enumeration as transferStateEnumeration',
'tf.ilpFulfilment AS fulfilment',
'te.errorCode',
'te.errorDescription'
)
const transfers = {}
for (const transfer of query) {
transfer.extensionList = await TransferExtensionModel.getByTransferId(transfer.transferId)
if (transfer.errorCode && transfer.transferStateEnumeration === Enum.Transfers.TransferState.ABORTED) {
if (!transfer.extensionList) transfer.extensionList = []
transfer.extensionList.push({
key: 'cause',
value: `${transfer.errorCode}: ${transfer.errorDescription}`.substr(0, 128)
})
}
transfer.isTransferReadModel = true
transfers[transfer.transferId] = transfer
}
return transfers
} catch (err) {
Logger.isErrorEnabled && Logger.error(err)
throw err
}
}
return {}
}

module.exports = {
startDbTransaction,
getLatestTransferStateChangesByTransferIdList,
Expand All @@ -149,5 +192,6 @@ module.exports = {
bulkInsertTransferStateChanges,
bulkInsertParticipantPositionChanges,
getAllParticipantCurrency,
getTransferInfoList
getTransferInfoList,
getTransferByIdsForReserve
}
Loading

0 comments on commit eb82c33

Please sign in to comment.