feat(mojaloop/#3998): proxy obligation tracking for position changes #1064

Merged: 3 commits, Jul 23, 2024
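In brief: this PR makes the batch position handler skip position changes for messages keyed `'0'` (the key used when debtor and creditor are represented by the same proxy, so no interscheme obligation needs tracking), wires a Redis-backed proxy cache into docker-compose and the default central-ledger config, enforces lazy Redis connections in `src/lib/proxyCache.js` with a `reset()` hook for tests, and adds unit and integration coverage for the skip behaviour.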
2 changes: 1 addition & 1 deletion Dockerfile
@@ -16,7 +16,7 @@ FROM node:${NODE_VERSION} as builder
WORKDIR /opt/app

RUN apk --no-cache add git
-RUN apk add --no-cache -t build-dependencies make gcc g++ python3 libtool openssl-dev autoconf automake bash \
+RUN apk add --no-cache -t build-dependencies make gcc g++ python3 py3-setuptools libtool openssl-dev autoconf automake bash \
&& cd $(npm root -g)/npm \
&& npm install -g node-gyp
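Presumably `py3-setuptools` is needed because node-gyp relies on Python's `distutils` module, which Python 3.12+ no longer bundles; the Alpine `py3-setuptools` package restores it for the native-module builds in this stage.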

28 changes: 15 additions & 13 deletions docker-compose.yml
@@ -94,6 +94,21 @@ services:
retries: 10
start_period: 40s
interval: 30s

+  redis:
+    image: redis:6.2.4-alpine
+    restart: "unless-stopped"
+    environment:
+      - ALLOW_EMPTY_PASSWORD=yes
+      - REDIS_PORT=6379
+      - REDIS_REPLICATION_MODE=master
+      - REDIS_TLS_ENABLED=no
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+    ports:
+      - "6379:6379"
+    networks:
+      - cl-mojaloop-net

mockserver:
image: jamesdbloom/mockserver
@@ -219,16 +234,3 @@ services:
- cl-mojaloop-net
environment:
- KAFKA_BROKERS=kafka:29092

-  redis:
-    image: redis:6.2.4-alpine
-    restart: "unless-stopped"
-    environment:
-      - ALLOW_EMPTY_PASSWORD=yes
-      - REDIS_PORT=6379
-      - REDIS_REPLICATION_MODE=master
-      - REDIS_TLS_ENABLED=no
-    ports:
-      - "6379:6379"
-    networks:
-      - cl-mojaloop-net
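The service definition is moved up in the file and gains a `redis-cli ping` healthcheck. To sanity-check the new service from the host, here is a minimal sketch (not part of the PR; it assumes `ioredis` is resolvable, which is plausible since `@mojaloop/inter-scheme-proxy-cache-lib` drives Redis through it):

```js
// ping-redis.js — hypothetical helper, not part of this PR
const Redis = require('ioredis')

const main = async () => {
  // From the host, use the published port 6379; inside the compose
  // network the same service is reachable as redis:6379.
  const client = new Redis({ host: 'localhost', port: 6379, lazyConnect: true })
  await client.connect() // lazyConnect defers the TCP connection until here
  console.log(await client.ping()) // "PONG" when the container is healthy
  client.disconnect()
}

main().catch(console.error)
```

This mirrors the container-level healthcheck above, which runs the same ping inside the service.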
2 changes: 1 addition & 1 deletion docker/central-ledger/default.json
@@ -86,7 +86,7 @@
    "enabled": true,
    "type": "redis",
    "proxyConfig": {
-      "host": "localhost",
+      "host": "redis",
"port": 6379
}
},
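The host change matters because central-ledger itself runs as a compose service: `redis` resolves through Docker's embedded DNS on the shared `cl-mojaloop-net` network, whereas `localhost` would loop back into the central-ledger container itself.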
89 changes: 51 additions & 38 deletions src/handlers/positions/handlerBatch.js
@@ -104,9 +104,20 @@ const positions = async (error, messages) => {
binId
})

+      const accountID = message.key.toString()
+
+      /**
+       * Interscheme accounting rule:
+       * - If the creditor and debtor are represented by the same proxy, the message key will be 0.
+       *   In such cases, we skip position changes.
+       */
+      if (accountID === '0') {
+        histTimerEnd({ success: true })
+        return span.finish()
+      }
+
      // Assign message to account-bin by accountID and child action-bin by action
      // (References to the messages to be stored in bins, no duplication of messages)
-      const accountID = message.key.toString()
      const action = message.value.metadata.event.action
      const accountBin = bins[accountID] || (bins[accountID] = {})
      const actionBin = accountBin[action] || (accountBin[action] = [])
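For context on where the `'0'` key comes from: it has to be assigned upstream, before the message lands on the position-batch topic. A minimal sketch of that decision (hypothetical helper name; it assumes `checkSameCreditorDebtorProxy`, defined in `src/lib/proxyCache.js` below, is exported):

```js
const ProxyCache = require('../../lib/proxyCache')

// Hypothetical illustration only — the real keying happens in the
// prepare/fulfil handlers before producing to the position-batch topic.
const assignPositionMessageKey = async (debtorDfspId, creditorDfspId, accountId) => {
  // The same proxy on both sides means the obligation nets out within that
  // proxy, so the batch handler is signalled (key '0') to skip the change.
  const sameProxy = await ProxyCache.checkSameCreditorDebtorProxy(debtorDfspId, creditorDfspId)
  return sameProxy ? '0' : accountId.toString()
}

module.exports = { assignPositionMessageKey }
```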
@@ -129,54 +140,56 @@ const positions = async (error, messages) => {
return span.audit(message, EventSdk.AuditEventAction.start)
}))

-  // Start DB Transaction
-  const trx = await BatchPositionModel.startDbTransaction()
+  // Start DB Transaction if there are any bins to process
+  const trx = !!Object.keys(bins).length && await BatchPositionModel.startDbTransaction()

  try {
-    // Call Bin Processor with the list of account-bins and trx
-    const result = await BinProcessor.processBins(bins, trx)
+    if (trx) {
+      // Call Bin Processor with the list of account-bins and trx
+      const result = await BinProcessor.processBins(bins, trx)

-    // If Bin Processor processed bins successfully, commit Kafka offset
-    // Commit the offset of last message in the array
-    for (const message of Object.values(lastPerPartition)) {
-      const params = { message, kafkaTopic: message.topic, consumer: Consumer }
-      // We are using Kafka.proceed() to just commit the offset of the last message in the array
-      await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, hubName: Config.HUB_NAME })
-    }
+      // If Bin Processor processed bins successfully, commit Kafka offset
+      // Commit the offset of last message in the array
+      for (const message of Object.values(lastPerPartition)) {
+        const params = { message, kafkaTopic: message.topic, consumer: Consumer }
+        // We are using Kafka.proceed() to just commit the offset of the last message in the array
+        await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, hubName: Config.HUB_NAME })
+      }

-    // Commit DB transaction
-    await trx.commit()
+      // Commit DB transaction
+      await trx.commit()

-    // Loop through results and produce notification messages and audit messages
-    await Promise.all(result.notifyMessages.map(item => {
-      // Produce notification message and audit message
-      const action = item.binItem.message?.value.metadata.event.action
-      const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
-      return Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
-    }).concat(
-      // Loop through followup messages and produce position messages for further processing of the transfer
-      result.followupMessages.map(item => {
-        // Produce position message and audit message
-        const action = item.binItem.message?.value.metadata.event.action
-        const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
-        return Kafka.produceGeneralMessage(
-          Config.KAFKA_CONFIG,
-          Producer,
-          Enum.Events.Event.Type.POSITION,
-          action,
-          item.message,
-          eventStatus,
-          item.messageKey,
-          item.binItem.span,
-          Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT
-        )
-      })
-    ))
+      // Loop through results and produce notification messages and audit messages
+      await Promise.all(result.notifyMessages.map(item => {
+        // Produce notification message and audit message
+        const action = item.binItem.message?.value.metadata.event.action
+        const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
+        return Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
+      }).concat(
+        // Loop through followup messages and produce position messages for further processing of the transfer
+        result.followupMessages.map(item => {
+          // Produce position message and audit message
+          const action = item.binItem.message?.value.metadata.event.action
+          const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
+          return Kafka.produceGeneralMessage(
+            Config.KAFKA_CONFIG,
+            Producer,
+            Enum.Events.Event.Type.POSITION,
+            action,
+            item.message,
+            eventStatus,
+            item.messageKey,
+            item.binItem.span,
+            Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT
+          )
+        })
+      ))
+    }
    histTimerEnd({ success: true })
  } catch (err) {
    // If Bin Processor returns failure
    // - Rollback DB transaction
-    await trx.rollback()
+    await trx?.rollback()

// - Audit Error for each message
const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err)
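The conditional transaction is the other half of the skip logic: if every message in the batch was keyed `'0'`, no bins exist, no transaction is started, and the rollback in the catch block must tolerate `trx` being `false`. A standalone sketch of the pattern (hypothetical names, knex-style transaction API assumed):

```js
// Hypothetical sketch of the guard-and-optional-rollback pattern above.
const processBatch = async (bins, knex) => {
  // Skip the DB round-trip entirely when everything was filtered out upstream.
  const trx = !!Object.keys(bins).length && await knex.transaction()
  try {
    if (trx) {
      await doWork(bins, trx) // stand-in for BinProcessor.processBins
      await trx.commit()
    }
  } catch (err) {
    // trx is false when no work was started, hence the optional chaining.
    await trx?.rollback()
    throw err
  }
}
```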
29 changes: 22 additions & 7 deletions src/lib/proxyCache.js
@@ -1,24 +1,38 @@
'use strict'
-const { createProxyCache } = require('@mojaloop/inter-scheme-proxy-cache-lib')
-const Config = require('./config.js')
+const { createProxyCache, STORAGE_TYPES } = require('@mojaloop/inter-scheme-proxy-cache-lib')
const ParticipantService = require('../../src/domain/participant')
+const Config = require('./config.js')

let proxyCache

+const init = async () => {
+  // enforce lazy connection for redis
+  const proxyConfig =
+    Config.PROXY_CACHE_CONFIG.type === STORAGE_TYPES.redis
+      ? { ...Config.PROXY_CACHE_CONFIG.proxyConfig, lazyConnect: true }
+      : Config.PROXY_CACHE_CONFIG.proxyConfig
+
+  proxyCache = Object.freeze(
+    createProxyCache(Config.PROXY_CACHE_CONFIG.type, proxyConfig)
+  )
+}

const connect = async () => {
-  return getCache().connect()
+  return !proxyCache?.isConnected && getCache().connect()
}

const disconnect = async () => {
return proxyCache?.isConnected && proxyCache.disconnect()
}

+const reset = async () => {
+  await disconnect()
+  proxyCache = null
+}

const getCache = () => {
if (!proxyCache) {
-    proxyCache = Object.freeze(createProxyCache(
-      Config.PROXY_CACHE_CONFIG.type,
-      Config.PROXY_CACHE_CONFIG.proxyConfig
-    ))
+    init()
}
return proxyCache
}
@@ -40,6 +54,7 @@ const checkSameCreditorDebtorProxy = async (debtorDfspId, creditorDfspId) => {
}

module.exports = {
+  reset, // for testing
connect,
disconnect,
getCache,
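Taken together, the module now supports a connect-once, reset-between-tests lifecycle. A minimal usage sketch against the exported API (the require path assumes a caller inside `src/`):

```js
const ProxyCache = require('./lib/proxyCache')

const main = async () => {
  await ProxyCache.connect() // no-op when the client is already connected

  const cache = ProxyCache.getCache() // lazily built (and frozen) via init()
  console.log('connected:', cache.isConnected)

  await ProxyCache.reset() // disconnects and drops the instance, for tests
}

main().catch(console.error)
```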
74 changes: 71 additions & 3 deletions test/integration-override/handlers/positions/handlerBatch.test.js
@@ -31,7 +31,7 @@ const Config = require('#src/lib/config')
const ProxyCache = require('#src/lib/proxyCache')
const Db = require('@mojaloop/database-lib').Db
const Cache = require('#src/lib/cache')
-const Producer = require('@mojaloop/central-services-stream').Util.Producer
+const { Producer, Consumer } = require('@mojaloop/central-services-stream').Util
const Utility = require('@mojaloop/central-services-shared').Util.Kafka
const Enum = require('@mojaloop/central-services-shared').Enum
const ParticipantHelper = require('#test/integration/helpers/participant')
@@ -58,6 +58,7 @@ const SettlementModelCached = require('#src/models/settlement/settlementModelCac
const Handlers = {
index: require('#src/handlers/register'),
positions: require('#src/handlers/positions/handler'),
positionsBatch: require('#src/handlers/positions/handlerBatch'),
transfers: require('#src/handlers/transfers/handler'),
timeouts: require('#src/handlers/timeouts/handler')
}
@@ -984,8 +985,14 @@
Enum.Kafka.Config.PRODUCER,
TransferEventType.TRANSFER.toUpperCase(),
TransferEventType.FULFIL.toUpperCase())
const positionConfig = Utility.getKafkaConfig(
Config.KAFKA_CONFIG,
Enum.Kafka.Config.PRODUCER,
TransferEventType.TRANSFER.toUpperCase(),
TransferEventType.POSITION.toUpperCase())
prepareConfig.logger = Logger
fulfilConfig.logger = Logger
positionConfig.logger = Logger

await transferPositionPrepare.test('process batch of messages with mixed keys (accountIds) and update transfer state to RESERVED', async (test) => {
// Construct test data for 10 transfers. Default object contains 10 transfers.
@@ -1687,6 +1694,63 @@
test.end()
})

await transferPositionPrepare.test('skip processing of prepare/commit message if messageKey is 0', async (test) => {
await Handlers.positionsBatch.registerPositionHandler()
const topicNameOverride = 'topic-transfer-position-batch'
const message = {
value: {
content: {},
from: 'payerFsp',
to: 'testFxp',
id: randomUUID(),
metadata: {
event: {
id: randomUUID(),
type: 'position',
action: 'prepare',
createdAt: new Date(),
state: { status: 'success', code: 0 }
},
type: 'application/json'
}
}
}
const params = {
message,
producer: Producer,
kafkaTopic: topicNameOverride,
consumer: Consumer,
decodedPayload: message.value,
span: null
}
const opts = {
consumerCommit: false,
eventDetail: { functionality: 'position', action: 'prepare' },
fromSwitch: false,
toDestination: 'payerFsp',
messageKey: '0',
topicNameOverride
}
await Utility.proceed(Config.KAFKA_CONFIG, params, opts)
await new Promise(resolve => setTimeout(resolve, 2000))

let notificationPrepareFiltered = []
try {
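        // wrapWithRetries is expected to exhaust its retries and throw here,
        // since no notification should be produced for a message keyed '0'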
const notificationPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({
topicFilter: 'topic-notification-event',
action: 'prepare'
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)

notificationPrepareFiltered = notificationPrepare.filter((notification) => notification.to !== 'Hub')
test.fail('Error should be thrown')
} catch (err) {
test.equal(notificationPrepareFiltered.length, 0, 'Notification Messages not received for transfer with accountId 0')
}

testConsumer.clearEvents()
test.end()
})
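The shape of this test is worth noting: it produces directly to the overridden `topic-transfer-position-batch` topic with `messageKey: '0'`, then polls the notification topic and treats an exhausted retry loop (the thrown error) as the expected outcome, asserting that no non-Hub notifications ever arrived.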

await transferPositionPrepare.test('timeout should', async timeoutTest => {
const td = await prepareTestData(testData)

@@ -1837,9 +1901,13 @@
await Db.disconnect()
assert.pass('database connection closed')
await testConsumer.destroy() // this disconnects the consumers

-    await Producer.disconnect()
    await ProxyCache.disconnect()
+    await Producer.disconnect()
+    // Disconnect all consumers
+    await Promise.all(Consumer.getListOfTopics().map(async (topic) => {
+      Logger.info(`Disconnecting consumer for topic: ${topic}`)
+      return Consumer.getConsumer(topic).disconnect()
+    }))

if (debug) {
const elapsedTime = Math.round(((new Date()) - startTime) / 100) / 10
32 changes: 32 additions & 0 deletions test/unit/handlers/positions/handlerBatch.test.js
@@ -54,6 +54,7 @@ const prepareMessageValue = {
payload: {}
}
}

const commitMessageValue = {
metadata: {
event: {
@@ -565,6 +566,37 @@
}
})

positionsTest.test('skip processing if message key is 0', async test => {
// Arrange
await Consumer.createHandler(topicName, config, command)
Kafka.transformGeneralTopicName.returns(topicName)
Kafka.getKafkaConfig.returns(config)
Kafka.proceed.returns(true)
BinProcessor.processBins.resolves({
notifyMessages: [],
followupMessages: []
})

const message = {
key: '0',
value: prepareMessageValue,
topic: topicName
}

// Act
try {
await allTransferHandlers.positions(null, [message])
test.ok(BatchPositionModel.startDbTransaction.notCalled, 'startDbTransaction should not be called')
test.ok(BinProcessor.processBins.notCalled, 'processBins should not be called')
test.ok(Kafka.proceed.notCalled, 'kafkaProceed should not be called')
test.end()
} catch (err) {
Logger.info(err)
test.fail('Error should not be thrown')
test.end()
}
})

positionsTest.end()
})

2 changes: 1 addition & 1 deletion test/unit/handlers/transfers/prepare.test.js
@@ -54,7 +54,7 @@ const Config = require('../../../../src/lib/config')
const fxTransferModel = require('../../../../src/models/fxTransfer')
const fxDuplicateCheck = require('../../../../src/models/fxTransfer/duplicateCheck')
const fxTransferStateChange = require('../../../../src/models/fxTransfer/stateChange')
-const ProxyCache = require('#src/lib/proxyCache')
+const ProxyCache = require('../../../../src/lib/proxyCache')

const { Action } = Enum.Events.Event
