Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mojaloop/#3489): add cached calls for participant currency, update tests #973

Merged
merged 15 commits into from
Sep 19, 2023
558 changes: 276 additions & 282 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
"@mojaloop/central-services-health": "14.0.2",
"@mojaloop/central-services-logger": "11.2.2",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.1.1",
"@mojaloop/central-services-shared": "18.1.2",
"@mojaloop/central-services-stream": "11.1.1",
"@mojaloop/event-sdk": "13.0.0",
"@mojaloop/ml-number": "11.2.3",
Expand Down Expand Up @@ -129,7 +129,7 @@
"jsdoc": "4.0.2",
"jsonpath": "1.1.1",
"nodemon": "3.0.1",
"npm-check-updates": "16.14.2",
"npm-check-updates": "16.14.4",
"nyc": "15.1.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
Expand Down
7 changes: 5 additions & 2 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@

const Logger = require('@mojaloop/central-services-logger')
const BatchPositionModel = require('../../models/position/batch')
const BatchPositionModelCached = require('../../models/position/batchCached')
const PositionPrepareDomain = require('./prepare')
const SettlementModelCached = require('../../models/settlement/settlementModelCached')
const Enum = require('@mojaloop/central-services-shared').Enum
const ErrorHandler = require('@mojaloop/central-services-error-handling')
// TODO: We may not need this if we optimize the participantLimit query
const participantFacade = require('../../models/participant/facade')
const decodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodePayload

/**
* @function processBins
Expand All @@ -52,6 +54,7 @@ const participantFacade = require('../../models/participant/facade')
const processBins = async (bins, trx) => {
const transferIdList = []
await iterateThroughBins(bins, async (_accountID, _action, item) => {
Logger.info(decodePayload(item.message.value.content.payload))
if (item.decodedPayload?.transferId) {
transferIdList.push(item.decodedPayload.transferId)
}
Expand All @@ -67,7 +70,7 @@ const processBins = async (bins, trx) => {

// Pre fetch all settlement accounts corresponding to the position accounts
// Get all participantIdMap for the accountIds
const participantCurrencyIds = await BatchPositionModel.getParticipantCurrencyIds(trx, accountIds)
const participantCurrencyIds = await BatchPositionModelCached.getParticipantCurrencyByIds(trx, accountIds)

// TODO: Validate if all the participantCurrencyIds exist for all the accountIds

Expand Down Expand Up @@ -95,7 +98,7 @@ const processBins = async (bins, trx) => {
// TODO: Verify all maps are correctly constructed

// Get all participantCurrencyIds for the participantIdMap
const allParticipantCurrencyIds = await BatchPositionModel.getParticipantCurrencyIdsByParticipantIds(trx, Object.keys(participantIdMap))
const allParticipantCurrencyIds = await BatchPositionModelCached.getParticipantCurrencyByParticipantIds(trx, Object.keys(participantIdMap))
const settlementCurrencyIds = []
allParticipantCurrencyIds.forEach(pc => {
const correspondingParticipantCurrencyId = participantIdMap[pc.participantId][pc.currencyId]
Expand Down
4 changes: 4 additions & 0 deletions src/domain/position/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const processPositionPrepareBin = async (
{ id: transfer.transferId },
'application/json'
)
binItem.result = { success: false }
// Check if payer has insufficient liquidity, produce an error message and abort transfer
} else if (availablePositionBasedOnLiquidityCover.toNumber() < transfer.amount.amount) {
transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
Expand Down Expand Up @@ -127,6 +128,7 @@ const processPositionPrepareBin = async (
{ id: transfer.transferId },
'application/json'
)
binItem.result = { success: false }
// Check if payer has surpassed their limit, produce an error message and abort transfer
} else if (availablePositionBasedOnPayerLimit.toNumber() < transfer.amount.amount) {
transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
Expand Down Expand Up @@ -163,6 +165,7 @@ const processPositionPrepareBin = async (
{ id: transfer.transferId },
'application/json'
)
binItem.result = { success: false }
// Payer has sufficient liquidity and limit
} else {
transferStateId = Enum.Transfers.TransferState.RESERVED
Expand Down Expand Up @@ -208,6 +211,7 @@ const processPositionPrepareBin = async (
}
participantPositionChanges.push(participantPositionChange)
Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::participantPositionChange: ${JSON.stringify(participantPositionChange)}`)
binItem.result = { success: true }
}

resultMessages.push({ binItem, message: resultMessage })
Expand Down
5 changes: 2 additions & 3 deletions src/handlers/positions/handlerBatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,15 @@ const positions = async (error, messages) => {
// 6.2. Audit Error for each message
const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err)
const state = new EventSdk.EventStateMetadata(EventSdk.EventStatusType.failed, fspiopError.apiErrorCode.code, fspiopError.apiErrorCode.message)
await BinProcessor.iterateThroughBins(bins, async (item) => {
await BinProcessor.iterateThroughBins(bins, async (_accountID, _action, item) => {
const span = item.span
await span.error(fspiopError, state)
})
histTimerEnd({ success: false })
} finally {
// Finish span for each message
await BinProcessor.iterateThroughBins(bins, async (_accountID, action, item) => {
// TODO: We need to get the success status properly for each message
item.histTimerMsgEnd({ success: true, action })
item.histTimerMsgEnd({ ...item.result, action })
const span = item.span
if (!span.isFinished) {
await span.finish()
Expand Down
17 changes: 16 additions & 1 deletion src/models/position/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ const getLatestTransferStateChangesByTransferIdList = async (trx, transfersIdLis
}
}

const getAllParticipantCurrency = async (trx) => {
const knex = Db.getKnex()
if (trx) {
const result = await knex('participantCurrency')
.transacting(trx)
.select('*')
return result
} else {
const result = await knex('participantCurrency')
.select('*')
return result
}
}

const getParticipantCurrencyIds = async (trx, accountIds) => {
const participantCurrencies = await knex('participantCurrency')
.transacting(trx)
Expand Down Expand Up @@ -140,5 +154,6 @@ module.exports = {
getParticipantCurrencyIdsByParticipantIds,
updateParticipantPosition,
bulkInsertTransferStateChanges,
bulkInsertParticipantPositionChanges
bulkInsertParticipantPositionChanges,
getAllParticipantCurrency
}
134 changes: 134 additions & 0 deletions src/models/position/batchCached.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*****
License
--------------
Copyright © 2017 Bill & Melinda Gates Foundation
The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Contributors
--------------
This is the official list of the Mojaloop project contributors for this file.
Names of the original copyright holders (individuals or organizations)
should be listed with a '*' in the first column. People who have
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets <email>.

* Gates Foundation
- Name Surname <[email protected]>

* INFITX
- Kevin Leyow <[email protected]>

--------------
******/

'use strict'

const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Cache = require('../../lib/cache')
const Metrics = require('@mojaloop/central-services-metrics')
const BatchPositionModel = require('./batch')

let cacheClient
let participantCurrencyAllCacheKey

/*
Private API
*/

const buildUnifiedParticipantCurrencyData = (allParticipantCurrency) => {
// build indexes (or indices?) - optimization for byId and byName access
const indexById = {}
const indexByParticipantId = {}

allParticipantCurrency.forEach((oneParticipantCurrency) => {
// Participant API returns Date type, but cache internals will serialize it to String
// by calling JSON.stringify(), which calls .toISOString() on a Date object.
// Let's ensure all places return same kind of String.
oneParticipantCurrency.createdDate = JSON.stringify(oneParticipantCurrency.createdDate)

// Add to indexes
if (!(oneParticipantCurrency.participantId in indexByParticipantId)) {
indexByParticipantId[oneParticipantCurrency.participantId] = []
}
indexByParticipantId[oneParticipantCurrency.participantId].push(oneParticipantCurrency)
indexById[oneParticipantCurrency.participantCurrencyId] = oneParticipantCurrency
})

// build unified structure - indexes + data
const unifiedParticipantsCurrency = {
indexById,
indexByParticipantId,
allParticipantCurrency
}
return unifiedParticipantsCurrency
}

const getParticipantCurrencyCached = async (trx) => {
const histTimer = Metrics.getHistogram(
'model_participant',
'model_getParticipantsCached - Metrics for participant model',
['success', 'queryName', 'hit']
).startTimer()
// Do we have valid participants list in the cache ?
let cachedParticipantCurrency = cacheClient.get(participantCurrencyAllCacheKey)
if (!cachedParticipantCurrency) {
const allParticipantCurrency = await BatchPositionModel.getAllParticipantCurrency(trx)
cachedParticipantCurrency = buildUnifiedParticipantCurrencyData(allParticipantCurrency)

// store in cache
cacheClient.set(participantCurrencyAllCacheKey, cachedParticipantCurrency)
histTimer({ success: true, queryName: 'model_getParticipantCurrencyCached', hit: false })
} else {
// unwrap participants list from catbox structure
cachedParticipantCurrency = cachedParticipantCurrency.item
histTimer({ success: true, queryName: 'model_getParticipantCurrencyCached', hit: true })
}
return cachedParticipantCurrency
}

/*
Public API
*/
exports.initialize = async () => {
/* Register as cache client */
const participantCurrencyCacheClientMeta = {
id: 'participantCurrency',
preloadCache: getParticipantCurrencyCached
}

cacheClient = Cache.registerCacheClient(participantCurrencyCacheClientMeta)
participantCurrencyAllCacheKey = cacheClient.createKey('participantCurrency')
}

exports.getParticipantCurrencyByIds = async (trx, participantCurrencyIds) => {
try {
let participantCurrencies = []
const cachedParticipantCurrency = await getParticipantCurrencyCached(trx)
for (const participantCurrencyId of participantCurrencyIds) {
participantCurrencies = participantCurrencies.concat(cachedParticipantCurrency.indexById[participantCurrencyId])
}
return participantCurrencies
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

exports.getParticipantCurrencyByParticipantIds = async (trx, participantIds) => {
try {
let participantCurrencies = []
const cachedParticipantCurrency = await getParticipantCurrencyCached(trx)
for (const participantId of participantIds) {
participantCurrencies = participantCurrencies.concat(cachedParticipantCurrency.indexByParticipantId[participantId])
}
return participantCurrencies
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}
2 changes: 2 additions & 0 deletions src/shared/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const EnumCached = require('../lib/enumCached')
const ParticipantCached = require('../models/participant/participantCached')
const ParticipantCurrencyCached = require('../models/participant/participantCurrencyCached')
const ParticipantLimitCached = require('../models/participant/participantLimitCached')
const BatchPositionModelCached = require('../models/position/batchCached')
const MongoUriBuilder = require('mongo-uri-builder')

const migrate = (runMigrations) => {
Expand Down Expand Up @@ -234,6 +235,7 @@ const initializeCache = async () => {
await ParticipantCached.initialize()
await ParticipantCurrencyCached.initialize()
await ParticipantLimitCached.initialize()
await BatchPositionModelCached.initialize()
await Cache.initCache()
}

Expand Down
6 changes: 4 additions & 2 deletions test/unit/domain/position/binProcessor-copy.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const Test = require('tapes')(require('tape'))
const Enum = require('@mojaloop/central-services-shared').Enum
const BinProcessor = require('../../../../src/domain/position/binProcessor')
const BatchPositionModel = require('../../../../src/models/position/batch')
const BatchPositionModelCached = require('../../../../src/models/position/batchCached')
const SettlementModelCached = require('../../../../src/models/settlement/settlementModelCached')
const participantFacade = require('../../../../src/models/participant/facade')
const sampleBins = require('./sampleBins')
Expand Down Expand Up @@ -69,6 +70,7 @@ Test('BinProcessor', async (binProcessorTest) => {
binProcessorTest.beforeEach(async test => {
sandbox = Sinon.createSandbox()
sandbox.stub(BatchPositionModel)
sandbox.stub(BatchPositionModelCached)
sandbox.stub(SettlementModelCached)
sandbox.stub(participantFacade)

Expand All @@ -79,7 +81,7 @@ Test('BinProcessor', async (binProcessorTest) => {
...fulfillTransfersStates
})

BatchPositionModel.getParticipantCurrencyIds.returns([
BatchPositionModelCached.getParticipantCurrencyByIds.returns([
{
participantCurrencyId: 7,
participantId: 2,
Expand Down Expand Up @@ -159,7 +161,7 @@ Test('BinProcessor', async (binProcessorTest) => {
}
])

BatchPositionModel.getParticipantCurrencyIdsByParticipantIds.returns([
BatchPositionModelCached.getParticipantCurrencyByParticipantIds.returns([
{
participantCurrencyId: 9,
participantId: 2,
Expand Down
6 changes: 4 additions & 2 deletions test/unit/domain/position/binProcessor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const Enum = require('@mojaloop/central-services-shared').Enum
const BinProcessor = require('../../../../src/domain/position/binProcessor')
const PositionPrepareDomain = require('../../../../src/domain/position/prepare')
const BatchPositionModel = require('../../../../src/models/position/batch')
const BatchPositionModelCached = require('../../../../src/models/position/batchCached')
const SettlementModelCached = require('../../../../src/models/settlement/settlementModelCached')
const participantFacade = require('../../../../src/models/participant/facade')
const sampleBins = require('./sampleBins')
Expand Down Expand Up @@ -70,6 +71,7 @@ Test('BinProcessor', async (binProcessorTest) => {
binProcessorTest.beforeEach(async test => {
sandbox = Sinon.createSandbox()
sandbox.stub(BatchPositionModel)
sandbox.stub(BatchPositionModelCached)
sandbox.stub(SettlementModelCached)
sandbox.stub(participantFacade)

Expand All @@ -80,7 +82,7 @@ Test('BinProcessor', async (binProcessorTest) => {
...fulfillTransfersStates
})

BatchPositionModel.getParticipantCurrencyIds.returns([
BatchPositionModelCached.getParticipantCurrencyByIds.returns([
{
participantCurrencyId: 7,
participantId: 2,
Expand Down Expand Up @@ -160,7 +162,7 @@ Test('BinProcessor', async (binProcessorTest) => {
}
])

BatchPositionModel.getParticipantCurrencyIdsByParticipantIds.returns([
BatchPositionModelCached.getParticipantCurrencyByParticipantIds.returns([
{
participantCurrencyId: 9,
participantId: 2,
Expand Down
Loading