Skip to content

Commit

Permalink
feat(csi/643): add fx-notify publishing on payer init fxTranfer succe…
Browse files Browse the repository at this point in the history
…ss (#1105)

* feat(csi/643): add fx-notify publishing on payer init fxTranfer success

* loop

* deps

* tests

* list
  • Loading branch information
kleyow authored Sep 18, 2024
1 parent 0fb97a7 commit 60ea20b
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 40 deletions.
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.7.6",
"@mojaloop/central-services-shared": "v18.8.0",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
Expand Down Expand Up @@ -132,7 +132,7 @@
"get-port": "5.1.1",
"jsdoc": "4.0.3",
"jsonpath": "1.1.1",
"nodemon": "3.1.4",
"nodemon": "3.1.5",
"npm-check-updates": "17.1.1",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
Expand Down
7 changes: 6 additions & 1 deletion src/domain/fx/cyril.js
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,12 @@ const processFulfilMessage = async (transferId, payload, transfer) => {
amount: -fxTransferRecord.sourceAmount
})
}
// TODO: Send PATCH notification to FXP
result.patchNotifications.push({
commitRequestId: watchListRecord.commitRequestId,
fxpName: fxTransferRecord.counterPartyFspName,
fulfilment: fxTransferRecord.fulfilment,
completedTimestamp: fxTransferRecord.completedTimestamp
})
}
}

Expand Down
56 changes: 55 additions & 1 deletion src/domain/position/fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ const processPositionFulfilBin = async (
participantPositionChanges.push(participantPositionChange)
fxTransferStateChanges.push(fxTransferStateChange)
accumulatedFxTransferStatesCopy[positionChangeToBeProcessed.commitRequestId] = transferStateId
// TODO: Send required FX PATCH notifications
const patchMessages = _constructPatchNotificationResultMessage(
binItem,
cyrilResult
)
for (const patchMessage of patchMessages) {
resultMessages.push({ binItem, message: patchMessage })
}
} else {
const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChange(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.transferId, accumulatedPositionReservedValue)
Expand Down Expand Up @@ -203,6 +209,54 @@ const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, pa
return resultMessage
}

const _constructPatchNotificationResultMessage = (binItem, cyrilResult) => {
const messages = []
const patchNotifications = cyrilResult.patchNotifications
for (const patchNotification of patchNotifications) {
const commitRequestId = patchNotification.commitRequestId
const fxpName = patchNotification.fxpName
const fulfilment = patchNotification.fulfilment
const completedTimestamp = patchNotification.completedTimestamp
const headers = {
...binItem.message.value.content.headers,
'fspiop-source': Config.HUB_NAME,
'fspiop-destination': fxpName
}

const fulfil = {
conversionState: Enum.Transfers.TransferState.COMMITTED,
fulfilment,
completedTimestamp
}

const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.SUCCESS.status,
null,
null
)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
commitRequestId,
Enum.Kafka.Topics.TRANSFER,
Enum.Events.Event.Action.FX_NOTIFY,
state
)

const resultMessage = Utility.StreamingProtocol.createMessage(
commitRequestId,
fxpName,
Config.HUB_NAME,
metadata,
headers,
fulfil,
{ id: commitRequestId },
'application/json'
)

messages.push(resultMessage)
}
return messages
}

const _handleParticipantPositionChange = (runningPosition, transferAmount, transferId, accumulatedPositionReservedValue) => {
const transferStateId = Enum.Transfers.TransferState.COMMITTED
// Amounts in `transferParticipant` for the payee are stored as negative values
Expand Down
11 changes: 9 additions & 2 deletions src/handlers/positions/handlerBatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ const { randomUUID } = require('crypto')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const BatchPositionModel = require('../../models/position/batch')
const decodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodePayload

const consumerCommit = true

/**
Expand Down Expand Up @@ -152,7 +151,15 @@ const positions = async (error, messages) => {
// Loop through results and produce notification messages and audit messages
await Promise.all(result.notifyMessages.map(item => {
// Produce notification message and audit message
const action = item.binItem.message?.value.metadata.event.action
// NOTE: Not sure why we're checking the binItem for the action vs the message
// that is being created.
// Handled FX_NOTIFY differently so as not to break existing functionality.
let action
if (item?.message.metadata.event.action !== Enum.Events.Event.Action.FX_NOTIFY) {
action = item.binItem.message?.value.metadata.event.action
} else {
action = item.message.metadata.event.action
}
const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
return Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
}).concat(
Expand Down
26 changes: 21 additions & 5 deletions test/integration-override/handlers/transfers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ Test('Handlers test', async handlersTest => {
action: TransferEventAction.FX_RESERVE,
valueToFilter: td.payer.name
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionFxFulfil[0], 'Position fulfil message with key found')
test.ok(positionFxFulfil[0], 'Position fulfil notification message found')
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
Expand All @@ -767,7 +767,7 @@ Test('Handlers test', async handlersTest => {
topicFilter: 'topic-notification-event',
action: TransferEventAction.PREPARE
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionFxFulfil[0], 'Prepare message with key found')
test.ok(positionFxFulfil[0], 'Prepare notification message found')
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
Expand All @@ -780,14 +780,30 @@ Test('Handlers test', async handlersTest => {
topicFilter: 'topic-notification-event',
action: TransferEventAction.COMMIT
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionFxFulfil[0], 'Fulfil message with key found')
test.ok(positionFxFulfil[0], 'Fulfil notification message found')
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
}

// Assert FXP notification message is produced
try {
const notifyFxp = await wrapWithRetries(() => testConsumer.getEventsForFilter({
topicFilter: 'topic-notification-event',
action: TransferEventAction.FX_NOTIFY
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(notifyFxp[0], 'FXP notify notification message found')
test.equal(notifyFxp[0].value.content.payload.conversionState, TransferStateEnum.COMMITTED)
test.equal(notifyFxp[0].value.content.uriParams.id, td.messageProtocolFxPrepare.content.payload.commitRequestId)
test.ok(notifyFxp[0].value.content.payload.completedTimestamp)
test.equal(notifyFxp[0].value.to, td.fxp.participant.name)
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
}
testConsumer.clearEvents()

// Resend fx-prepare after fxtransfer state is COMMITTED
// Resend fx-prepare after fxTransfer state is COMMITTED
await new Promise(resolve => setTimeout(resolve, 2000))
await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig)

Expand All @@ -797,7 +813,7 @@ Test('Handlers test', async handlersTest => {
topicFilter: 'topic-notification-event',
action: TransferEventAction.FX_PREPARE_DUPLICATE
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionPrepare[0], 'Position prepare duplicate message with key found')
test.ok(positionPrepare[0], 'Position prepare duplicate notification found')
// Check if the error message is correct
test.equal(positionPrepare[0].value.content.payload.conversionState, TransferStateEnum.COMMITTED)
} catch (err) {
Expand Down
Loading

0 comments on commit 60ea20b

Please sign in to comment.