Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: gp failure fixes for interscheme and fx changes #1091

Merged
merged 27 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
08d56be
fix: check participant.isActive in prepare
oderayi Sep 3, 2024
d5bab38
chore(snapshot): 17.8.0-snapshot.16
oderayi Sep 3, 2024
4a1e56e
chore(snapshot): 17.8.0-snapshot.17
oderayi Sep 3, 2024
2f0d36e
fix: check position account is active in prepare
oderayi Sep 3, 2024
6e64cd6
chore(snapshot): 17.8.0-snapshot.18
oderayi Sep 3, 2024
b3b6d1d
test: temporarily disable coverage for proxy
oderayi Sep 3, 2024
d02cd5e
chore(snapshot): 17.8.0-snapshot.19
oderayi Sep 3, 2024
aaf02b4
chore(snapshot): 17.8.0-snapshot.20
oderayi Sep 4, 2024
d1ee459
ci: temporarily disable int tests for snapshots
oderayi Sep 4, 2024
d6afaf5
chore(snapshot): 17.8.0-snapshot.21
oderayi Sep 4, 2024
4c40271
fix: fix typos
oderayi Sep 4, 2024
53b3246
refactor: reactor getFSPProxy
oderayi Sep 4, 2024
3240c41
chore(snapshot): 17.8.0-snapshot.22
oderayi Sep 4, 2024
e1eae99
doc: update comment
oderayi Sep 4, 2024
88319fa
chore(snapshot): 17.8.0-snapshot.23
oderayi Sep 4, 2024
cbe72a5
fix(csi-603): fix getTransferParticipant query join
oderayi Sep 5, 2024
3b5a194
ci: re-enable integration tests for snapshots
oderayi Sep 5, 2024
091740a
chore(snapshot): 17.8.0-snapshot.24
oderayi Sep 5, 2024
58109b2
chore: merge with upstream
oderayi Sep 5, 2024
8ccc816
chore(snapshot): 17.8.0-snapshot.25
oderayi Sep 5, 2024
077bb08
fix: fix query
oderayi Sep 5, 2024
416b1f0
chore(snapshot): 17.8.0-snapshot.26
oderayi Sep 5, 2024
27224ad
refactor: refactor
oderayi Sep 6, 2024
054da86
refactor: refactor
oderayi Sep 6, 2024
be73098
fix(csi-610): fix hub responding with RESERVED instead of COMMITED fo…
oderayi Sep 10, 2024
ef86193
chore(snapshot): 17.8.0-snapshot.27
oderayi Sep 10, 2024
017022c
Merge branch feat/fx-impl of https://github.com/mojaloop/central-ledg…
vijayg10 Sep 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,8 @@ workflows:
filters:
tags:
only: /.*/
# test-integration only on main and release branches. revert to /.*/ after integration test fixes
# ignore: /v[0-9]+(\.[0-9]+)*\-snapshot+((\.[0-9]+)?)/
branches:
ignore:
- /feature*/
Expand Down
3 changes: 2 additions & 1 deletion .nycrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ exclude: [
'src/handlers/transfers/FxFulfilService.js',
'src/models/position/batch.js',
'src/models/fxTransfer/**',
'src/shared/fspiopErrorFactory.js'
'src/shared/fspiopErrorFactory.js',
'src/lib/proxyCache.js' # todo: remove this line after adding test coverage
]
## todo: increase test coverage before merging feat/fx-impl to main branch
3 changes: 2 additions & 1 deletion audit-ci.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"GHSA-g64q-3vg8-8f93", // https://github.com/advisories/GHSA-g64q-3vg8-8f93
"GHSA-mg85-8mv5-ffjr", // https://github.com/advisories/GHSA-mg85-8mv5-ffjr
"GHSA-8hc4-vh64-cxmj", // https://github.com/advisories/GHSA-8hc4-vh64-cxmj
"GHSA-952p-6rrq-rcjv" // https://github.com/advisories/GHSA-952p-6rrq-rcjv
"GHSA-952p-6rrq-rcjv", // https://github.com/advisories/GHSA-952p-6rrq-rcjv
"GHSA-9wv6-86v2-598j" // https://github.com/advisories/GHSA-9wv6-86v2-598j
]
}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
"version": "17.8.0-snapshot.16",
"version": "17.8.0-snapshot.27",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
Expand Down Expand Up @@ -63,7 +63,7 @@
"migrate:current": "npx knex migrate:currentVersion $npm_package_config_knex",
"seed:run": "npx knex seed:run $npm_package_config_knex",
"docker:build": "docker build --build-arg NODE_VERSION=\"$(cat .nvmrc)-alpine\" -t mojaloop/central-ledger:local .",
"docker:up": ". ./docker/env.sh && docker-compose -f docker-compose.yml up",
"docker:up": ". ./docker/env.sh && docker-compose -f docker-compose.yml up -d",
"docker:up:backend": "docker-compose up -d ml-api-adapter mysql mockserver kafka kowl temp_curl",
"docker:up:int": "docker compose up -d kafka init-kafka objstore mysql",
"docker:script:populateTestData": "sh ./test/util/scripts/populateTestData.sh",
Expand Down
14 changes: 8 additions & 6 deletions src/domain/position/fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ const processPositionFulfilBin = async (
// Find out the first item to be processed
const positionChangeIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone)
const positionChangeToBeProcessed = cyrilResult.positionChanges[positionChangeIndex]
let transferStateIdCopy
if (positionChangeToBeProcessed.isFxTransferStateChange) {
const { participantPositionChange, fxTransferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChangeFx(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.commitRequestId, accumulatedPositionReservedValue)
transferStateIdCopy = transferStateId
runningPosition = updatedRunningPosition
participantPositionChanges.push(participantPositionChange)
fxTransferStateChanges.push(fxTransferStateChange)
Expand All @@ -76,6 +78,7 @@ const processPositionFulfilBin = async (
} else {
const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChange(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.transferId, accumulatedPositionReservedValue)
transferStateIdCopy = transferStateId
runningPosition = updatedRunningPosition
participantPositionChanges.push(participantPositionChange)
transferStateChanges.push(transferStateChange)
Expand All @@ -86,29 +89,27 @@ const processPositionFulfilBin = async (
const nextIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone)
if (nextIndex === -1) {
// All position changes are done
const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers)
const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateIdCopy)
resultMessages.push({ binItem, message: resultMessage })
} else {
// There are still position changes to be processed
// Send position-commit kafka message again for the next item
const participantCurrencyId = cyrilResult.positionChanges[nextIndex].participantCurrencyId
const followupMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers)
const followupMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateIdCopy)
// Pass down the context to the followup message with mutated cyrilResult
followupMessage.content.context = binItem.message.value.content.context
followupMessages.push({ binItem, messageKey: participantCurrencyId.toString(), message: followupMessage })
}
} else {
const transferAmount = transferInfoList[transferId].amount

const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers)

const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChange(runningPosition, transferAmount, transferId, accumulatedPositionReservedValue)
runningPosition = updatedRunningPosition
binItem.result = { success: true }
participantPositionChanges.push(participantPositionChange)
transferStateChanges.push(transferStateChange)
accumulatedTransferStatesCopy[transferId] = transferStateId
const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateId)
resultMessages.push({ binItem, message: resultMessage })
}
}
Expand Down Expand Up @@ -165,7 +166,7 @@ const _handleIncorrectTransferState = (binItem, payeeFsp, transferId, accumulate
)
}

const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers) => {
const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers, transferStateId) => {
// forward same headers from the prepare message, except the content-length header
const headers = { ...binItem.message.value.content.headers }
delete headers['content-length']
Expand Down Expand Up @@ -197,6 +198,7 @@ const _constructTransferFulfilResultMessage = (binItem, transferId, payerFsp, pa
resultMessage.content.payload = TransferObjectTransform.toFulfil(
reservedActionTransfers[transferId]
)
resultMessage.content.payload.transferState = transferStateId
}
return resultMessage
}
Expand Down
13 changes: 9 additions & 4 deletions src/handlers/transfers/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ const Participant = require('../../domain/participant')
const createRemittanceEntity = require('./createRemittanceEntity')
const Validator = require('./validator')
const dto = require('./dto')
const TransferService = require('#src/domain/transfer/index')
const ProxyCache = require('#src/lib/proxyCache')
const FxTransferService = require('#src/domain/fx/index')
const TransferService = require('../../domain/transfer/index')
const ProxyCache = require('../../lib/proxyCache')
const FxTransferService = require('../../domain/fx/index')

const { Kafka, Comparators } = Util
const { TransferState } = Enum.Transfers
Expand Down Expand Up @@ -415,10 +415,14 @@ const prepare = async (error, messages) => {
}
if (proxyEnabled) {
const [initiatingFsp, counterPartyFsp] = isFx ? [payload.initiatingFsp, payload.counterPartyFsp] : [payload.payerFsp, payload.payeeFsp]

const payeeFspLookupOptions = isFx ? null : { validateCurrencyAccounts: true, accounts: [{ currency: payload.amount.currency, accountType: Enum.Accounts.LedgerAccountType.POSITION }] }

;[proxyObligation.initiatingFspProxyOrParticipantId, proxyObligation.counterPartyFspProxyOrParticipantId] = await Promise.all([
ProxyCache.getFSPProxy(initiatingFsp),
ProxyCache.getFSPProxy(counterPartyFsp)
ProxyCache.getFSPProxy(counterPartyFsp, payeeFspLookupOptions)
])

logger.debug('Prepare proxy cache lookup results', {
initiatingFsp,
counterPartyFsp,
Expand All @@ -428,6 +432,7 @@ const prepare = async (error, messages) => {

proxyObligation.isInitiatingFspProxy = !proxyObligation.initiatingFspProxyOrParticipantId.inScheme &&
proxyObligation.initiatingFspProxyOrParticipantId.proxyId !== null

proxyObligation.isCounterPartyFspProxy = !proxyObligation.counterPartyFspProxyOrParticipantId.inScheme &&
proxyObligation.counterPartyFspProxyOrParticipantId.proxyId !== null

Expand Down
29 changes: 27 additions & 2 deletions src/lib/proxyCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,36 @@ const getCache = () => {
return proxyCache
}

const getFSPProxy = async (dfspId) => {
/**
* Get the proxy details for the given dfspId
*
* @param {*} dfspId
* @param {*} options - { validateCurrencyAccounts: boolean, accounts: [ { currency: string, accountType: Enum.Accounts.LedgerAccountType } ] }
* @returns {Promise<{ inScheme: boolean, proxyId: string }>}
*/
const getFSPProxy = async (dfspId, options = null) => {
logger.debug('Checking if dfspId is in scheme or proxy', { dfspId })
const participant = await ParticipantService.getByName(dfspId)
let inScheme = !!participant

if (inScheme && options?.validateCurrencyAccounts) {
logger.debug('Checking if participant currency accounts are active', { dfspId, options, participant })
let accountsAreActive = false
for (const account of options.accounts) {
accountsAreActive = participant.currencyList.some((currAccount) => {
return (
currAccount.currencyId === account.currency &&
currAccount.ledgerAccountTypeId === account.accountType &&
currAccount.isActive === 1
)
})
if (!accountsAreActive) break
}
inScheme = accountsAreActive
}

return {
inScheme: !!participant,
inScheme,
proxyId: !participant ? await getCache().lookupProxyByDfspId(dfspId) : null
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/models/transfer/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -1398,11 +1398,9 @@ const getTransferParticipant = async (participantName, transferId) => {
.where({
'participant.name': participantName,
'tp.transferId': transferId,
'participant.isActive': 1,
'pc.isActive': 1
'participant.isActive': 1
})
.innerJoin('participantCurrency AS pc', 'pc.participantId', 'participant.participantId')
.innerJoin('transferParticipant AS tp', 'tp.participantCurrencyId', 'pc.participantCurrencyId')
.innerJoin('transferParticipant AS tp', 'tp.participantId', 'participant.participantId')
.select(
'tp.*'
)
Expand Down
13 changes: 4 additions & 9 deletions test/unit/models/transfer/facade.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2689,18 +2689,15 @@ Test('Transfer facade', async (transferFacadeTest) => {
const participantName = 'fsp1'
const transferId = '88416f4c-68a3-4819-b8e0-c23b27267cd5'
const builderStub = sandbox.stub()
const participantCurrencyStub = sandbox.stub()
const transferParticipantStub = sandbox.stub()
const selectStub = sandbox.stub()

builderStub.where = sandbox.stub()
Db.participant.query.callsArgWith(0, builderStub)

builderStub.where.returns({
innerJoin: participantCurrencyStub.returns({
innerJoin: transferParticipantStub.returns({
select: selectStub.returns([1])
})
innerJoin: transferParticipantStub.returns({
select: selectStub.returns([1])
})
})

Expand All @@ -2709,11 +2706,9 @@ Test('Transfer facade', async (transferFacadeTest) => {
test.ok(builderStub.where.withArgs({
'participant.name': participantName,
'tp.transferId': transferId,
'participant.isActive': 1,
'pc.isActive': 1
'participant.isActive': 1
}).calledOnce, 'query builder called once')
test.ok(participantCurrencyStub.withArgs('participantCurrency AS pc', 'pc.participantId', 'participant.participantId').calledOnce, 'participantCurrency inner joined')
test.ok(transferParticipantStub.withArgs('transferParticipant AS tp', 'tp.participantCurrencyId', 'pc.participantCurrencyId').calledOnce, 'transferParticipant inner joined')
test.ok(transferParticipantStub.withArgs('transferParticipant AS tp', 'tp.participantId', 'participant.participantId').calledOnce, 'transferParticipant inner joined')
test.ok(selectStub.withArgs(
'tp.*'
).calledOnce, 'select all columns from transferParticipant')
Expand Down