diff --git a/migrations/310404_participantPositionChange-change.js b/migrations/310404_participantPositionChange-change.js new file mode 100644 index 000000000..81632f9e3 --- /dev/null +++ b/migrations/310404_participantPositionChange-change.js @@ -0,0 +1,46 @@ +/***** + License + -------------- + Copyright © 2017 Bill & Melinda Gates Foundation + The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + Contributors + -------------- + This is the official list of the Mojaloop project contributors for this file. + Names of the original copyright holders (individuals or organizations) + should be listed with a '*' in the first column. People who have + contributed from an organization can be listed under the organization + that actually holds the copyright for their contributions (see the + Gates Foundation organization for an example). Those individuals should have + their names indented and be marked with a '-'. Email address can be added + optionally within square brackets . + * Gates Foundation + - Name Surname + + * ModusBox + - Vijaya Kumar Guthi + -------------- + ******/ + +'use strict' + +exports.up = async (knex) => { + return await knex.schema.hasTable('participantPositionChange').then(function(exists) { + if (exists) { + return knex.schema.alterTable('participantPositionChange', (t) => { + t.decimal('change', 18, 2).notNullable() + }) + } + }) +} + +exports.down = async (knex) => { + return await knex.schema.hasTable('participantPositionChange').then(function(exists) { + if (exists) { + return knex.schema.alterTable('participantPositionChange', (t) => { + t.dropColumn('change') + }) + } + }) +} diff --git a/package-lock.json b/package-lock.json index 9509d8e0a..b63e3fd3d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.28", + "version": "17.8.0-snapshot.31", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.28", + "version": "17.8.0-snapshot.31", "license": "Apache-2.0", "dependencies": { "@hapi/basic": "7.0.2", @@ -20,7 +20,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.5.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "v18.8.0", + "@mojaloop/central-services-shared": "18.8.0", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.6", "@mojaloop/event-sdk": "14.1.1", @@ -58,7 +58,7 @@ "jsdoc": "4.0.3", "jsonpath": "1.1.1", "nodemon": "3.1.5", - "npm-check-updates": "17.1.1", + "npm-check-updates": "17.1.2", "nyc": "17.0.0", "pre-commit": "1.2.2", "proxyquire": "2.1.3", @@ -9728,9 +9728,9 @@ } }, "node_modules/npm-check-updates": { - "version": "17.1.1", - "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-17.1.1.tgz", - "integrity": "sha512-2aqIzGAEWB7xPf0hKHTkNmUM5jHbn2S5r2/z/7dA5Ij2h/sVYAg9R/uVkaUC3VORPAfBm7pKkCWo6E9clEVQ9A==", + "version": "17.1.2", + "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-17.1.2.tgz", + "integrity": "sha512-k3osAbCNXIXqC7QAuF2uRHsKtTUS50KhOW1VAojRHlLdZRh/5EYfduvnVPGDWsbQXFakbSrSbWDdV8qIvDSUtA==", "dev": true, "bin": { "ncu": "build/cli.js", diff --git a/package.json b/package.json index c9d06567b..84f76456b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.28", + "version": "17.8.0-snapshot.31", "description": "Central ledger hosted by a scheme to record and settle transfers", "license": "Apache-2.0", "author": "ModusBox", @@ -92,7 +92,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.5.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "v18.8.0", + "@mojaloop/central-services-shared": "18.8.0", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.6", "@mojaloop/event-sdk": "14.1.1", @@ -133,7 +133,7 @@ "jsdoc": "4.0.3", "jsonpath": "1.1.1", "nodemon": "3.1.5", - "npm-check-updates": "17.1.1", + "npm-check-updates": "17.1.2", "nyc": "17.0.0", "pre-commit": "1.2.2", "proxyquire": "2.1.3", diff --git a/src/domain/fx/cyril.js b/src/domain/fx/cyril.js index 18d5a571a..69aa65969 100644 --- a/src/domain/fx/cyril.js +++ b/src/domain/fx/cyril.js @@ -224,7 +224,7 @@ const _getPositionChanges = async (commitRequestIdList, transferIdList) => { commitRequestId, notifyTo: fxRecord.initiatingFspName, participantCurrencyId: fxPositionChange.participantCurrencyId, - amount: -fxPositionChange.value + amount: -fxPositionChange.change }) }) } @@ -238,7 +238,7 @@ const _getPositionChanges = async (commitRequestIdList, transferIdList) => { transferId, notifyTo: transferRecord.payerFsp, participantCurrencyId: transferPositionChange.participantCurrencyId, - amount: -transferPositionChange.value + amount: -transferPositionChange.change }) }) } diff --git a/src/domain/position/abort.js b/src/domain/position/abort.js index 3fe24f4c4..6acf6685d 100644 --- a/src/domain/position/abort.js +++ b/src/domain/position/abort.js @@ -83,6 +83,7 @@ const processPositionAbortBin = async ( accumulatedTransferStatesCopy[positionChangeToBeProcessed.transferId] = transferStateId } binItem.result = { success: true } + const from = binItem.message.value.from cyrilResult.positionChanges[positionChangeIndex].isDone = true const nextIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone) if (nextIndex === -1) { @@ -91,11 +92,11 @@ const processPositionAbortBin = async ( for (const positionChange of cyrilResult.positionChanges) { if (positionChange.isFxTransferStateChange) { // Construct notification message for fx transfer state change - const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, Config.HUB_NAME, positionChange.notifyTo) + const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, from, positionChange.notifyTo) resultMessages.push({ binItem, message: resultMessage }) } else { // Construct notification message for transfer state change - const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, Config.HUB_NAME, positionChange.notifyTo) + const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, from, positionChange.notifyTo) resultMessages.push({ binItem, message: resultMessage }) } } @@ -127,7 +128,9 @@ const processPositionAbortBin = async ( const _constructAbortResultMessage = (binItem, id, from, notifyTo) => { let apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.PAYEE_REJECTION - if (binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.FX_ABORT_VALIDATION) { + let fromCalculated = from + if (binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.FX_ABORT_VALIDATION || binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.ABORT_VALIDATION) { + fromCalculated = Config.HUB_NAME apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR } const fspiopError = ErrorHandler.Factory.createFSPIOPError( @@ -153,8 +156,8 @@ const _constructAbortResultMessage = (binItem, id, from, notifyTo) => { ) const resultMessage = Utility.StreamingProtocol.createMessage( id, - from, notifyTo, + fromCalculated, metadata, binItem.message.value.content.headers, // Headers don't really matter here. ml-api-adapter will ignore them and create their own. fspiopError, @@ -173,6 +176,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, trans transferId, // Need to delete this in bin processor while updating transferStateChangeId transferStateChangeId: null, // Need to update this in bin processor while executing queries value: updatedRunningPosition.toNumber(), + change: transferAmount, reservedValue: accumulatedPositionReservedValue } @@ -194,6 +198,7 @@ const _handleParticipantPositionChangeFx = (runningPosition, transferAmount, com commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries value: updatedRunningPosition.toNumber(), + change: transferAmount, reservedValue: accumulatedPositionReservedValue } diff --git a/src/domain/position/fulfil.js b/src/domain/position/fulfil.js index 566bb5f8c..d34b71667 100644 --- a/src/domain/position/fulfil.js +++ b/src/domain/position/fulfil.js @@ -266,6 +266,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, trans transferId, // Need to delete this in bin processor while updating transferStateChangeId transferStateChangeId: null, // Need to update this in bin processor while executing queries value: updatedRunningPosition.toNumber(), + change: transferAmount, reservedValue: accumulatedPositionReservedValue } @@ -286,6 +287,7 @@ const _handleParticipantPositionChangeFx = (runningPosition, transferAmount, com commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries value: updatedRunningPosition.toNumber(), + change: transferAmount, reservedValue: accumulatedPositionReservedValue } diff --git a/src/domain/position/fx-prepare.js b/src/domain/position/fx-prepare.js index 098730db0..f3caf9a46 100644 --- a/src/domain/position/fx-prepare.js +++ b/src/domain/position/fx-prepare.js @@ -61,7 +61,7 @@ const processFxPositionPrepareBin = async ( let resultMessage const fxTransfer = binItem.decodedPayload const cyrilResult = binItem.message.value.content.context.cyrilResult - const transferAmount = fxTransfer.targetAmount.currency === cyrilResult.currencyId ? new MLNumber(fxTransfer.targetAmount.amount) : new MLNumber(fxTransfer.sourceAmount.amount) + const transferAmount = fxTransfer.targetAmount.currency === cyrilResult.currencyId ? fxTransfer.targetAmount.amount : fxTransfer.sourceAmount.amount Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::transfer:processingMessage: ${JSON.stringify(fxTransfer)}`) @@ -205,6 +205,7 @@ const processFxPositionPrepareBin = async ( commitRequestId: fxTransfer.commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries value: currentPosition.toNumber(), + change: transferAmount, reservedValue: accumulatedPositionReservedValue } participantPositionChanges.push(participantPositionChange) diff --git a/src/domain/position/fx-timeout-reserved.js b/src/domain/position/fx-timeout-reserved.js index ccd5dee3f..9bda53480 100644 --- a/src/domain/position/fx-timeout-reserved.js +++ b/src/domain/position/fx-timeout-reserved.js @@ -53,7 +53,7 @@ const processPositionFxTimeoutReservedBin = async ( } else { Logger.isDebugEnabled && Logger.debug(`accumulatedFxTransferStates: ${JSON.stringify(accumulatedFxTransferStates)}`) - const transferAmount = fetchedReservedPositionChangesByCommitRequestIds[commitRequestId][participantAccountId].value + const transferAmount = fetchedReservedPositionChangesByCommitRequestIds[commitRequestId][participantAccountId].change // Construct payee notification message const resultMessage = _constructFxTimeoutReservedResultMessage( @@ -141,6 +141,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, commi commitRequestId, // Need to delete this in bin processor while updating transferStateChangeId transferStateChangeId: null, // Need to update this in bin processor while executing queries value: updatedRunningPosition.toNumber(), + change: transferAmount, reservedValue: accumulatedPositionReservedValue } diff --git a/src/domain/position/prepare.js b/src/domain/position/prepare.js index 55f1f343f..5ae3dc883 100644 --- a/src/domain/position/prepare.js +++ b/src/domain/position/prepare.js @@ -207,6 +207,7 @@ const processPositionPrepareBin = async ( transferId: transfer.transferId, // Need to delete this in bin processor while updating transferStateChangeId transferStateChangeId: null, // Need to update this in bin processor while executing queries value: currentPosition.toNumber(), + change: transferAmount, reservedValue: accumulatedPositionReservedValue } participantPositionChanges.push(participantPositionChange) diff --git a/src/domain/position/timeout-reserved.js b/src/domain/position/timeout-reserved.js index 59844ac94..2ec7c0a07 100644 --- a/src/domain/position/timeout-reserved.js +++ b/src/domain/position/timeout-reserved.js @@ -144,6 +144,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, trans transferId, // Need to delete this in bin processor while updating transferStateChangeId transferStateChangeId: null, // Need to update this in bin processor while executing queries value: updatedRunningPosition.toNumber(), + change: transferAmount, reservedValue: accumulatedPositionReservedValue } diff --git a/src/handlers/transfers/handler.js b/src/handlers/transfers/handler.js index 1e474d43a..4ad013e37 100644 --- a/src/handlers/transfers/handler.js +++ b/src/handlers/transfers/handler.js @@ -408,14 +408,40 @@ const processFulfilMessage = async (message, functionality, span) => { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorInvalidFulfilment--${actionLetter}9`)) const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, 'invalid fulfilment') const apiFSPIOPError = fspiopError.toApiErrorObject(Config.ERROR_HANDLING) - await TransferService.handlePayeeResponse(transferId, payload, action, apiFSPIOPError) + await TransferService.handlePayeeResponse(transferId, payload, TransferEventAction.ABORT_VALIDATION, apiFSPIOPError) const eventDetail = { functionality: TransferEventType.POSITION, action: TransferEventAction.ABORT_VALIDATION } /** * TODO: BulkProcessingHandler (not in scope of #967) The individual transfer is ABORTED by notification is never sent. */ // Key position validation abort with payer account id - const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME }) + + const cyrilResult = await FxService.Cyril.processAbortMessage(transferId) + + params.message.value.content.context = { + ...params.message.value.content.context, + cyrilResult + } + if (cyrilResult.positionChanges.length > 0) { + const participantCurrencyId = cyrilResult.positionChanges[0].participantCurrencyId + await Kafka.proceed( + Config.KAFKA_CONFIG, + params, + { + consumerCommit, + fspiopError: apiFSPIOPError, + eventDetail, + messageKey: participantCurrencyId.toString(), + topicNameOverride: Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.ABORT, + hubName: Config.HUB_NAME + } + ) + } else { + const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError('Invalid cyril result') + throw fspiopError + } + + // const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION) + // await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME }) // emit an extra message - RESERVED_ABORTED if action === TransferEventAction.RESERVE if (action === TransferEventAction.RESERVE) { diff --git a/src/handlers/transfers/prepare.js b/src/handlers/transfers/prepare.js index 32db5a1ef..87ebbda89 100644 --- a/src/handlers/transfers/prepare.js +++ b/src/handlers/transfers/prepare.js @@ -437,6 +437,7 @@ const prepare = async (error, messages) => { if (proxyEnabled) { const [initiatingFsp, counterPartyFsp] = isFx ? [payload.initiatingFsp, payload.counterPartyFsp] : [payload.payerFsp, payload.payeeFsp] + // TODO: We need to double check the following validation logic incase of payee side currency conversion const payeeFspLookupOptions = isFx ? null : { validateCurrencyAccounts: true, accounts: [{ currency: payload.amount.currency, accountType: Enum.Accounts.LedgerAccountType.POSITION }] } ;[proxyObligation.initiatingFspProxyOrParticipantId, proxyObligation.counterPartyFspProxyOrParticipantId] = await Promise.all([ diff --git a/src/handlers/transfers/validator.js b/src/handlers/transfers/validator.js index c2fb110c3..8e43a433e 100644 --- a/src/handlers/transfers/validator.js +++ b/src/handlers/transfers/validator.js @@ -43,6 +43,8 @@ const Config = require('../../lib/config') const Participant = require('../../domain/participant') const Transfer = require('../../domain/transfer') const FxTransferModel = require('../../models/fxTransfer') +// const TransferStateChangeModel = require('../../models/transfer/transferStateChange') +const FxTransferStateChangeModel = require('../../models/fxTransfer/stateChange') const CryptoConditions = require('../../cryptoConditions') const Crypto = require('crypto') const base64url = require('base64url') @@ -208,6 +210,30 @@ const validatePrepare = async (payload, headers, isFx = false, determiningTransf const initiatingFsp = isFx ? payload.initiatingFsp : payload.payerFsp const counterPartyFsp = isFx ? payload.counterPartyFsp : payload.payeeFsp + // Check if determining transfers are failed + if (determiningTransferCheckResult.watchListRecords && determiningTransferCheckResult.watchListRecords.length > 0) { + // Iterate through determiningTransferCheckResult.watchListRecords + for (const watchListRecord of determiningTransferCheckResult.watchListRecords) { + if (isFx) { + // TODO: Check the transfer state of determiningTransferId + // const latestTransferStateChange = await TransferStateChangeModel.getByTransferId(watchListRecord.determiningTransferId) + // if (latestTransferStateChange.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED) { + // reasons.push('Related Transfer is not in reserved state') + // validationPassed = false + // return { validationPassed, reasons } + // } + } else { + // Check the transfer state of commitRequestId + const latestFxTransferStateChange = await FxTransferStateChangeModel.getByCommitRequestId(watchListRecord.commitRequestId) + if (latestFxTransferStateChange.transferStateId !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL_DEPENDENT) { + reasons.push('Related FX Transfer is not fulfilled') + validationPassed = false + return { validationPassed, reasons } + } + } + } + } + // Skip usual validation if preparing a proxy transfer or fxTransfer if (!(proxyObligation?.isInitiatingFspProxy || proxyObligation?.isCounterPartyFspProxy)) { validationPassed = ( diff --git a/src/models/position/facade.js b/src/models/position/facade.js index b064c314a..12a36100d 100644 --- a/src/models/position/facade.js +++ b/src/models/position/facade.js @@ -229,12 +229,13 @@ const prepareChangeParticipantPositionTransaction = async (transferList) => { const processedTransfersKeysList = Object.keys(processedTransfers) const batchParticipantPositionChange = [] for (const keyIndex in processedTransfersKeysList) { - const { runningPosition, runningReservedValue } = processedTransfers[processedTransfersKeysList[keyIndex]] + const { transferAmount, runningPosition, runningReservedValue } = processedTransfers[processedTransfersKeysList[keyIndex]] const participantPositionChange = { participantPositionId: initialParticipantPosition.participantPositionId, participantCurrencyId: participantCurrency.participantCurrencyId, transferStateChangeId: processedTransferStateChangeIdList[keyIndex], value: runningPosition, + change: transferAmount.toNumber(), // processBatch: - a single value uuid for this entire batch to make sure the set of transfers in this batch can be clearly grouped reservedValue: runningReservedValue } @@ -294,6 +295,7 @@ const changeParticipantPositionTransaction = async (participantCurrencyId, isRev participantCurrencyId, transferStateChangeId: insertedTransferStateChange.transferStateChangeId, value: latestPosition, + change: isReversal ? -amount : amount, reservedValue: participantPosition.reservedValue, createdDate: transactionTimestamp } diff --git a/src/models/transfer/facade.js b/src/models/transfer/facade.js index 2782bd8f7..08de158df 100644 --- a/src/models/transfer/facade.js +++ b/src/models/transfer/facade.js @@ -730,6 +730,22 @@ const _processFxTimeoutEntries = async (knex, trx, transactionTimestamp) => { .andWhere('ftsc.transferStateId', `${Enum.Transfers.TransferState.RESERVED}`) .select('ftt.commitRequestId', knex.raw('?', Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT), knex.raw('?', 'Marked for expiration by Timeout Handler')) }) + + // Insert `fxTransferStateChange` records for RECEIVED_FULFIL_DEPENDENT + await knex.from(knex.raw('fxTransferStateChange (commitRequestId, transferStateId, reason)')).transacting(trx) + .insert(function () { + this.from('fxTransferTimeout AS ftt') + .innerJoin(knex('fxTransferStateChange AS ftsc1') + .select('ftsc1.commitRequestId') + .max('ftsc1.fxTransferStateChangeId AS maxFxTransferStateChangeId') + .innerJoin('fxTransferTimeout AS ftt1', 'ftt1.commitRequestId', 'ftsc1.commitRequestId') + .groupBy('ftsc1.commitRequestId').as('fts'), 'fts.commitRequestId', 'ftt.commitRequestId' + ) + .innerJoin('fxTransferStateChange AS ftsc', 'ftsc.fxTransferStateChangeId', 'fts.maxFxTransferStateChangeId') + .where('ftt.expirationDate', '<', transactionTimestamp) + .andWhere('ftsc.transferStateId', `${Enum.Transfers.TransferInternalState.RECEIVED_FULFIL_DEPENDENT}`) + .select('ftt.commitRequestId', knex.raw('?', Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT), knex.raw('?', 'Marked for expiration by Timeout Handler')) + }) } const _insertFxTransferErrorEntries = async (knex, trx, transactionTimestamp) => { @@ -858,8 +874,12 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax, fxSegm .leftJoin('fxTransferTimeout AS ftt', 'ftt.commitRequestId', 'ft.commitRequestId') .leftJoin('fxTransfer AS ft1', 'ft1.determiningTransferId', 'ft.determiningTransferId') .whereNull('ftt.commitRequestId') - .whereIn('ftsc.transferStateId', [`${Enum.Transfers.TransferInternalState.RECEIVED_PREPARE}`, `${Enum.Transfers.TransferState.RESERVED}`]) // TODO: this needs to be updated to proper states for fx - .select('ft1.commitRequestId', 'ft.expirationDate') // Passing expiration date of the timedout fxTransfer for all related fxTransfers + .whereIn('ftsc.transferStateId', [ + `${Enum.Transfers.TransferInternalState.RECEIVED_PREPARE}`, + `${Enum.Transfers.TransferState.RESERVED}`, + `${Enum.Transfers.TransferInternalState.RECEIVED_FULFIL_DEPENDENT}` + ]) // TODO: this needs to be updated to proper states for fx + .select('ft1.commitRequestId', 'ft.expirationDate') // Passing expiration date of the timed out fxTransfer for all related fxTransfers }) await _processTimeoutEntries(knex, trx, transactionTimestamp) @@ -1079,6 +1099,7 @@ const transferStateAndPositionUpdate = async function (param1, enums, trx = null participantCurrencyId: info.drAccountId, transferStateChangeId, value: new MLNumber(info.drPositionValue).add(info.drAmount).toFixed(Config.AMOUNT.SCALE), + change: info.drAmount, reservedValue: info.drReservedValue, createdDate: param1.createdDate }) @@ -1103,6 +1124,7 @@ const transferStateAndPositionUpdate = async function (param1, enums, trx = null participantCurrencyId: info.crAccountId, transferStateChangeId, value: new MLNumber(info.crPositionValue).add(info.crAmount).toFixed(Config.AMOUNT.SCALE), + change: info.crAmount, reservedValue: info.crReservedValue, createdDate: param1.createdDate }) diff --git a/test/integration-override/handlers/transfers/fxAbort.test.js b/test/integration-override/handlers/transfers/fxAbort.test.js index a4975c46c..16d787a28 100644 --- a/test/integration-override/handlers/transfers/fxAbort.test.js +++ b/test/integration-override/handlers/transfers/fxAbort.test.js @@ -162,7 +162,7 @@ const prepareFxTestData = async (dataObj) => { const fxTransferPayload = { commitRequestId: randomUUID(), determiningTransferId: transferId, - condition: 'YlK5TZyhflbXaDRPtR5zhCu8FrbgvrQwwmzuH0iQ0AI', + condition: 'GRzLaTP7DJ9t4P-a_BA0WA9wzzlsugf00-Tn6kESAfM', expiration: dataObj.expiration, initiatingFsp: payer.participant.name, counterPartyFsp: fxp.participant.name, @@ -323,8 +323,8 @@ const prepareFxTestData = async (dataObj) => { const messageProtocolPayerInitiatedConversionFxFulfil = Util.clone(messageProtocolPayerInitiatedConversionFxPrepare) messageProtocolPayerInitiatedConversionFxFulfil.id = randomUUID() - messageProtocolPayerInitiatedConversionFxFulfil.from = transferPayload.counterPartyFsp - messageProtocolPayerInitiatedConversionFxFulfil.to = transferPayload.initiatingFsp + messageProtocolPayerInitiatedConversionFxFulfil.from = fxTransferPayload.counterPartyFsp + messageProtocolPayerInitiatedConversionFxFulfil.to = fxTransferPayload.initiatingFsp messageProtocolPayerInitiatedConversionFxFulfil.content.headers = fxFulfilHeaders messageProtocolPayerInitiatedConversionFxFulfil.content.uriParams = { id: fxTransferPayload.commitRequestId } messageProtocolPayerInitiatedConversionFxFulfil.content.payload = fulfilPayload @@ -362,6 +362,12 @@ const prepareFxTestData = async (dataObj) => { TransferEventType.PREPARE ) + const topicConfFxTransferFulfil = Utility.createGeneralTopicConf( + Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, + TransferEventType.TRANSFER, + TransferEventType.FULFIL + ) + const topicConfTransferFulfil = Utility.createGeneralTopicConf( Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, TransferEventType.TRANSFER, @@ -385,6 +391,7 @@ const prepareFxTestData = async (dataObj) => { topicConfTransferPrepare, topicConfTransferFulfil, topicConfFxTransferPrepare, + topicConfFxTransferFulfil, payer, payerLimitAndInitialPosition, fxp, @@ -477,7 +484,7 @@ Test('Handlers test', async handlersTest => { }) }) - await handlersTest.test('When only tranfer is sent and followed by transfer abort', async abortTest => { + await handlersTest.test('When only transfer is sent and followed by transfer abort', async abortTest => { const td = await prepareFxTestData(testFxData) await abortTest.test('update transfer state to RESERVED by PREPARE request', async (test) => { @@ -587,6 +594,56 @@ Test('Handlers test', async handlersTest => { const payerPositionAfterReserve = await ParticipantService.getPositionByParticipantCurrencyId(td.payer.participantCurrencyId) test.equal(payerPositionAfterReserve.value, testFxData.sourceAmount.amount) + testConsumer.clearEvents() + test.end() + }) + + await abortTest.test('update fxTransfer state to RECEIVED_FULFIL_DEPENDENT by FULFIL request', async (test) => { + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventAction.FULFIL.toUpperCase() + ) + fulfilConfig.logger = Logger + + await Producer.produceMessage( + td.messageProtocolPayerInitiatedConversionFxFulfil, + td.topicConfFxTransferFulfil, + fulfilConfig + ) + + try { + const positionFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: TOPIC_POSITION_BATCH, + action: Enum.Events.Event.Action.FX_RESERVE + // NOTE: The key is the fxp participantCurrencyId of the source currency (USD) + // Is that correct...? + // keyFilter: td.fxp.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFulfil[0], 'Position fx-fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + try { + await wrapWithRetries(async () => { + const fxTransfer = await FxTransferModels.fxTransfer.getAllDetailsByCommitRequestId( + td.messageProtocolPayerInitiatedConversionFxPrepare.content.payload.commitRequestId) || {} + + if (fxTransfer?.transferState !== TransferInternalState.RECEIVED_FULFIL_DEPENDENT) { + if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) + return null + } + return fxTransfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + + testConsumer.clearEvents() test.end() }) @@ -619,6 +676,7 @@ Test('Handlers test', async handlersTest => { const fxpTargetPositionAfterReserve = await ParticipantService.getPositionByParticipantCurrencyId(td.fxp.participantCurrencyIdSecondary) test.equal(fxpTargetPositionAfterReserve.value, testFxData.targetAmount.amount) + testConsumer.clearEvents() test.end() }) @@ -650,7 +708,8 @@ Test('Handlers test', async handlersTest => { // Check for the fxTransfer state to be ABORTED try { await wrapWithRetries(async () => { - const fxTransfer = await FxTransferModels.fxTransfer.getAllDetailsByCommitRequestId(td.messageProtocolPayerInitiatedConversionFxPrepare.content.payload.commitRequestId) || {} + const fxTransfer = await FxTransferModels.fxTransfer.getAllDetailsByCommitRequestId( + td.messageProtocolPayerInitiatedConversionFxPrepare.content.payload.commitRequestId) || {} if (fxTransfer?.transferState !== TransferInternalState.ABORTED_ERROR) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) return null @@ -678,6 +737,7 @@ Test('Handlers test', async handlersTest => { const fxpSourcePositionAfterAbort = await ParticipantService.getPositionByParticipantCurrencyId(td.fxp.participantCurrencyId) test.equal(fxpSourcePositionAfterAbort.value, 0) + testConsumer.clearEvents() test.end() }) @@ -727,6 +787,7 @@ Test('Handlers test', async handlersTest => { test.fail(err.message) } + testConsumer.clearEvents() test.end() }) @@ -743,7 +804,8 @@ Test('Handlers test', async handlersTest => { // Check for the fxTransfer state to be ABORTED try { await wrapWithRetries(async () => { - const fxTransfer = await FxTransferModels.fxTransfer.getAllDetailsByCommitRequestId(td.messageProtocolPayerInitiatedConversionFxPrepare.content.payload.commitRequestId) || {} + const fxTransfer = await FxTransferModels.fxTransfer.getAllDetailsByCommitRequestId( + td.messageProtocolPayerInitiatedConversionFxPrepare.content.payload.commitRequestId) || {} if (fxTransfer?.transferState !== TransferInternalState.ABORTED_ERROR) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) return null @@ -754,7 +816,7 @@ Test('Handlers test', async handlersTest => { Logger.error(err) test.fail(err.message) } - + testConsumer.clearEvents() test.end() }) diff --git a/test/integration-override/handlers/transfers/fxFulfil.test.js b/test/integration-override/handlers/transfers/fxFulfil.test.js index 93515a5f8..25df61641 100644 --- a/test/integration-override/handlers/transfers/fxFulfil.test.js +++ b/test/integration-override/handlers/transfers/fxFulfil.test.js @@ -96,6 +96,7 @@ const storeFxTransferPreparePayload = async (fxTransfer, transferStateId = '', a fxTransferStateChangeId: fxTransferStateChangeId[0].fxTransferStateChangeId, participantCurrencyId: 1, value: 0, + change: 0, reservedValue: 0 }) } diff --git a/test/integration-override/handlers/transfers/fxTimeout.test.js b/test/integration-override/handlers/transfers/fxTimeout.test.js index fbee6d783..9764db06f 100644 --- a/test/integration-override/handlers/transfers/fxTimeout.test.js +++ b/test/integration-override/handlers/transfers/fxTimeout.test.js @@ -33,6 +33,7 @@ const Cache = require('#src/lib/cache') const ProxyCache = require('#src/lib/proxyCache') const Producer = require('@mojaloop/central-services-stream').Util.Producer const Utility = require('@mojaloop/central-services-shared').Util.Kafka +const Util = require('@mojaloop/central-services-shared').Util const Enum = require('@mojaloop/central-services-shared').Enum const ParticipantHelper = require('#test/integration/helpers/participant') const ParticipantLimitHelper = require('#test/integration/helpers/participantLimit') @@ -161,7 +162,7 @@ const prepareFxTestData = async (dataObj) => { const fxTransferPayload = { commitRequestId: randomUUID(), determiningTransferId: transferId, - condition: 'YlK5TZyhflbXaDRPtR5zhCu8FrbgvrQwwmzuH0iQ0AI', + condition: 'GRzLaTP7DJ9t4P-a_BA0WA9wzzlsugf00-Tn6kESAfM', expiration: dataObj.expiration, initiatingFsp: payer.participant.name, counterPartyFsp: fxp.participant.name, @@ -280,14 +281,45 @@ const prepareFxTestData = async (dataObj) => { TransferEventType.PREPARE ) + const topicConfFxTransferFulfil = Utility.createGeneralTopicConf( + Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, + TransferEventType.TRANSFER, + TransferEventType.FULFIL + ) + + const fxFulfilHeaders = { + 'fspiop-source': fxp.participant.name, + 'fspiop-destination': payer.participant.name, + 'content-type': 'application/vnd.interoperability.fxTransfers+json;version=2.0' + } + + const fulfilPayload = { + fulfilment: 'UNlJ98hZTY_dsw0cAqw4i_UN3v4utt7CZFB4yfLbVFA', + completedTimestamp: dataObj.now, + transferState: 'COMMITTED' + } + + const messageProtocolPayerInitiatedConversionFxFulfil = Util.clone(messageProtocolPayerInitiatedConversionFxPrepare) + messageProtocolPayerInitiatedConversionFxFulfil.id = randomUUID() + messageProtocolPayerInitiatedConversionFxFulfil.from = fxTransferPayload.counterPartyFsp + messageProtocolPayerInitiatedConversionFxFulfil.to = fxTransferPayload.initiatingFsp + messageProtocolPayerInitiatedConversionFxFulfil.content.headers = fxFulfilHeaders + messageProtocolPayerInitiatedConversionFxFulfil.content.uriParams = { id: fxTransferPayload.commitRequestId } + messageProtocolPayerInitiatedConversionFxFulfil.content.payload = fulfilPayload + messageProtocolPayerInitiatedConversionFxFulfil.metadata.event.id = randomUUID() + messageProtocolPayerInitiatedConversionFxFulfil.metadata.event.type = TransferEventType.FULFIL + messageProtocolPayerInitiatedConversionFxFulfil.metadata.event.action = TransferEventAction.FX_RESERVE + return { fxTransferPayload, transfer1Payload, errorPayload, messageProtocolPayerInitiatedConversionFxPrepare, + messageProtocolPayerInitiatedConversionFxFulfil, messageProtocolPrepare1, topicConfTransferPrepare, topicConfFxTransferPrepare, + topicConfFxTransferFulfil, payer, payerLimitAndInitialPosition, fxp, @@ -609,6 +641,55 @@ Test('Handlers test', async handlersTest => { test.end() }) + await timeoutTest.test('update fxTransfer state to RECEIVED_FULFIL_DEPENDENT by FULFIL request', async (test) => { + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventAction.FULFIL.toUpperCase() + ) + fulfilConfig.logger = Logger + + await Producer.produceMessage( + td.messageProtocolPayerInitiatedConversionFxFulfil, + td.topicConfFxTransferFulfil, + fulfilConfig + ) + + try { + const positionFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: TOPIC_POSITION_BATCH, + action: Enum.Events.Event.Action.FX_RESERVE + // NOTE: The key is the fxp participantCurrencyId of the source currency (USD) + // Is that correct...? + // keyFilter: td.fxp.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFulfil[0], 'Position fx-fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + try { + await wrapWithRetries(async () => { + const fxTransfer = await FxTransferModels.fxTransfer.getAllDetailsByCommitRequestId( + td.messageProtocolPayerInitiatedConversionFxPrepare.content.payload.commitRequestId) || {} + + if (fxTransfer?.transferState !== TransferInternalState.RECEIVED_FULFIL_DEPENDENT) { + if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) + return null + } + return fxTransfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + + testConsumer.clearEvents() + test.end() + }) + await timeoutTest.test('update transfer state to RESERVED by PREPARE request', async (test) => { const config = Utility.getKafkaConfig( Config.KAFKA_CONFIG, @@ -648,7 +729,6 @@ Test('Handlers test', async handlersTest => { try { // Fetch FxTransfer record const fxTransfer = await FxTransferModels.fxTransfer.getAllDetailsByCommitRequestId(td.messageProtocolPayerInitiatedConversionFxPrepare.content.payload.commitRequestId) || {} - // Check Transfer for correct state if (fxTransfer?.transferState === Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) { // We have a Transfer with the correct state, lets check if we can get the TransferError record diff --git a/test/integration-override/handlers/transfers/handlers.test.js b/test/integration-override/handlers/transfers/handlers.test.js index cb57f4844..78aa5c5b3 100644 --- a/test/integration-override/handlers/transfers/handlers.test.js +++ b/test/integration-override/handlers/transfers/handlers.test.js @@ -1554,7 +1554,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger - td.messageProtocolFxPrepare.content.to = creditor + td.messageProtocolFxPrepare.to = creditor td.messageProtocolFxPrepare.content.headers['fspiop-destination'] = creditor td.messageProtocolFxPrepare.content.payload.counterPartyFsp = creditor await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) @@ -1597,7 +1597,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.FULFIL.toUpperCase()) fulfilConfig.logger = Logger - td.messageProtocolFxPrepare.content.to = creditor + td.messageProtocolFxPrepare.to = creditor td.messageProtocolFxPrepare.content.headers['fspiop-destination'] = creditor td.messageProtocolFxPrepare.content.payload.counterPartyFsp = creditor await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) @@ -1642,8 +1642,8 @@ Test('Handlers test', async handlersTest => { test.fail(err.message) } - td.messageProtocolFxFulfil.content.to = td.payer.participant.name - td.messageProtocolFxFulfil.content.from = 'regionalSchemeFXP' + td.messageProtocolFxFulfil.to = td.payer.participant.name + td.messageProtocolFxFulfil.from = 'regionalSchemeFXP' td.messageProtocolFxFulfil.content.headers['fspiop-destination'] = td.payer.participant.name td.messageProtocolFxFulfil.content.headers['fspiop-source'] = 'regionalSchemeFXP' await Producer.produceMessage(td.messageProtocolFxFulfil, td.topicConfTransferFulfil, fulfilConfig) @@ -1664,7 +1664,7 @@ Test('Handlers test', async handlersTest => { creditor = 'regionalSchemePayeeFsp' await ProxyCache.getCache().addDfspIdToProxyMapping(creditor, td.proxyAR.participant.name) - td.messageProtocolPrepare.content.to = creditor + td.messageProtocolPrepare.to = creditor td.messageProtocolPrepare.content.headers['fspiop-destination'] = creditor td.messageProtocolPrepare.content.payload.payeeFsp = creditor @@ -1709,7 +1709,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger - td.messageProtocolFxPrepare.content.from = debtor + td.messageProtocolFxPrepare.from = debtor td.messageProtocolFxPrepare.content.headers['fspiop-source'] = debtor td.messageProtocolFxPrepare.content.payload.initiatingFsp = debtor await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) @@ -1745,7 +1745,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger - td.messageProtocolFxPrepare.content.from = debtor + td.messageProtocolFxPrepare.from = debtor td.messageProtocolFxPrepare.content.headers['fspiop-source'] = debtor td.messageProtocolFxPrepare.content.payload.initiatingFsp = debtor await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) @@ -1763,11 +1763,37 @@ Test('Handlers test', async handlersTest => { console.error(err) } + // Fulfil the fxTransfer + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger + + td.messageProtocolFxFulfil.to = debtor + td.messageProtocolFxFulfil.content.headers['fspiop-destination'] = debtor + + testConsumer.clearEvents() + await Producer.produceMessage(td.messageProtocolFxFulfil, td.topicConfTransferFulfil, fulfilConfig) + + try { + const positionFxFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-notification-event', + action: 'fx-reserve', + valueToFilter: td.payer.name + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFxFulfil[0], 'Position fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + // Create subsequent transfer const creditor = 'regionalSchemePayeeFsp' await ProxyCache.getCache().addDfspIdToProxyMapping(creditor, td.proxyRB.participant.name) - td.messageProtocolPrepare.content.to = creditor + td.messageProtocolPrepare.to = creditor td.messageProtocolPrepare.content.headers['fspiop-destination'] = creditor td.messageProtocolPrepare.content.payload.payeeFsp = creditor @@ -1814,7 +1840,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger - td.messageProtocolPrepare.content.from = debtor + td.messageProtocolPrepare.from = debtor td.messageProtocolPrepare.content.headers['fspiop-source'] = debtor td.messageProtocolPrepare.content.payload.payerFsp = debtor td.messageProtocolPrepare.content.payload.amount.currency = 'XXX' @@ -1866,7 +1892,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger - td.messageProtocolPrepare.content.from = transferPrepareFrom + td.messageProtocolPrepare.from = transferPrepareFrom td.messageProtocolPrepare.content.headers['fspiop-source'] = transferPrepareFrom td.messageProtocolPrepare.content.payload.payerFsp = transferPrepareFrom @@ -1893,7 +1919,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.FULFIL.toUpperCase()) fulfilConfig.logger = Logger - td.messageProtocolFulfil.content.to = transferPrepareFrom + td.messageProtocolFulfil.to = transferPrepareFrom td.messageProtocolFulfil.content.headers['fspiop-destination'] = transferPrepareFrom testConsumer.clearEvents() @@ -1944,8 +1970,8 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger - td.messageProtocolPrepare.content.from = transferPrepareFrom - td.messageProtocolPrepare.content.to = transferPrepareTo + td.messageProtocolPrepare.from = transferPrepareFrom + td.messageProtocolPrepare.to = transferPrepareTo td.messageProtocolPrepare.content.headers['fspiop-source'] = transferPrepareFrom td.messageProtocolPrepare.content.headers['fspiop-destination'] = transferPrepareTo td.messageProtocolPrepare.content.payload.payerFsp = transferPrepareFrom @@ -1973,8 +1999,8 @@ Test('Handlers test', async handlersTest => { TransferEventType.FULFIL.toUpperCase()) fulfilConfig.logger = Logger - td.messageProtocolFulfil.content.from = transferPrepareTo - td.messageProtocolFulfil.content.to = transferPrepareFrom + td.messageProtocolFulfil.from = transferPrepareTo + td.messageProtocolFulfil.to = transferPrepareFrom td.messageProtocolFulfil.content.headers['fspiop-source'] = transferPrepareTo td.messageProtocolFulfil.content.headers['fspiop-destination'] = transferPrepareFrom @@ -2012,7 +2038,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger - td.messageProtocolFxPrepare.content.from = debtor + td.messageProtocolFxPrepare.from = debtor td.messageProtocolFxPrepare.content.headers['fspiop-source'] = debtor td.messageProtocolFxPrepare.content.payload.initiatingFsp = debtor await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) @@ -2038,7 +2064,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.FULFIL.toUpperCase()) fulfilConfig.logger = Logger - td.messageProtocolFxFulfil.content.to = debtor + td.messageProtocolFxFulfil.to = debtor td.messageProtocolFxFulfil.content.headers['fspiop-destination'] = debtor testConsumer.clearEvents() @@ -2075,7 +2101,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger - td.messageProtocolFxPrepare.content.from = debtor + td.messageProtocolFxPrepare.from = debtor td.messageProtocolFxPrepare.content.headers['fspiop-source'] = debtor td.messageProtocolFxPrepare.content.payload.initiatingFsp = debtor await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) @@ -2101,7 +2127,7 @@ Test('Handlers test', async handlersTest => { TransferEventType.FULFIL.toUpperCase()) fulfilConfig.logger = Logger - td.messageProtocolFxFulfil.content.to = debtor + td.messageProtocolFxFulfil.to = debtor td.messageProtocolFxFulfil.content.headers['fspiop-destination'] = debtor // If initiatingFsp is proxy, fx fulfil handler doesn't validate fspiop-destination header. @@ -2149,9 +2175,15 @@ Test('Handlers test', async handlersTest => { TransferEventType.TRANSFER.toUpperCase(), TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger // FX Transfer from proxyAR to FXP - td.messageProtocolFxPrepare.content.from = transferPrepareFrom + td.messageProtocolFxPrepare.from = transferPrepareFrom td.messageProtocolFxPrepare.content.headers['fspiop-source'] = transferPrepareFrom td.messageProtocolFxPrepare.content.payload.initiatingFsp = transferPrepareFrom await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) @@ -2169,9 +2201,31 @@ Test('Handlers test', async handlersTest => { console.error(err) } + // Fulfil the fxTransfer + td.messageProtocolFxFulfil.to = transferPrepareFrom + td.messageProtocolFxFulfil.content.headers['fspiop-destination'] = transferPrepareFrom + td.messageProtocolFxFulfil.from = td.fxp.participant.name + td.messageProtocolFxFulfil.content.headers['fspiop-source'] = td.fxp.participant.name + + testConsumer.clearEvents() + Logger.warn(`td.messageProtocolFxFulfil: ${JSON.stringify(td.messageProtocolFxFulfil)}`) + await Producer.produceMessage(td.messageProtocolFxFulfil, td.topicConfTransferFulfil, fulfilConfig) + + try { + const positionFxFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-notification-event', + action: 'fx-reserve', + valueToFilter: transferPrepareFrom + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFxFulfil[0], 'Position fxFulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + // Create subsequent transfer - td.messageProtocolPrepare.content.from = transferPrepareFrom - td.messageProtocolPrepare.content.to = transferPrepareTo + td.messageProtocolPrepare.from = transferPrepareFrom + td.messageProtocolPrepare.to = transferPrepareTo td.messageProtocolPrepare.content.headers['fspiop-source'] = transferPrepareFrom td.messageProtocolPrepare.content.headers['fspiop-destination'] = transferPrepareTo td.messageProtocolPrepare.content.payload.payerFsp = transferPrepareFrom @@ -2193,15 +2247,8 @@ Test('Handlers test', async handlersTest => { } // Fulfil the transfer - const fulfilConfig = Utility.getKafkaConfig( - Config.KAFKA_CONFIG, - Enum.Kafka.Config.PRODUCER, - TransferEventType.TRANSFER.toUpperCase(), - TransferEventType.FULFIL.toUpperCase()) - fulfilConfig.logger = Logger - - td.messageProtocolFulfil.content.from = transferPrepareTo - td.messageProtocolFulfil.content.to = transferPrepareFrom + td.messageProtocolFulfil.from = transferPrepareTo + td.messageProtocolFulfil.to = transferPrepareFrom td.messageProtocolFulfil.content.headers['fspiop-source'] = transferPrepareTo td.messageProtocolFulfil.content.headers['fspiop-destination'] = transferPrepareFrom @@ -2247,9 +2294,15 @@ Test('Handlers test', async handlersTest => { TransferEventType.TRANSFER.toUpperCase(), TransferEventType.PREPARE.toUpperCase()) prepareConfig.logger = Logger + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger // FX Transfer from payer to proxyAR - td.messageProtocolFxPrepare.content.to = fxTransferPrepareTo + td.messageProtocolFxPrepare.to = fxTransferPrepareTo td.messageProtocolFxPrepare.content.headers['fspiop-destination'] = fxTransferPrepareTo td.messageProtocolFxPrepare.content.payload.counterPartyFsp = fxTransferPrepareTo await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) @@ -2267,8 +2320,27 @@ Test('Handlers test', async handlersTest => { console.error(err) } + // Fulfil the fxTransfer + td.messageProtocolFulfil.from = fxTransferPrepareTo + td.messageProtocolFulfil.content.headers['fspiop-source'] = fxTransferPrepareTo + + testConsumer.clearEvents() + await Producer.produceMessage(td.messageProtocolFxFulfil, td.topicConfTransferFulfil, fulfilConfig) + + try { + const positionFxFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-notification-event', + action: 'fx-reserve', + valueToFilter: td.payer.name + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFxFulfil[0], 'Position fxFulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + // Create subsequent transfer - td.messageProtocolPrepare.content.to = transferPrepareTo + td.messageProtocolPrepare.to = transferPrepareTo td.messageProtocolPrepare.content.headers['fspiop-destination'] = transferPrepareTo td.messageProtocolPrepare.content.payload.payeeFsp = transferPrepareTo @@ -2302,14 +2374,7 @@ Test('Handlers test', async handlersTest => { } // Fulfil the transfer - const fulfilConfig = Utility.getKafkaConfig( - Config.KAFKA_CONFIG, - Enum.Kafka.Config.PRODUCER, - TransferEventType.TRANSFER.toUpperCase(), - TransferEventType.FULFIL.toUpperCase()) - fulfilConfig.logger = Logger - - td.messageProtocolFulfil.content.from = transferPrepareTo + td.messageProtocolFulfil.from = transferPrepareTo td.messageProtocolFulfil.content.headers['fspiop-source'] = transferPrepareTo testConsumer.clearEvents() await Producer.produceMessage(td.messageProtocolFulfil, td.topicConfTransferFulfil, fulfilConfig) diff --git a/test/unit/domain/fx/cyril.test.js b/test/unit/domain/fx/cyril.test.js index 809f23c11..3032c5f36 100644 --- a/test/unit/domain/fx/cyril.test.js +++ b/test/unit/domain/fx/cyril.test.js @@ -1105,7 +1105,7 @@ Test('Cyril', cyrilTest => { ParticipantPositionChangesModel.getReservedPositionChangesByCommitRequestId.returns(Promise.resolve([ { participantCurrencyId: 1, - value: payload.amount.amount + change: payload.amount.amount } ])) TransferFacade.getById.returns(Promise.resolve({ @@ -1114,7 +1114,7 @@ Test('Cyril', cyrilTest => { ParticipantPositionChangesModel.getReservedPositionChangesByTransferId.returns(Promise.resolve([ { participantCurrencyId: 1, - value: payload.amount.amount + change: payload.amount.amount } ])) @@ -1148,7 +1148,7 @@ Test('Cyril', cyrilTest => { ParticipantPositionChangesModel.getReservedPositionChangesByCommitRequestId.returns(Promise.resolve([ { participantCurrencyId: 1, - value: payload.amount.amount + change: payload.amount.amount } ])) TransferFacade.getById.returns(Promise.resolve({ @@ -1157,7 +1157,7 @@ Test('Cyril', cyrilTest => { ParticipantPositionChangesModel.getReservedPositionChangesByTransferId.returns(Promise.resolve([ { participantCurrencyId: 1, - value: payload.amount.amount + change: payload.amount.amount } ])) diff --git a/test/unit/domain/position/binProcessor.test.js b/test/unit/domain/position/binProcessor.test.js index 9274fde4a..74aee7211 100644 --- a/test/unit/domain/position/binProcessor.test.js +++ b/test/unit/domain/position/binProcessor.test.js @@ -389,7 +389,8 @@ Test('BinProcessor', async (binProcessorTest) => { BatchPositionModel.getReservedPositionChangesByCommitRequestIds.returns({ 'ed6848e0-e2a8-45b0-9f98-59a2ffba8c10': { 15: { - value: 100 + value: 100, + change: 100 } } }) diff --git a/test/unit/domain/position/fx-timeout-reserved.test.js b/test/unit/domain/position/fx-timeout-reserved.test.js index 50acb5741..0b24dc55e 100644 --- a/test/unit/domain/position/fx-timeout-reserved.test.js +++ b/test/unit/domain/position/fx-timeout-reserved.test.js @@ -237,12 +237,14 @@ Test('timeout reserved domain', positionIndexTest => { fetchedReservedPositionChangesByCommitRequestIds: { 'd6a036a5-65a3-48af-a0c7-ee089c412ada': { 51: { - value: 10 + value: 10, + change: 10 } }, '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': { 51: { - value: 5 + value: 5, + change: 5 } } } diff --git a/test/unit/handlers/transfers/handler.test.js b/test/unit/handlers/transfers/handler.test.js index 32363fcfc..8110deb0a 100644 --- a/test/unit/handlers/transfers/handler.test.js +++ b/test/unit/handlers/transfers/handler.test.js @@ -629,6 +629,12 @@ Test('Transfer handler', transferHandlerTest => { })) Validator.validateFulfilCondition.returns(false) Kafka.proceed.returns(true) + Cyril.processAbortMessage.returns({ + isFx: false, + positionChanges: [{ + participantCurrencyId: 1 + }] + }) // Act const result = await allTransferHandlers.fulfil(null, localfulfilMessages)