Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: position changes #1108

Merged
merged 24 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4f8d97c
fix: from argument in kafka notification for abort
vijayg10 Sep 17, 2024
fabc345
fix: position changes
vijayg10 Sep 17, 2024
6343565
fix: to number
vijayg10 Sep 17, 2024
315063a
Merge branch 'feat/fx-impl' of https://github.com/mojaloop/central-le…
vijayg10 Sep 17, 2024
db666c9
Merge branch 'fix/fx-abort-callback' into fix/position-changes
vijayg10 Sep 17, 2024
d12553c
fix: position change in timeout
vijayg10 Sep 17, 2024
13111ed
fix: related fxtransfer check
vijayg10 Sep 17, 2024
6666f28
fix: unit tests
vijayg10 Sep 17, 2024
f6ea8d3
fix: timeout
vijayg10 Sep 17, 2024
07171a6
Merge remote-tracking branch 'origin/feat/fx-impl' into fix/position-…
kleyow Sep 18, 2024
8d707c0
chore: deps
kleyow Sep 18, 2024
776fd61
fix fx-abort tests
kleyow Sep 18, 2024
6f3dbfc
fix fx-timeout tests
kleyow Sep 18, 2024
df3740d
Merge branch 'feat/fx-impl' of https://github.com/mojaloop/central-le…
vijayg10 Sep 18, 2024
d27b6ca
chore: added a comment
vijayg10 Sep 18, 2024
b2d2067
fix more tests
kleyow Sep 18, 2024
a49b1c7
fix: invalid fulfilment
vijayg10 Sep 18, 2024
485b91e
Merge branch feat/fx-impl of https://github.com/mojaloop/central-ledg…
vijayg10 Sep 18, 2024
59eba03
fix: unit test
vijayg10 Sep 18, 2024
ea2e945
chore(snapshot): 17.8.0-snapshot.28
vijayg10 Sep 18, 2024
f488ac3
chore(snapshot): 17.8.0-snapshot.29
vijayg10 Sep 18, 2024
ae3517b
fix: lint
vijayg10 Sep 18, 2024
5d3b2b4
chore(snapshot): 17.8.0-snapshot.30
vijayg10 Sep 18, 2024
9909a09
Merge branch feat/fx-impl of https://github.com/mojaloop/central-ledg…
vijayg10 Sep 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions migrations/310404_participantPositionChange-change.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*****
License
--------------
Copyright © 2017 Bill & Melinda Gates Foundation
The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Contributors
--------------
This is the official list of the Mojaloop project contributors for this file.
Names of the original copyright holders (individuals or organizations)
should be listed with a '*' in the first column. People who have
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets <email>.
* Gates Foundation
- Name Surname <[email protected]>

* ModusBox
- Vijaya Kumar Guthi <[email protected]>
--------------
******/

'use strict'

exports.up = async (knex) => {
return await knex.schema.hasTable('participantPositionChange').then(function(exists) {
if (exists) {
return knex.schema.alterTable('participantPositionChange', (t) => {
t.decimal('change', 18, 2).notNullable()
})
}
})
}

exports.down = async (knex) => {
return await knex.schema.hasTable('participantPositionChange').then(function(exists) {
if (exists) {
return knex.schema.alterTable('participantPositionChange', (t) => {
t.dropColumn('change')
})
}
})
}
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
"version": "17.8.0-snapshot.28",
"version": "17.8.0-snapshot.31",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
Expand Down Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "v18.8.0",
"@mojaloop/central-services-shared": "18.8.0",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
Expand Down Expand Up @@ -133,7 +133,7 @@
"jsdoc": "4.0.3",
"jsonpath": "1.1.1",
"nodemon": "3.1.5",
"npm-check-updates": "17.1.1",
"npm-check-updates": "17.1.2",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
Expand Down
4 changes: 2 additions & 2 deletions src/domain/fx/cyril.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ const _getPositionChanges = async (commitRequestIdList, transferIdList) => {
commitRequestId,
notifyTo: fxRecord.initiatingFspName,
participantCurrencyId: fxPositionChange.participantCurrencyId,
amount: -fxPositionChange.value
amount: -fxPositionChange.change
})
})
}
Expand All @@ -238,7 +238,7 @@ const _getPositionChanges = async (commitRequestIdList, transferIdList) => {
transferId,
notifyTo: transferRecord.payerFsp,
participantCurrencyId: transferPositionChange.participantCurrencyId,
amount: -transferPositionChange.value
amount: -transferPositionChange.change
})
})
}
Expand Down
13 changes: 9 additions & 4 deletions src/domain/position/abort.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const processPositionAbortBin = async (
accumulatedTransferStatesCopy[positionChangeToBeProcessed.transferId] = transferStateId
}
binItem.result = { success: true }
const from = binItem.message.value.from
cyrilResult.positionChanges[positionChangeIndex].isDone = true
const nextIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone)
if (nextIndex === -1) {
Expand All @@ -91,11 +92,11 @@ const processPositionAbortBin = async (
for (const positionChange of cyrilResult.positionChanges) {
if (positionChange.isFxTransferStateChange) {
// Construct notification message for fx transfer state change
const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, Config.HUB_NAME, positionChange.notifyTo)
const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, from, positionChange.notifyTo)
resultMessages.push({ binItem, message: resultMessage })
} else {
// Construct notification message for transfer state change
const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, Config.HUB_NAME, positionChange.notifyTo)
const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, from, positionChange.notifyTo)
resultMessages.push({ binItem, message: resultMessage })
}
}
Expand Down Expand Up @@ -127,7 +128,9 @@ const processPositionAbortBin = async (

const _constructAbortResultMessage = (binItem, id, from, notifyTo) => {
let apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.PAYEE_REJECTION
if (binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.FX_ABORT_VALIDATION) {
let fromCalculated = from
if (binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.FX_ABORT_VALIDATION || binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.ABORT_VALIDATION) {
fromCalculated = Config.HUB_NAME
apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR
}
const fspiopError = ErrorHandler.Factory.createFSPIOPError(
Expand All @@ -153,8 +156,8 @@ const _constructAbortResultMessage = (binItem, id, from, notifyTo) => {
)
const resultMessage = Utility.StreamingProtocol.createMessage(
id,
from,
notifyTo,
fromCalculated,
metadata,
binItem.message.value.content.headers, // Headers don't really matter here. ml-api-adapter will ignore them and create their own.
fspiopError,
Expand All @@ -173,6 +176,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, trans
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand All @@ -194,6 +198,7 @@ const _handleParticipantPositionChangeFx = (runningPosition, transferAmount, com
commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId
fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand Down
2 changes: 2 additions & 0 deletions src/domain/position/fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, trans
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand All @@ -286,6 +287,7 @@ const _handleParticipantPositionChangeFx = (runningPosition, transferAmount, com
commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId
fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand Down
3 changes: 2 additions & 1 deletion src/domain/position/fx-prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const processFxPositionPrepareBin = async (
let resultMessage
const fxTransfer = binItem.decodedPayload
const cyrilResult = binItem.message.value.content.context.cyrilResult
const transferAmount = fxTransfer.targetAmount.currency === cyrilResult.currencyId ? new MLNumber(fxTransfer.targetAmount.amount) : new MLNumber(fxTransfer.sourceAmount.amount)
const transferAmount = fxTransfer.targetAmount.currency === cyrilResult.currencyId ? fxTransfer.targetAmount.amount : fxTransfer.sourceAmount.amount

Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::transfer:processingMessage: ${JSON.stringify(fxTransfer)}`)

Expand Down Expand Up @@ -205,6 +205,7 @@ const processFxPositionPrepareBin = async (
commitRequestId: fxTransfer.commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId
fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries
value: currentPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}
participantPositionChanges.push(participantPositionChange)
Expand Down
3 changes: 2 additions & 1 deletion src/domain/position/fx-timeout-reserved.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const processPositionFxTimeoutReservedBin = async (
} else {
Logger.isDebugEnabled && Logger.debug(`accumulatedFxTransferStates: ${JSON.stringify(accumulatedFxTransferStates)}`)

const transferAmount = fetchedReservedPositionChangesByCommitRequestIds[commitRequestId][participantAccountId].value
const transferAmount = fetchedReservedPositionChangesByCommitRequestIds[commitRequestId][participantAccountId].change

// Construct payee notification message
const resultMessage = _constructFxTimeoutReservedResultMessage(
Expand Down Expand Up @@ -141,6 +141,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, commi
commitRequestId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand Down
1 change: 1 addition & 0 deletions src/domain/position/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ const processPositionPrepareBin = async (
transferId: transfer.transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: currentPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}
participantPositionChanges.push(participantPositionChange)
Expand Down
1 change: 1 addition & 0 deletions src/domain/position/timeout-reserved.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, trans
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand Down
32 changes: 29 additions & 3 deletions src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,40 @@ const processFulfilMessage = async (message, functionality, span) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorInvalidFulfilment--${actionLetter}9`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, 'invalid fulfilment')
const apiFSPIOPError = fspiopError.toApiErrorObject(Config.ERROR_HANDLING)
await TransferService.handlePayeeResponse(transferId, payload, action, apiFSPIOPError)
await TransferService.handlePayeeResponse(transferId, payload, TransferEventAction.ABORT_VALIDATION, apiFSPIOPError)
const eventDetail = { functionality: TransferEventType.POSITION, action: TransferEventAction.ABORT_VALIDATION }
/**
* TODO: BulkProcessingHandler (not in scope of #967) The individual transfer is ABORTED by notification is never sent.
*/
// Key position validation abort with payer account id
const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME })

const cyrilResult = await FxService.Cyril.processAbortMessage(transferId)

params.message.value.content.context = {
...params.message.value.content.context,
cyrilResult
}
if (cyrilResult.positionChanges.length > 0) {
const participantCurrencyId = cyrilResult.positionChanges[0].participantCurrencyId
await Kafka.proceed(
Config.KAFKA_CONFIG,
params,
{
consumerCommit,
fspiopError: apiFSPIOPError,
eventDetail,
messageKey: participantCurrencyId.toString(),
topicNameOverride: Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.ABORT,
hubName: Config.HUB_NAME
}
)
} else {
const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError('Invalid cyril result')
throw fspiopError
}

// const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
// await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME })

// emit an extra message - RESERVED_ABORTED if action === TransferEventAction.RESERVE
if (action === TransferEventAction.RESERVE) {
Expand Down
1 change: 1 addition & 0 deletions src/handlers/transfers/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ const prepare = async (error, messages) => {
if (proxyEnabled) {
const [initiatingFsp, counterPartyFsp] = isFx ? [payload.initiatingFsp, payload.counterPartyFsp] : [payload.payerFsp, payload.payeeFsp]

// TODO: We need to double check the following validation logic incase of payee side currency conversion
const payeeFspLookupOptions = isFx ? null : { validateCurrencyAccounts: true, accounts: [{ currency: payload.amount.currency, accountType: Enum.Accounts.LedgerAccountType.POSITION }] }

;[proxyObligation.initiatingFspProxyOrParticipantId, proxyObligation.counterPartyFspProxyOrParticipantId] = await Promise.all([
Expand Down
26 changes: 26 additions & 0 deletions src/handlers/transfers/validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const Config = require('../../lib/config')
const Participant = require('../../domain/participant')
const Transfer = require('../../domain/transfer')
const FxTransferModel = require('../../models/fxTransfer')
// const TransferStateChangeModel = require('../../models/transfer/transferStateChange')
const FxTransferStateChangeModel = require('../../models/fxTransfer/stateChange')
const CryptoConditions = require('../../cryptoConditions')
const Crypto = require('crypto')
const base64url = require('base64url')
Expand Down Expand Up @@ -208,6 +210,30 @@ const validatePrepare = async (payload, headers, isFx = false, determiningTransf
const initiatingFsp = isFx ? payload.initiatingFsp : payload.payerFsp
const counterPartyFsp = isFx ? payload.counterPartyFsp : payload.payeeFsp

// Check if determining transfers are failed
if (determiningTransferCheckResult.watchListRecords && determiningTransferCheckResult.watchListRecords.length > 0) {
// Iterate through determiningTransferCheckResult.watchListRecords
for (const watchListRecord of determiningTransferCheckResult.watchListRecords) {
if (isFx) {
// TODO: Check the transfer state of determiningTransferId
// const latestTransferStateChange = await TransferStateChangeModel.getByTransferId(watchListRecord.determiningTransferId)
// if (latestTransferStateChange.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED) {
// reasons.push('Related Transfer is not in reserved state')
// validationPassed = false
// return { validationPassed, reasons }
// }
} else {
// Check the transfer state of commitRequestId
const latestFxTransferStateChange = await FxTransferStateChangeModel.getByCommitRequestId(watchListRecord.commitRequestId)
if (latestFxTransferStateChange.transferStateId !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL_DEPENDENT) {
reasons.push('Related FX Transfer is not fulfilled')
validationPassed = false
return { validationPassed, reasons }
}
}
}
}

// Skip usual validation if preparing a proxy transfer or fxTransfer
if (!(proxyObligation?.isInitiatingFspProxy || proxyObligation?.isCounterPartyFspProxy)) {
validationPassed = (
Expand Down
4 changes: 3 additions & 1 deletion src/models/position/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,13 @@ const prepareChangeParticipantPositionTransaction = async (transferList) => {
const processedTransfersKeysList = Object.keys(processedTransfers)
const batchParticipantPositionChange = []
for (const keyIndex in processedTransfersKeysList) {
const { runningPosition, runningReservedValue } = processedTransfers[processedTransfersKeysList[keyIndex]]
const { transferAmount, runningPosition, runningReservedValue } = processedTransfers[processedTransfersKeysList[keyIndex]]
const participantPositionChange = {
participantPositionId: initialParticipantPosition.participantPositionId,
participantCurrencyId: participantCurrency.participantCurrencyId,
transferStateChangeId: processedTransferStateChangeIdList[keyIndex],
value: runningPosition,
change: transferAmount.toNumber(),
// processBatch: <uuid> - a single value uuid for this entire batch to make sure the set of transfers in this batch can be clearly grouped
reservedValue: runningReservedValue
}
Expand Down Expand Up @@ -294,6 +295,7 @@ const changeParticipantPositionTransaction = async (participantCurrencyId, isRev
participantCurrencyId,
transferStateChangeId: insertedTransferStateChange.transferStateChangeId,
value: latestPosition,
change: isReversal ? -amount : amount,
reservedValue: participantPosition.reservedValue,
createdDate: transactionTimestamp
}
Expand Down
Loading