Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csi-650): updated transferTimeout handler to take into account externalParticipant #1107

Merged
merged 9 commits into from
Sep 17, 2024
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.7.6",
"@mojaloop/central-services-shared": "18.8.0",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
Expand Down Expand Up @@ -132,7 +132,7 @@
"get-port": "5.1.1",
"jsdoc": "4.0.3",
"jsonpath": "1.1.1",
"nodemon": "3.1.4",
"nodemon": "3.1.5",
"npm-check-updates": "17.1.1",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
Expand Down
151 changes: 102 additions & 49 deletions src/handlers/timeouts/handler.js

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions src/handlers/transfers/FxFulfilService.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class FxFulfilService {
}

async getFxTransferDetails(commitRequestId, functionality) {
const transfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId)
const fxTransfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId)

if (!transfer) {
if (!fxTransfer) {
const fspiopError = fspiopErrorFactory.fxTransferNotFound()
const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING)
const eventDetail = {
Expand All @@ -72,8 +72,8 @@ class FxFulfilService {
throw fspiopError
}

this.log.debug('fxTransfer is found', { transfer })
return transfer
this.log.debug('fxTransfer is found', { fxTransfer })
return fxTransfer
}

async validateHeaders({ transfer, headers, payload }) {
Expand Down Expand Up @@ -302,12 +302,13 @@ class FxFulfilService {
const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING)
const eventDetail = {
functionality: Type.POSITION,
action
action // FX_ABORT
}
this.log.warn('FX_ABORT case', { eventDetail, apiFSPIOPError })

await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, action, apiFSPIOPError)
const cyrilResult = await this.cyril.processFxAbortMessage(transfer.commitRequestId)
// todo: add externalParticipantId to the message here?

this.params.message.value.content.context = {
...this.params.message.value.content.context,
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/transfers/createRemittanceEntity.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ const createRemittanceEntity = (isFx) => {
},

/**
* A determiningTransferCheckResult.
* @typedef {Object} DeterminingTransferCheckResult
*
* @property {boolean} determiningTransferExists - Indicates if the determining transfer exists.
* @property {Array<{participantName, currencyId}>} participantCurrencyValidationList - List of validations for participant currencies.
* @property {Object} [transferRecord] - Determining transfer for the FX transfer (optional).
Expand Down
1 change: 1 addition & 0 deletions src/lib/proxyCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const getCache = () => {

/**
* @typedef {Object} ProxyOrParticipant - An object containing the inScheme status, proxyId and FSP name
*
* @property {boolean} inScheme - Is FSP in the scheme.
* @property {string|null} proxyId - Proxy, associated with the FSP, if FSP is not in the scheme.
* @property {string} name - FSP name.
Expand Down
13 changes: 6 additions & 7 deletions src/models/fxTransfer/fxTransfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const { TABLE_NAMES } = require('../../shared/constants')
const Db = require('../../lib/db')
const participant = require('../participant/facade')
const ParticipantCachedModel = require('../participant/participantCached')
const externalParticipantModel = require('../participant/externalParticipant')
const TransferExtensionModel = require('./fxTransferExtension')

const { TransferInternalState } = Enum.Transfers
Expand Down Expand Up @@ -196,6 +195,7 @@ const getAllDetailsByCommitRequestIdForProxiedFxTransfer = async (commitRequestI
return transferResult
})
} catch (err) {
logger.warn('error in getAllDetailsByCommitRequestIdForProxiedFxTransfer', err)
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}
Expand Down Expand Up @@ -272,8 +272,8 @@ const savePreparedRequest = async (
ledgerEntryTypeId: Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE
}
if (proxyObligation.isInitiatingFspProxy) {
initiatingParticipantRecord.externalParticipantId = await externalParticipantModel
.getIdByNameOrCreate(proxyObligation.initiatingFspProxyOrParticipantId)
initiatingParticipantRecord.externalParticipantId = await participant
.getExternalParticipantIdByNameOrCreate(proxyObligation.initiatingFspProxyOrParticipantId)
}

const counterPartyParticipantRecord1 = {
Expand All @@ -286,8 +286,8 @@ const savePreparedRequest = async (
ledgerEntryTypeId: Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE
}
if (proxyObligation.isCounterPartyFspProxy) {
counterPartyParticipantRecord1.externalParticipantId = await externalParticipantModel
.getIdByNameOrCreate(proxyObligation.counterPartyFspProxyOrParticipantId)
counterPartyParticipantRecord1.externalParticipantId = await participant
.getExternalParticipantIdByNameOrCreate(proxyObligation.counterPartyFspProxyOrParticipantId)
}

let counterPartyParticipantRecord2 = null
Expand Down Expand Up @@ -377,7 +377,6 @@ const savePreparedRequest = async (
}
}

// todo: clarify this code
const saveFxFulfilResponse = async (commitRequestId, payload, action, fspiopError) => {
const histTimerSaveFulfilResponseEnd = Metrics.getHistogram(
'fx_model_transfer',
Expand Down Expand Up @@ -562,10 +561,10 @@ module.exports = {
getByDeterminingTransferId,
getByIdLight,
getAllDetailsByCommitRequestId,
getAllDetailsByCommitRequestIdForProxiedFxTransfer,
getFxTransferParticipant,
savePreparedRequest,
saveFxFulfilResponse,
saveFxTransfer,
getAllDetailsByCommitRequestIdForProxiedFxTransfer,
updateFxPrepareReservedForwarded
}
33 changes: 14 additions & 19 deletions src/models/participant/externalParticipant.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,24 @@ const create = async ({ name, proxyId }) => {
return result
} catch (err) {
log.error('error in create', err)
// If the cache is not up-to-date, then will get an error when inserting a record and that record already exists
// reload the cache at that point.
// todo: to implement above requirement, we need to detect duplication restriction error, and don't rethrow error
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

// const getAll = async (options = {}) => {
// try {
// const result = await Db.from(TABLE).find({}, options)
// log.debug('getAll result:', { result })
// return result
// } catch (err) {
// log.error('error in getAll:', err)
// throw ErrorHandler.Factory.reformatFSPIOPError(err)
// }
// }

const getOneBy = async (criteria, options) => {
try {
const result = await Db.from(TABLE).findOne(criteria, options)
Expand All @@ -80,24 +94,6 @@ const getOneByNameCached = async (name, options = {}) => {
return data
}

const getIdByNameOrCreate = async ({ name, proxyId }) => {
try {
let dfsp = await getOneByNameCached(name)
if (!dfsp) {
await create({ name, proxyId })
// todo: check if create returns id (to avoid getOneByNameCached call)
dfsp = await getOneByNameCached(name)
}
const id = dfsp?.[ID_FIELD]
log.verbose('getIdByNameOrCreate result:', { id, name })
return id
} catch (err) {
log.child({ name, proxyId }).warn('error in getIdByNameOrCreate:', err)
return null
// todo: think, if we need to rethrow an error here?
}
}

const destroyBy = async (criteria) => {
try {
const result = await Db.from(TABLE).destroy(criteria)
Expand All @@ -114,7 +110,6 @@ const destroyByName = async (name) => destroyBy({ name })
// todo: think, if we need update method
module.exports = {
create,
getIdByNameOrCreate,
getOneByNameCached,
getOneByName,
getOneById,
Expand Down
36 changes: 33 additions & 3 deletions src/models/participant/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@
* @module src/models/participant/facade/
*/

const Db = require('../../lib/db')
const Time = require('@mojaloop/central-services-shared').Util.Time
const { Enum } = require('@mojaloop/central-services-shared')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Metrics = require('@mojaloop/central-services-metrics')

const Db = require('../../lib/db')
const Cache = require('../../lib/cache')
const ParticipantModelCached = require('../../models/participant/participantCached')
const ParticipantCurrencyModelCached = require('../../models/participant/participantCurrencyCached')
const ParticipantLimitCached = require('../../models/participant/participantLimitCached')
const externalParticipant = require('../../models/participant/externalParticipant')
const Config = require('../../lib/config')
const SettlementModelModel = require('../settlement/settlementModel')
const { Enum } = require('@mojaloop/central-services-shared')
const { logger } = require('../../shared/logger')

const getByNameAndCurrency = async (name, currencyId, ledgerAccountTypeId, isCurrencyActive) => {
const histTimerParticipantGetByNameAndCurrencyEnd = Metrics.getHistogram(
Expand Down Expand Up @@ -773,6 +776,32 @@ const getAllNonHubParticipantsWithCurrencies = async (trx) => {
}
}

const getExternalParticipantIdByNameOrCreate = async ({ name, proxyId }) => {
try {
let external = await externalParticipant.getOneByNameCached(name)
if (!external) {
const proxy = await ParticipantModelCached.getByName(proxyId)
if (!proxy) {
throw new Error(`Proxy participant not found: ${proxyId}`)
}
await externalParticipant.create({
name,
proxyId: proxy.participantId
})
// todo: - check if create returns id (to avoid getOneByNameCached call)
// - if isCreated === false, re-load all external participants cache
external = await externalParticipant.getOneByNameCached(name)
}
const id = external?.externalParticipantId
logger.verbose('getExternalParticipantIdByNameOrCreate result:', { id, name })
return id
} catch (err) {
logger.child({ name, proxyId }).warn('error in getExternalParticipantIdByNameOrCreate:', err)
return null
// todo: think, if we need to rethrow an error here?
}
}

module.exports = {
addHubAccountAndInitPosition,
getByNameAndCurrency,
Expand All @@ -789,5 +818,6 @@ module.exports = {
getParticipantLimitsByParticipantId,
getAllAccountsByNameAndCurrency,
getLimitsForAllParticipants,
getAllNonHubParticipantsWithCurrencies
getAllNonHubParticipantsWithCurrencies,
getExternalParticipantIdByNameOrCreate
}
Loading