Skip to content

Commit

Permalink
fix the switch header to support inter-scheme setup
Browse files Browse the repository at this point in the history
  • Loading branch information
Shashikant Hirugade committed Nov 4, 2024
1 parent 033ef63 commit 0341206
Show file tree
Hide file tree
Showing 9 changed files with 4,420 additions and 6,015 deletions.
10,353 changes: 4,378 additions & 5,975 deletions package-lock.json

Large diffs are not rendered by default.

30 changes: 15 additions & 15 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,36 +80,36 @@
},
"dependencies": {
"@hapi/good": "9.0.1",
"@hapi/hapi": "21.3.10",
"@hapi/hapi": "21.3.12",
"@hapi/basic": "7.0.2",
"@hapi/inert": "7.1.0",
"@hapi/joi": "17.1.1",
"@hapi/vision": "7.0.3",
"@hapi/catbox-memory": "6.0.2",
"@mojaloop/database-lib": "11.0.5",
"@mojaloop/central-services-error-handling": "13.0.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/central-services-error-handling": "13.0.2",
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.3.1",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.3.8",
"@mojaloop/central-services-shared": "18.11.0",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/event-sdk": "14.1.1",
"@mojaloop/ml-number": "11.2.4",
"@mojaloop/object-store-lib": "12.0.3",
"@now-ims/hapi-now-auth": "2.1.0",
"ajv": "8.16.0",
"ajv": "8.17.1",
"ajv-keywords": "5.1.0",
"base64url": "3.0.1",
"blipp": "4.0.2",
"commander": "12.1.0",
"cron": "3.1.7",
"cron": "3.1.8",
"decimal.js": "10.4.3",
"docdash": "2.0.2",
"event-stream": "4.0.1",
"five-bells-condition": "5.0.1",
"glob": "10.4.1",
"glob": "11.0.0",
"hapi-auth-bearer-token": "8.0.0",
"hapi-swagger": "17.2.1",
"hapi-swagger": "17.3.0",
"ilp-packet": "2.2.0",
"knex": "3.1.0",
"lodash": "4.17.21",
Expand All @@ -123,18 +123,18 @@
},
"devDependencies": {
"async-retry": "1.3.3",
"audit-ci": "^7.0.1",
"audit-ci": "^7.1.0",
"get-port": "5.1.1",
"jsdoc": "4.0.3",
"jsdoc": "4.0.4",
"jsonpath": "1.1.1",
"nodemon": "3.1.3",
"npm-check-updates": "16.14.20",
"nyc": "17.0.0",
"nodemon": "3.1.7",
"npm-check-updates": "17.1.10",
"nyc": "17.1.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
"replace": "^1.2.2",
"sinon": "17.0.0",
"standard": "17.1.0",
"standard": "17.1.2",
"standard-version": "^9.5.0",
"tap-spec": "^5.0.0",
"tap-xunit": "2.4.1",
Expand Down
4 changes: 2 additions & 2 deletions src/domain/position/fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const processPositionFulfilBin = async (
// set destination to payeefsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = payeeFsp
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(
Expand All @@ -73,7 +73,7 @@ const processPositionFulfilBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
transferId,
payeeFsp,
Enum.Http.Headers.FSPIOP.SWITCH.value,
Config.HUB_NAME,
metadata,
headers,
fspiopError,
Expand Down
12 changes: 6 additions & 6 deletions src/domain/position/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const processPositionPrepareBin = async (
// set destination to payerfsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payerFsp
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createFSPIOPError(
Expand All @@ -87,7 +87,7 @@ const processPositionPrepareBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payerFsp,
Enum.Http.Headers.FSPIOP.SWITCH.value,
Config.HUB_NAME,
metadata,
headers,
fspiopError,
Expand All @@ -106,7 +106,7 @@ const processPositionPrepareBin = async (
// set destination to payerfsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payerFsp
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createFSPIOPError(
Expand All @@ -129,7 +129,7 @@ const processPositionPrepareBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payerFsp,
Enum.Http.Headers.FSPIOP.SWITCH.value,
Config.HUB_NAME,
metadata,
headers,
fspiopError,
Expand All @@ -148,7 +148,7 @@ const processPositionPrepareBin = async (
// set destination to payerfsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payerFsp
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createFSPIOPError(
Expand All @@ -171,7 +171,7 @@ const processPositionPrepareBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payerFsp,
Enum.Http.Headers.FSPIOP.SWITCH.value,
Config.HUB_NAME,
metadata,
headers,
fspiopError,
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/bulk/processing/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ const bulkProcessing = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `bulkFulfil--${actionLetter}3`))
const participants = await BulkTransferService.getParticipantsById(bulkTransferInfo.bulkTransferId)
const normalizedKeys = Object.keys(headers).reduce((keys, k) => { keys[k.toLowerCase()] = k; return keys }, {})
const payeeBulkResponseHeaders = Util.Headers.transformHeaders(headers, { httpMethod: headers[normalizedKeys[Enum.Http.Headers.FSPIOP.HTTP_METHOD]], sourceFsp: Enum.Http.Headers.FSPIOP.SWITCH.value, destinationFsp: participants.payeeFsp })
const payeeBulkResponseHeaders = Util.Headers.transformHeaders(headers, { httpMethod: headers[normalizedKeys[Enum.Http.Headers.FSPIOP.HTTP_METHOD]], sourceFsp: Config.HUB_NAME, destinationFsp: participants.payeeFsp })
delete payeeBulkResponseHeaders[normalizedKeys[Enum.Http.Headers.FSPIOP.SIGNATURE]]
const payerBulkResponse = Object.assign({}, { messageId: message.value.id, headers: Util.clone(headers) }, getBulkTransferByIdResult.payerBulkTransfer)
const payeeBulkResponse = Object.assign({}, { messageId: message.value.id, headers: payeeBulkResponseHeaders }, getBulkTransferByIdResult.payeeBulkTransfer)
Expand Down Expand Up @@ -344,7 +344,7 @@ const bulkProcessing = async (error, messages) => {
payerParams.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, participants.payerFsp, payerBulkResponse.headers[normalizedKeys[Enum.Http.Headers.FSPIOP.SOURCE]], payerMetadata, payerBulkResponse.headers, payerPayload)

const payeeMetadata = Util.StreamingProtocol.createMetadataWithCorrelatedEvent(params.message.value.metadata.event.id, payeeParams.message.value.metadata.type, payeeParams.message.value.metadata.action, Enum.Events.EventStatus.SUCCESS)
payeeParams.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, participants.payeeFsp, Enum.Http.Headers.FSPIOP.SWITCH.value, payeeMetadata, payeeBulkResponse.headers, payeePayload)
payeeParams.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, participants.payeeFsp, Config.HUB_NAME, payeeMetadata, payeeBulkResponse.headers, payeePayload)
if ([Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) {
eventDetail.action = Enum.Events.Event.Action.BULK_COMMIT
} else if ([Enum.Events.Event.Action.BULK_ABORT].includes(action)) {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/bulk/shared/validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ const validateFspiopSourceAndDestination = async (payload, headers) => {
// Due to the Bulk [Design Considerations](https://docs.mojaloop.io/technical/central-bulk-transfers/#_2-design-considerations),
// it is possible that the Switch may send a POST Request to the Payee FSP with the Source Header containing "Switch",
// and the Payee FSP thus responding with a PUT Callback and destination header containing the same value (Switch).
(headers[Enum.Http.Headers.FSPIOP.DESTINATION] === Enum.Http.Headers.FSPIOP.SWITCH.value)
(headers[Enum.Http.Headers.FSPIOP.DESTINATION] === Config.HUB_NAME)
)
)

Expand Down
6 changes: 3 additions & 3 deletions src/handlers/timeouts/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ const timeout = async () => {
try {
const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(result[i].transferId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state)
const headers = Utility.Http.SwitchDefaultHeaders(result[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Enum.Http.Headers.FSPIOP.SWITCH.value, resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion)
const headers = Utility.Http.SwitchDefaultHeaders(result[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Config.HUB_NAME, resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion)
const message = Utility.StreamingProtocol.createMessage(result[i].transferId, result[i].payeeFsp, result[i].payerFsp, metadata, headers, fspiopError, { id: result[i].transferId }, `application/vnd.interoperability.${Enum.Http.HeaderResources.TRANSFERS}+json;version=${resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion}`)
span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED))
await span.audit({
Expand All @@ -93,7 +93,7 @@ const timeout = async () => {
if (result[i].bulkTransferId === null) { // regular transfer
if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
message.to = message.from
message.from = Enum.Http.Headers.FSPIOP.SWITCH.value
message.from = Config.HUB_NAME
// event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, message, state, null, span)
} else if (result[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
Expand All @@ -105,7 +105,7 @@ const timeout = async () => {
} else { // individual transfer from a bulk
if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
message.to = message.from
message.from = Enum.Http.Headers.FSPIOP.SWITCH.value
message.from = Config.HUB_NAME
message.metadata.event.type = Enum.Events.Event.Type.BULK_PROCESSING
message.metadata.event.action = Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.BULK_PROCESSING, Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, message, state, null, span)
Expand Down
5 changes: 3 additions & 2 deletions test/unit/domain/position/fulfil.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

const Test = require('tapes')(require('tape'))
const { Enum } = require('@mojaloop/central-services-shared')
const Config = require('../../../../src/lib/config')
const Sinon = require('sinon')
const { processPositionFulfilBin } = require('../../../../src/domain/position/fulfil')

Expand Down Expand Up @@ -673,14 +674,14 @@ Test('Fulfil domain', processPositionFulfilBinTest => {

test.equal(result.notifyMessages[0].message.content.headers.accept, transferMessage1.value.content.headers.accept)
test.equal(result.notifyMessages[0].message.content.headers['fspiop-destination'], transferMessage1.value.content.headers['fspiop-source'])
test.equal(result.notifyMessages[0].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value)
test.equal(result.notifyMessages[0].message.content.headers['fspiop-source'], Config.HUB_NAME)
test.equal(result.notifyMessages[0].message.content.headers['content-type'], transferMessage1.value.content.headers['content-type'])
test.equal(result.accumulatedTransferStates[transferMessage1.value.id], Enum.Transfers.TransferInternalState.INVALID)

console.log(transferMessage2.value.content.headers['fspiop-source'])
test.equal(result.notifyMessages[1].message.content.headers.accept, transferMessage2.value.content.headers.accept)
test.equal(result.notifyMessages[1].message.content.headers['fspiop-destination'], transferMessage2.value.content.headers['fspiop-source'])
test.equal(result.notifyMessages[1].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value)
test.equal(result.notifyMessages[1].message.content.headers['fspiop-source'], Config.HUB_NAME)
test.equal(result.notifyMessages[1].message.content.headers['content-type'], transferMessage2.value.content.headers['content-type'])
test.equal(result.accumulatedTransferStates[transferMessage2.value.id], Enum.Transfers.TransferInternalState.INVALID)

Expand Down
Loading

0 comments on commit 0341206

Please sign in to comment.