Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mojaloop/#3498): alter message keys for prepare, fulfil and timeout #965

Merged
merged 18 commits into from
Sep 4, 2023
Merged
13 changes: 6 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,17 @@ services:
retries: 10
interval: 30s

kowl:
image: quay.io/cloudhut/kowl:v1.4.0
container_name: cl_kowl
## Debug utilities
kafka-debug-console:
# image: quay.io/cloudhut/kowl:v1.4.0
image: docker.redpanda.com/redpandadata/console:latest
deploy:
replicas: 1
restart: on-failure
hostname: kowl
hostname: kafka-debug-console
ports:
- "8080:8080"
- "9080:8080"
networks:
- cl-mojaloop-net
environment:
- KAFKA_BROKERS=kafka:29092
depends_on:
- kafka
6 changes: 4 additions & 2 deletions src/handlers/timeouts/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ const timeout = async () => {
} else if (result[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
message.metadata.event.type = Enum.Events.Event.Type.POSITION
message.metadata.event.action = Enum.Events.Event.Action.TIMEOUT_RESERVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.POSITION, Enum.Events.Event.Action.TIMEOUT_RESERVED, message, state, result[i].payerFsp, span)
// Key position timeouts with payer account id
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.POSITION, Enum.Events.Event.Action.TIMEOUT_RESERVED, message, state, result[i].payerParticipantCurrencyId?.toString(), span)
}
} else { // individual transfer from a bulk
if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
Expand All @@ -108,7 +109,8 @@ const timeout = async () => {
} else if (result[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
message.metadata.event.type = Enum.Events.Event.Type.POSITION
message.metadata.event.action = Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.POSITION, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED, message, state, result[i].payerFsp, span)
// Key position timeouts with payer account id
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.POSITION, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED, message, state, result[i].payerParticipantCurrencyId?.toString(), span)
}
}
} catch (err) {
Expand Down
40 changes: 24 additions & 16 deletions src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ const Config = require('../../lib/config')
const decodePayload = Util.StreamingProtocol.decodePayload
const Comparators = require('@mojaloop/central-services-shared').Util.Comparators
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Participant = require('../../domain/participant')

const consumerCommit = true
const fromSwitch = true
const toDestination = true

/**
* @function TransferPrepareHandler
Expand Down Expand Up @@ -198,7 +198,9 @@ const prepare = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `positionTopic1--${actionLetter}7`))
functionality = TransferEventType.POSITION
const eventDetail = { functionality, action }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, toDestination })
// Key position prepare message with payer account id
const payerAccount = await Participant.getAccountByNameAndCurrency(payload.payerFsp, payload.amount.currency, Enum.Accounts.LedgerAccountType.POSITION)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this particular case here(happy path), if we want to save a DB query, the participant account id is already fetched in saveTransferPrepared of facade. You can pass it down to access here. But as long as the query is cached, I don't think its a must. @mdebarros What do you suggest?

Copy link
Member

@mdebarros mdebarros Sep 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets create a story for this. The reason I say that is because this in practice shouldn't be an issue as long as caching is enabled.

We should definitely look at such improvements going forward. A dedicated story may also give us the opportunities to reduce other similar duplicate queries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created a story to optimize duplicated queries for participant currency id with a clause to investigate other instances of duplicated queries (still updating the story, just a placeholder) mojaloop/project#3515

await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, messageKey: payerAccount.participantCurrencyId.toString() })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else {
Expand Down Expand Up @@ -372,10 +374,6 @@ const fulfil = async (error, messages) => {

const apiFSPIOPError = fspiopError.toApiErrorObject(Config.ERROR_HANDLING)

// Overriding global boolean declaration with a string value for local as we should handle notifications only to FSPs involved with this transfer
const toPayerDestination = transfer.payerFsp
const toPayeeDestination = transfer.payeeFsp

// Set the event details to map to an ABORT_VALIDATION event targeted to the Position Handler
const eventDetail = { functionality: TransferEventType.POSITION, action: TransferEventAction.ABORT_VALIDATION }

Expand All @@ -390,7 +388,9 @@ const fulfil = async (error, messages) => {
*/

// Publish message to Position Handler
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, fromSwitch, toDestination: toPayerDestination })
// Key position abort with payer account id
const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, fromSwitch, messageKey: payerAccount.participantCurrencyId.toString() })

/**
* Send patch notification callback to original payee fsp if they asked for a a patch response.
Expand Down Expand Up @@ -420,7 +420,7 @@ const fulfil = async (error, messages) => {
}
}
message.value.content.payload = reservedAbortedPayload
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail: reserveAbortedEventDetail, toDestination: toPayeeDestination, fromSwitch: true })
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail: reserveAbortedEventDetail, fromSwitch: true })
}

throw apiFSPIOPError
Expand All @@ -446,7 +446,7 @@ const fulfil = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, 'handleResend'))

// This is a duplicate message for a transfer that is already in a finalized state
// respond as if we recieved a GET /transfers/{ID} from the client
// respond as if we received a GET /transfers/{ID} from the client
if (transferStateEnum === TransferState.COMMITTED || transferStateEnum === TransferState.ABORTED) {
message.value.content.payload = TransferObjectTransform.toFulfil(transfer)
const eventDetail = { functionality, action }
Expand Down Expand Up @@ -555,7 +555,9 @@ const fulfil = async (error, messages) => {
/**
* TODO: BulkProcessingHandler (not in scope of #967) The individual transfer is ABORTED by notification is never sent.
*/
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, toDestination })
// Key position validation abort with payer account id
const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, messageKey: payerAccount.participantCurrencyId.toString() })

// emit an extra message - RESERVED_ABORTED if action === TransferEventAction.RESERVE
if (action === TransferEventAction.RESERVE) {
Expand Down Expand Up @@ -585,7 +587,7 @@ const fulfil = async (error, messages) => {
}
}
message.value.content.payload = reservedAbortedPayload
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, toDestination: transfer.payeeFsp, fromSwitch: true })
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch: true })
}
throw fspiopError
}
Expand Down Expand Up @@ -613,7 +615,7 @@ const fulfil = async (error, messages) => {
transferState: TransferState.ABORTED
}
message.value.content.payload = reservedAbortedPayload
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, toDestination: transfer.payeeFsp, fromSwitch: true })
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch: true })
}
throw fspiopError
}
Expand Down Expand Up @@ -641,7 +643,7 @@ const fulfil = async (error, messages) => {
transferState: TransferState.ABORTED
}
message.value.content.payload = reservedAbortedPayload
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, toDestination: transfer.payeeFsp, fromSwitch: true })
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch: true })
}
throw fspiopError
}
Expand All @@ -655,7 +657,9 @@ const fulfil = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `positionTopic2--${actionLetter}12`))
await TransferService.handlePayeeResponse(transferId, payload, action)
const eventDetail = { functionality: TransferEventType.POSITION, action }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, toDestination })
// Key position fulfil message with payee account id
const payeeAccount = await Participant.getAccountByNameAndCurrency(transfer.payeeFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, messageKey: payeeAccount.participantCurrencyId.toString() })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
}
Expand Down Expand Up @@ -686,12 +690,16 @@ const fulfil = async (error, messages) => {
fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, 'API specification undefined errorCode')
await TransferService.handlePayeeResponse(transferId, payload, action, fspiopError.toApiErrorObject(Config.ERROR_HANDLING))
const eventDetail = { functionality: TransferEventType.POSITION, action }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, toDestination })
// Key position abort with payer account id
const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, messageKey: payerAccount.participantCurrencyId.toString() })
throw fspiopError
}
await TransferService.handlePayeeResponse(transferId, payload, action, fspiopError.toApiErrorObject(Config.ERROR_HANDLING))
const eventDetail = { functionality: TransferEventType.POSITION, action }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, toDestination })
// Key position abort with payer account id
const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, messageKey: payerAccount.participantCurrencyId.toString() })
// TODO(2556): I don't think we should emit an extra notification here
// this is the case where the Payee sent an ABORT, so we don't need to tell them to abort
throw fspiopError
Expand Down
4 changes: 2 additions & 2 deletions src/models/transfer/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -715,8 +715,8 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
.leftJoin('bulkTransferAssociation AS bta', 'bta.transferId', 'tt.transferId')

.where('tt.expirationDate', '<', transactionTimestamp)
.select('tt.*', 'tsc.transferStateId', 'tp1.participantCurrencyId AS payerParticipantId',
'p1.name AS payerFsp', 'p2.name AS payeeFsp', 'tp2.participantCurrencyId AS payeeParticipantId',
.select('tt.*', 'tsc.transferStateId', 'tp1.participantCurrencyId AS payerParticipantCurrencyId',
'p1.name AS payerFsp', 'p2.name AS payeeFsp', 'tp2.participantCurrencyId AS payeeParticipantCurrencyId',
'bta.bulkTransferId')
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
Expand Down
114 changes: 113 additions & 1 deletion test/integration/handlers/transfers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,19 @@ Test('Handlers test', async handlersTest => {
Enum.Events.Event.Type.NOTIFICATION.toUpperCase(),
Enum.Events.Event.Action.EVENT.toUpperCase()
)
},
{
topicName: Utility.transformGeneralTopicName(
Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE,
Enum.Events.Event.Type.TRANSFER,
Enum.Events.Event.Action.POSITION
),
config: Utility.getKafkaConfig(
Config.KAFKA_CONFIG,
Enum.Kafka.Config.CONSUMER,
Enum.Events.Event.Type.TRANSFER.toUpperCase(),
Enum.Events.Event.Action.POSITION.toUpperCase()
)
}
])

Expand All @@ -386,6 +399,36 @@ Test('Handlers test', async handlersTest => {
await registerAllHandlers.end()
})

await handlersTest.test('transferPrepare should', async transferPrepare => {
await transferPrepare.test('should create position prepare message keyed with payer account id', async (test) => {
// Arrange
const td = await prepareTestData(testData)
// 1. send a PREPARE request (from Payer)
const prepareConfig = Utility.getKafkaConfig(
Config.KAFKA_CONFIG,
Enum.Kafka.Config.PRODUCER,
TransferEventType.TRANSFER.toUpperCase(),
TransferEventType.PREPARE.toUpperCase())
prepareConfig.logger = Logger
await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig)

try {
const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({
topicFilter: 'topic-transfer-position',
action: 'prepare',
keyFilter: td.payer.participantCurrencyId.toString()
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionPrepare[0], 'Position prepare message with key found')
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
}
test.end()
})

transferPrepare.end()
})

await handlersTest.test('transferFulfilReserve should', async transferFulfilReserve => {
await transferFulfilReserve.test('Does not send a RESERVED_ABORTED notification when the Payee aborts the transfer', async (test) => {
// Arrange
Expand Down Expand Up @@ -559,6 +602,18 @@ Test('Handlers test', async handlersTest => {
console.error(err)
}

try {
const positionAbort = await wrapWithRetries(() => testConsumer.getEventsForFilter({
topicFilter: 'topic-transfer-position',
action: 'timeout-reserved',
keyFilter: td.payer.participantCurrencyId.toString()
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionAbort[0], 'Position timeout reserved message with key found')
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
}

// Cleanup
testConsumer.clearEvents()
test.end()
Expand Down Expand Up @@ -714,6 +769,18 @@ Test('Handlers test', async handlersTest => {
console.error(err)
}

try {
const positionAbortValidation = await wrapWithRetries(() => testConsumer.getEventsForFilter({
topicFilter: 'topic-transfer-position',
action: 'abort-validation',
keyFilter: td.payer.participantCurrencyId.toString()
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionAbortValidation[0], 'Position abort message with key found')
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
}

const updatedTransfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId)
test.equal(updatedTransfer?.transferState, 'ABORTED_ERROR', 'Transfer is in ABORTED_ERROR state')

Expand Down Expand Up @@ -851,10 +918,25 @@ Test('Handlers test', async handlersTest => {
test.end()
})

await transferFulfilCommit.test('transfer position fulfil should be keyed with payee account id', async (test) => {
try {
const positionFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({
topicFilter: 'topic-transfer-position',
action: 'commit',
keyFilter: td.payee.participantCurrencyId.toString()
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionFulfil[0], 'Position fulfil message with key found')
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
}
test.end()
})

transferFulfilCommit.end()
})

await handlersTest.test('tranferFulfilCommit with default settlement model should', async transferFulfilCommit => {
await handlersTest.test('transferFulfilCommit with default settlement model should', async transferFulfilCommit => {
const td = await prepareTestData(testDataZAR)
await transferFulfilCommit.test('update transfer state to RESERVED by PREPARE request', async (test) => {
const config = Utility.getKafkaConfig(
Expand Down Expand Up @@ -1094,6 +1176,21 @@ Test('Handlers test', async handlersTest => {
test.end()
})

await transferAbort.test('transfer position abort should be keyed with payer account id', async (test) => {
try {
const positionAbort = await wrapWithRetries(() => testConsumer.getEventsForFilter({
topicFilter: 'topic-transfer-position',
action: 'abort',
keyFilter: td.payer.participantCurrencyId.toString()
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionAbort[0], 'Position abort message with key found')
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
}
test.end()
})

transferAbort.end()
})

Expand Down Expand Up @@ -1205,6 +1302,21 @@ Test('Handlers test', async handlersTest => {
}
})

await timeoutTest.test('transfer position timeout should be keyed with payer account id', async (test) => {
try {
const positionTimeout = await wrapWithRetries(() => testConsumer.getEventsForFilter({
topicFilter: 'topic-transfer-position',
action: 'timeout-reserved',
keyFilter: td.payer.participantCurrencyId.toString()
}), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
test.ok(positionTimeout[0], 'Position timeout message with key found')
} catch (err) {
test.notOk('Error should not be thrown')
console.error(err)
}
test.end()
})

await timeoutTest.test('position resets after a timeout', async (test) => {
// Arrange
const payerInitialPosition = td.payerLimitAndInitialPosition.participantPosition.value
Expand Down
Loading