Skip to content

Commit

Permalink
feat(mojaloop/#3998): proxy obligation tracking for position changes (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
oderayi authored Jul 23, 2024
1 parent eaa0ce0 commit 2cc0af6
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ FROM node:${NODE_VERSION} as builder
WORKDIR /opt/app

RUN apk --no-cache add git
RUN apk add --no-cache -t build-dependencies make gcc g++ python3 libtool openssl-dev autoconf automake bash \
RUN apk add --no-cache -t build-dependencies make gcc g++ python3 py3-setuptools libtool openssl-dev autoconf automake bash \
&& cd $(npm root -g)/npm \
&& npm install -g node-gyp

Expand Down
28 changes: 15 additions & 13 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ services:
retries: 10
start_period: 40s
interval: 30s

redis:
image: redis:6.2.4-alpine
restart: "unless-stopped"
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT=6379
- REDIS_REPLICATION_MODE=master
- REDIS_TLS_ENABLED=no
healthcheck:
test: ["CMD", "redis-cli", "ping"]
ports:
- "6379:6379"
networks:
- cl-mojaloop-net

mockserver:
image: jamesdbloom/mockserver
Expand Down Expand Up @@ -219,16 +234,3 @@ services:
- cl-mojaloop-net
environment:
- KAFKA_BROKERS=kafka:29092

redis:
image: redis:6.2.4-alpine
restart: "unless-stopped"
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT=6379
- REDIS_REPLICATION_MODE=master
- REDIS_TLS_ENABLED=no
ports:
- "6379:6379"
networks:
- cl-mojaloop-net
2 changes: 1 addition & 1 deletion docker/central-ledger/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
"enabled": true,
"type": "redis",
"proxyConfig": {
"host": "localhost",
"host": "redis",
"port": 6379
}
},
Expand Down
89 changes: 51 additions & 38 deletions src/handlers/positions/handlerBatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,20 @@ const positions = async (error, messages) => {
binId
})

const accountID = message.key.toString()

/**
* Interscheme accounting rule:
* - If the creditor and debtor are represented by the same proxy, the message key will be 0.
* In such cases, we skip position changes.
*/
if (accountID === '0') {
histTimerEnd({ success: true })
return span.finish()
}

// Assign message to account-bin by accountID and child action-bin by action
// (References to the messages to be stored in bins, no duplication of messages)
const accountID = message.key.toString()
const action = message.value.metadata.event.action
const accountBin = bins[accountID] || (bins[accountID] = {})
const actionBin = accountBin[action] || (accountBin[action] = [])
Expand All @@ -129,54 +140,56 @@ const positions = async (error, messages) => {
return span.audit(message, EventSdk.AuditEventAction.start)
}))

// Start DB Transaction
const trx = await BatchPositionModel.startDbTransaction()
// Start DB Transaction if there are any bins to process
const trx = !!Object.keys(bins).length && await BatchPositionModel.startDbTransaction()

try {
// Call Bin Processor with the list of account-bins and trx
const result = await BinProcessor.processBins(bins, trx)
if (trx) {
// Call Bin Processor with the list of account-bins and trx
const result = await BinProcessor.processBins(bins, trx)

// If Bin Processor processed bins successfully, commit Kafka offset
// Commit the offset of last message in the array
for (const message of Object.values(lastPerPartition)) {
const params = { message, kafkaTopic: message.topic, consumer: Consumer }
// We are using Kafka.proceed() to just commit the offset of the last message in the array
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, hubName: Config.HUB_NAME })
}
// If Bin Processor processed bins successfully, commit Kafka offset
// Commit the offset of last message in the array
for (const message of Object.values(lastPerPartition)) {
const params = { message, kafkaTopic: message.topic, consumer: Consumer }
// We are using Kafka.proceed() to just commit the offset of the last message in the array
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, hubName: Config.HUB_NAME })
}

// Commit DB transaction
await trx.commit()
// Commit DB transaction
await trx.commit()

// Loop through results and produce notification messages and audit messages
await Promise.all(result.notifyMessages.map(item => {
// Produce notification message and audit message
const action = item.binItem.message?.value.metadata.event.action
const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
return Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
}).concat(
// Loop through followup messages and produce position messages for further processing of the transfer
result.followupMessages.map(item => {
// Produce position message and audit message
// Loop through results and produce notification messages and audit messages
await Promise.all(result.notifyMessages.map(item => {
// Produce notification message and audit message
const action = item.binItem.message?.value.metadata.event.action
const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
return Kafka.produceGeneralMessage(
Config.KAFKA_CONFIG,
Producer,
Enum.Events.Event.Type.POSITION,
action,
item.message,
eventStatus,
item.messageKey,
item.binItem.span,
Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT
)
})
))
return Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
}).concat(
// Loop through followup messages and produce position messages for further processing of the transfer
result.followupMessages.map(item => {
// Produce position message and audit message
const action = item.binItem.message?.value.metadata.event.action
const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
return Kafka.produceGeneralMessage(
Config.KAFKA_CONFIG,
Producer,
Enum.Events.Event.Type.POSITION,
action,
item.message,
eventStatus,
item.messageKey,
item.binItem.span,
Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT
)
})
))
}
histTimerEnd({ success: true })
} catch (err) {
// If Bin Processor returns failure
// - Rollback DB transaction
await trx.rollback()
await trx?.rollback()

// - Audit Error for each message
const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err)
Expand Down
29 changes: 22 additions & 7 deletions src/lib/proxyCache.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,38 @@
'use strict'
const { createProxyCache } = require('@mojaloop/inter-scheme-proxy-cache-lib')
const Config = require('./config.js')
const { createProxyCache, STORAGE_TYPES } = require('@mojaloop/inter-scheme-proxy-cache-lib')
const ParticipantService = require('../../src/domain/participant')
const Config = require('./config.js')

let proxyCache

const init = async () => {
// enforce lazy connection for redis
const proxyConfig =
Config.PROXY_CACHE_CONFIG.type === STORAGE_TYPES.redis
? { ...Config.PROXY_CACHE_CONFIG.proxyConfig, lazyConnect: true }
: Config.PROXY_CACHE_CONFIG.proxyConfig

proxyCache = Object.freeze(
createProxyCache(Config.PROXY_CACHE_CONFIG.type, proxyConfig)
)
}

const connect = async () => {
return getCache().connect()
return !proxyCache?.isConnected && getCache().connect()
}

const disconnect = async () => {
return proxyCache?.isConnected && proxyCache.disconnect()
}

const reset = async () => {
await disconnect()
proxyCache = null
}

const getCache = () => {
if (!proxyCache) {
proxyCache = Object.freeze(createProxyCache(
Config.PROXY_CACHE_CONFIG.type,
Config.PROXY_CACHE_CONFIG.proxyConfig
))
init()
}
return proxyCache
}
Expand All @@ -40,6 +54,7 @@ const checkSameCreditorDebtorProxy = async (debtorDfspId, creditorDfspId) => {
}

module.exports = {
reset, // for testing
connect,
disconnect,
getCache,
Expand Down
74 changes: 71 additions & 3 deletions test/integration-override/handlers/positions/handlerBatch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const Config = require('#src/lib/config')
const ProxyCache = require('#src/lib/proxyCache')
const Db = require('@mojaloop/database-lib').Db
const Cache = require('#src/lib/cache')
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const { Producer, Consumer } = require('@mojaloop/central-services-stream').Util
const Utility = require('@mojaloop/central-services-shared').Util.Kafka
const Enum = require('@mojaloop/central-services-shared').Enum
const ParticipantHelper = require('#test/integration/helpers/participant')
Expand All @@ -58,6 +58,7 @@ const SettlementModelCached = require('#src/models/settlement/settlementModelCac
const Handlers = {
index: require('#src/handlers/register'),
positions: require('#src/handlers/positions/handler'),
positionsBatch: require('#src/handlers/positions/handlerBatch'),
transfers: require('#src/handlers/transfers/handler'),
timeouts: require('#src/handlers/timeouts/handler')
}
Expand Down Expand Up @@ -984,8 +985,14 @@ Test('Handlers test', async handlersTest => {
Enum.Kafka.Config.PRODUCER,
TransferEventType.TRANSFER.toUpperCase(),
TransferEventType.FULFIL.toUpperCase())
const positionConfig = Utility.getKafkaConfig(
Config.KAFKA_CONFIG,
Enum.Kafka.Config.PRODUCER,
TransferEventType.TRANSFER.toUpperCase(),
TransferEventType.POSITION.toUpperCase())
prepareConfig.logger = Logger
fulfilConfig.logger = Logger
positionConfig.logger = Logger

await transferPositionPrepare.test('process batch of messages with mixed keys (accountIds) and update transfer state to RESERVED', async (test) => {
// Construct test data for 10 transfers. Default object contains 10 transfers.
Expand Down Expand Up @@ -1687,6 +1694,63 @@ Test('Handlers test', async handlersTest => {
test.end()
})

await transferPositionPrepare.test('skip processing of prepare/commit message if messageKey is 0', async (test) => {
await Handlers.positionsBatch.registerPositionHandler()
const topicNameOverride = 'topic-transfer-position-batch'
const message = {
value: {
content: {},
from: 'payerFsp',
to: 'testFxp',
id: randomUUID(),
metadata: {
event: {
id: randomUUID(),
type: 'position',
action: 'prepare',
createdAt: new Date(),
state: { status: 'success', code: 0 }
},
type: 'application/json'
}
}
}
const params = {
message,
producer: Producer,
kafkaTopic: topicNameOverride,
consumer: Consumer,
decodedPayload: message.value,
span: null
}
const opts = {
consumerCommit: false,
eventDetail: { functionality: 'position', action: 'prepare' },
fromSwitch: false,
toDestination: 'payerFsp',
messageKey: '0',
topicNameOverride
}
await Utility.proceed(Config.KAFKA_CONFIG, params, opts)
await new Promise(resolve => setTimeout(resolve, 2000))

let notificationPrepareFiltered = []
try {
const notificationPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({
topicFilter: 'topic-notification-event',
action: 'perpare'
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)

notificationPrepareFiltered = notificationPrepare.filter((notification) => notification.to !== 'Hub')
test.notOk('Error should be thrown')
} catch (err) {
test.equal(notificationPrepareFiltered.length, 0, 'Notification Messages not received for transfer with accountId 0')
}

testConsumer.clearEvents()
test.end()
})

await transferPositionPrepare.test('timeout should', async timeoutTest => {
const td = await prepareTestData(testData)

Expand Down Expand Up @@ -1837,9 +1901,13 @@ Test('Handlers test', async handlersTest => {
await Db.disconnect()
assert.pass('database connection closed')
await testConsumer.destroy() // this disconnects the consumers

await Producer.disconnect()
await ProxyCache.disconnect()
await Producer.disconnect()
// Disconnect all consumers
await Promise.all(Consumer.getListOfTopics().map(async (topic) => {
Logger.info(`Disconnecting consumer for topic: ${topic}`)
return Consumer.getConsumer(topic).disconnect()
}))

if (debug) {
const elapsedTime = Math.round(((new Date()) - startTime) / 100) / 10
Expand Down
32 changes: 32 additions & 0 deletions test/unit/handlers/positions/handlerBatch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const prepareMessageValue = {
payload: {}
}
}

const commitMessageValue = {
metadata: {
event: {
Expand Down Expand Up @@ -565,6 +566,37 @@ Test('Position handler', positionBatchHandlerTest => {
}
})

positionsTest.test('skip processing if message key is 0', async test => {
// Arrange
await Consumer.createHandler(topicName, config, command)
Kafka.transformGeneralTopicName.returns(topicName)
Kafka.getKafkaConfig.returns(config)
Kafka.proceed.returns(true)
BinProcessor.processBins.resolves({
notifyMessages: [],
followupMessages: []
})

const message = {
key: '0',
value: prepareMessageValue,
topic: topicName
}

// Act
try {
await allTransferHandlers.positions(null, [message])
test.ok(BatchPositionModel.startDbTransaction.notCalled, 'startDbTransaction should not be called')
test.ok(BinProcessor.processBins.notCalled, 'processBins should not be called')
test.ok(Kafka.proceed.notCalled, 'kafkaProceed should not be called')
test.end()
} catch (err) {
Logger.info(err)
test.fail('Error should not be thrown')
test.end()
}
})

positionsTest.end()
})

Expand Down
2 changes: 1 addition & 1 deletion test/unit/handlers/transfers/prepare.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const Config = require('../../../../src/lib/config')
const fxTransferModel = require('../../../../src/models/fxTransfer')
const fxDuplicateCheck = require('../../../../src/models/fxTransfer/duplicateCheck')
const fxTransferStateChange = require('../../../../src/models/fxTransfer/stateChange')
const ProxyCache = require('#src/lib/proxyCache')
const ProxyCache = require('../../../../src/lib/proxyCache')

const { Action } = Enum.Events.Event

Expand Down
Loading

0 comments on commit 2cc0af6

Please sign in to comment.