diff --git a/.circleci/config.yml b/.circleci/config.yml index 0d4d5e444..9940a5bae 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -873,6 +873,8 @@ workflows: filters: tags: only: /.*/ + # test-integration only on main and release branches. revert to /.*/ after integration test fixes + # ignore: /v[0-9]+(\.[0-9]+)*\-snapshot+((\.[0-9]+)?)/ branches: ignore: - /feature*/ diff --git a/.nycrc.yml b/.nycrc.yml index d028a91ca..8aa318701 100644 --- a/.nycrc.yml +++ b/.nycrc.yml @@ -28,6 +28,7 @@ exclude: [ 'src/handlers/transfers/FxFulfilService.js', 'src/models/position/batch.js', 'src/models/fxTransfer/**', - 'src/shared/fspiopErrorFactory.js' + 'src/shared/fspiopErrorFactory.js', + 'src/lib/proxyCache.js' # todo: remove this line after adding test coverage ] ## todo: increase test coverage before merging feat/fx-impl to main branch diff --git a/audit-ci.jsonc b/audit-ci.jsonc index 9314e72e9..eeb2349b2 100644 --- a/audit-ci.jsonc +++ b/audit-ci.jsonc @@ -12,6 +12,7 @@ "GHSA-g64q-3vg8-8f93", // https://github.com/advisories/GHSA-g64q-3vg8-8f93 "GHSA-mg85-8mv5-ffjr", // https://github.com/advisories/GHSA-mg85-8mv5-ffjr "GHSA-8hc4-vh64-cxmj", // https://github.com/advisories/GHSA-8hc4-vh64-cxmj - "GHSA-952p-6rrq-rcjv" // https://github.com/advisories/GHSA-952p-6rrq-rcjv + "GHSA-952p-6rrq-rcjv", // https://github.com/advisories/GHSA-952p-6rrq-rcjv + "GHSA-9wv6-86v2-598j" // https://github.com/advisories/GHSA-9wv6-86v2-598j ] } diff --git a/package-lock.json b/package-lock.json index d93eb0ebb..3642ee1c2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.22", + "version": "17.8.0-snapshot.27", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.22", + "version": "17.8.0-snapshot.27", "license": "Apache-2.0", "dependencies": { "@hapi/basic": "7.0.2", diff --git a/package.json b/package.json index 4690071dc..236e88e18 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.22", + "version": "17.8.0-snapshot.27", "description": "Central ledger hosted by a scheme to record and settle transfers", "license": "Apache-2.0", "author": "ModusBox", @@ -63,7 +63,7 @@ "migrate:current": "npx knex migrate:currentVersion $npm_package_config_knex", "seed:run": "npx knex seed:run $npm_package_config_knex", "docker:build": "docker build --build-arg NODE_VERSION=\"$(cat .nvmrc)-alpine\" -t mojaloop/central-ledger:local .", - "docker:up": ". ./docker/env.sh && docker-compose -f docker-compose.yml up", + "docker:up": ". ./docker/env.sh && docker-compose -f docker-compose.yml up -d", "docker:up:backend": "docker-compose up -d ml-api-adapter mysql mockserver kafka kowl temp_curl", "docker:up:int": "docker compose up -d kafka init-kafka objstore mysql", "docker:script:populateTestData": "sh ./test/util/scripts/populateTestData.sh", diff --git a/src/domain/position/fulfil.js b/src/domain/position/fulfil.js index 4d19f0627..e53f8bb1a 100644 --- a/src/domain/position/fulfil.js +++ b/src/domain/position/fulfil.js @@ -65,9 +65,11 @@ const processPositionFulfilBin = async ( // Find out the first item to be processed const positionChangeIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone) const positionChangeToBeProcessed = cyrilResult.positionChanges[positionChangeIndex] + let transferStateIdCopy if (positionChangeToBeProcessed.isFxTransferStateChange) { const { participantPositionChange, fxTransferStateChange, transferStateId, updatedRunningPosition } = _handleParticipantPositionChangeFx(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.commitRequestId, accumulatedPositionReservedValue) + transferStateIdCopy = transferStateId runningPosition = updatedRunningPosition participantPositionChanges.push(participantPositionChange) fxTransferStateChanges.push(fxTransferStateChange) @@ -76,6 +78,7 @@ const processPositionFulfilBin = async ( } else { const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } = _handleParticipantPositionChange(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.transferId, accumulatedPositionReservedValue) + transferStateIdCopy = transferStateId runningPosition = updatedRunningPosition participantPositionChanges.push(participantPositionChange) transferStateChanges.push(transferStateChange) @@ -86,22 +89,19 @@ const processPositionFulfilBin = async ( const nextIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone) if (nextIndex === -1) { // All position changes are done - const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers) + const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateIdCopy) resultMessages.push({ binItem, message: resultMessage }) } else { // There are still position changes to be processed // Send position-commit kafka message again for the next item const participantCurrencyId = cyrilResult.positionChanges[nextIndex].participantCurrencyId - const followupMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers) + const followupMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateIdCopy) // Pass down the context to the followup message with mutated cyrilResult followupMessage.content.context = binItem.message.value.content.context followupMessages.push({ binItem, messageKey: participantCurrencyId.toString(), message: followupMessage }) } } else { const transferAmount = transferInfoList[transferId].amount - - const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers) - const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } = _handleParticipantPositionChange(runningPosition, transferAmount, transferId, accumulatedPositionReservedValue) runningPosition = updatedRunningPosition @@ -109,6 +109,7 @@ const processPositionFulfilBin = async ( participantPositionChanges.push(participantPositionChange) transferStateChanges.push(transferStateChange) accumulatedTransferStatesCopy[transferId] = transferStateId + const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateId) resultMessages.push({ binItem, message: resultMessage }) } } @@ -165,7 +166,7 @@ const _handleIncorrectTransferState = (binItem, payeeFsp, transferId, accumulate ) } -const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers) => { +const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateId) => { // forward same headers from the prepare message, except the content-length header const headers = { ...binItem.message.value.content.headers } delete headers['content-length'] @@ -197,6 +198,7 @@ const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, pa resultMessage.content.payload = TransferObjectTransform.toFulfil( reservedActionTransfers[transferId] ) + resultMessage.content.payload.transferState = transferStateId } return resultMessage } diff --git a/src/handlers/transfers/prepare.js b/src/handlers/transfers/prepare.js index 3df5161d1..32db5a1ef 100644 --- a/src/handlers/transfers/prepare.js +++ b/src/handlers/transfers/prepare.js @@ -36,9 +36,9 @@ const Participant = require('../../domain/participant') const createRemittanceEntity = require('./createRemittanceEntity') const Validator = require('./validator') const dto = require('./dto') -const TransferService = require('#src/domain/transfer/index') -const ProxyCache = require('#src/lib/proxyCache') -const FxTransferService = require('#src/domain/fx/index') +const TransferService = require('../../domain/transfer/index') +const ProxyCache = require('../../lib/proxyCache') +const FxTransferService = require('../../domain/fx/index') const { Kafka, Comparators } = Util const { TransferState } = Enum.Transfers @@ -436,10 +436,14 @@ const prepare = async (error, messages) => { } if (proxyEnabled) { const [initiatingFsp, counterPartyFsp] = isFx ? [payload.initiatingFsp, payload.counterPartyFsp] : [payload.payerFsp, payload.payeeFsp] + + const payeeFspLookupOptions = isFx ? null : { validateCurrencyAccounts: true, accounts: [{ currency: payload.amount.currency, accountType: Enum.Accounts.LedgerAccountType.POSITION }] } + ;[proxyObligation.initiatingFspProxyOrParticipantId, proxyObligation.counterPartyFspProxyOrParticipantId] = await Promise.all([ ProxyCache.getFSPProxy(initiatingFsp), - ProxyCache.getFSPProxy(counterPartyFsp) + ProxyCache.getFSPProxy(counterPartyFsp, payeeFspLookupOptions) ]) + logger.debug('Prepare proxy cache lookup results', { initiatingFsp, counterPartyFsp, @@ -449,6 +453,7 @@ const prepare = async (error, messages) => { proxyObligation.isInitiatingFspProxy = !proxyObligation.initiatingFspProxyOrParticipantId.inScheme && proxyObligation.initiatingFspProxyOrParticipantId.proxyId !== null + proxyObligation.isCounterPartyFspProxy = !proxyObligation.counterPartyFspProxyOrParticipantId.inScheme && proxyObligation.counterPartyFspProxyOrParticipantId.proxyId !== null diff --git a/src/lib/proxyCache.js b/src/lib/proxyCache.js index 21b4f6297..e2ed70d2d 100644 --- a/src/lib/proxyCache.js +++ b/src/lib/proxyCache.js @@ -33,11 +33,36 @@ const getCache = () => { return proxyCache } -const getFSPProxy = async (dfspId) => { +/** + * Get the proxy details for the given dfspId + * + * @param {*} dfspId + * @param {*} options - { validateCurrencyAccounts: boolean, accounts: [ { currency: string, accountType: Enum.Accounts.LedgerAccountType } ] } + * @returns {Promise<{ inScheme: boolean, proxyId: string }>} + */ +const getFSPProxy = async (dfspId, options = null) => { logger.debug('Checking if dfspId is in scheme or proxy', { dfspId }) const participant = await ParticipantService.getByName(dfspId) + let inScheme = !!participant + + if (inScheme && options?.validateCurrencyAccounts) { + logger.debug('Checking if participant currency accounts are active', { dfspId, options, participant }) + let accountsAreActive = false + for (const account of options.accounts) { + accountsAreActive = participant.currencyList.some((currAccount) => { + return ( + currAccount.currencyId === account.currency && + currAccount.ledgerAccountTypeId === account.accountType && + currAccount.isActive === 1 + ) + }) + if (!accountsAreActive) break + } + inScheme = accountsAreActive + } + return { - inScheme: !!participant, + inScheme, proxyId: !participant ? await getCache().lookupProxyByDfspId(dfspId) : null } } diff --git a/src/models/transfer/facade.js b/src/models/transfer/facade.js index 191f90aa0..2782bd8f7 100644 --- a/src/models/transfer/facade.js +++ b/src/models/transfer/facade.js @@ -1398,11 +1398,9 @@ const getTransferParticipant = async (participantName, transferId) => { .where({ 'participant.name': participantName, 'tp.transferId': transferId, - 'participant.isActive': 1, - 'pc.isActive': 1 + 'participant.isActive': 1 }) - .innerJoin('participantCurrency AS pc', 'pc.participantId', 'participant.participantId') - .innerJoin('transferParticipant AS tp', 'tp.participantCurrencyId', 'pc.participantCurrencyId') + .innerJoin('transferParticipant AS tp', 'tp.participantId', 'participant.participantId') .select( 'tp.*' ) diff --git a/test/unit/models/transfer/facade.test.js b/test/unit/models/transfer/facade.test.js index adc19e77d..7daebfded 100644 --- a/test/unit/models/transfer/facade.test.js +++ b/test/unit/models/transfer/facade.test.js @@ -2689,7 +2689,6 @@ Test('Transfer facade', async (transferFacadeTest) => { const participantName = 'fsp1' const transferId = '88416f4c-68a3-4819-b8e0-c23b27267cd5' const builderStub = sandbox.stub() - const participantCurrencyStub = sandbox.stub() const transferParticipantStub = sandbox.stub() const selectStub = sandbox.stub() @@ -2697,10 +2696,8 @@ Test('Transfer facade', async (transferFacadeTest) => { Db.participant.query.callsArgWith(0, builderStub) builderStub.where.returns({ - innerJoin: participantCurrencyStub.returns({ - innerJoin: transferParticipantStub.returns({ - select: selectStub.returns([1]) - }) + innerJoin: transferParticipantStub.returns({ + select: selectStub.returns([1]) }) }) @@ -2709,11 +2706,9 @@ Test('Transfer facade', async (transferFacadeTest) => { test.ok(builderStub.where.withArgs({ 'participant.name': participantName, 'tp.transferId': transferId, - 'participant.isActive': 1, - 'pc.isActive': 1 + 'participant.isActive': 1 }).calledOnce, 'query builder called once') - test.ok(participantCurrencyStub.withArgs('participantCurrency AS pc', 'pc.participantId', 'participant.participantId').calledOnce, 'participantCurrency inner joined') - test.ok(transferParticipantStub.withArgs('transferParticipant AS tp', 'tp.participantCurrencyId', 'pc.participantCurrencyId').calledOnce, 'transferParticipant inner joined') + test.ok(transferParticipantStub.withArgs('transferParticipant AS tp', 'tp.participantId', 'participant.participantId').calledOnce, 'transferParticipant inner joined') test.ok(selectStub.withArgs( 'tp.*' ).calledOnce, 'select all columns from transferParticipant')