Skip to content

Commit

Permalink
chore(mojaloop/#3819): update functional tests and move fulfil int te…
Browse files Browse the repository at this point in the history
…st (#1009)

* chore: update functional tests

* version

* snap

* test

* name

* func

* update

* test-function

* snap

* changes

* feat: implemented fx

* fix: unit tests

* fix: unit tests

* chore: removed fx-fulfil in non batch mode

* add back functions

* feat: refactored position fulfil handler for fx

* chore: removed fx from non batch position fulfil

* chore: removed fx references from non batch position handler

* chore: simplified existing tests

* chore: added unit tests

* fix: prepare position fx

* publish messages to batch topic

* update script

* move fxfulfil tests to batch tests

---------

Co-authored-by: Vijay <[email protected]>
  • Loading branch information
kleyow and vijayg10 authored May 2, 2024
1 parent cb9de40 commit 13c95ae
Show file tree
Hide file tree
Showing 18 changed files with 162 additions and 86 deletions.
12 changes: 6 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ ARG NODE_VERSION=lts-alpine

# NOTE: Ensure you set NODE_VERSION Build Argument as follows...
#
# export NODE_VERSION="$(cat .nvmrc)-alpine" \
# docker build \
# --build-arg NODE_VERSION=$NODE_VERSION \
# -t mojaloop/central-ledger:local \
# . \
# export NODE_VERSION="$(cat .nvmrc)-alpine"
# docker build \
# --build-arg NODE_VERSION=$NODE_VERSION \
# -t mojaloop/central-ledger:local \
# .
#

# Build Image
Expand All @@ -32,7 +32,7 @@ RUN mkdir ./logs && touch ./logs/combined.log
RUN ln -sf /dev/stdout ./logs/combined.log

# Create a non-root user: ml-user
RUN adduser -D ml-user
RUN adduser -D ml-user
USER ml-user

COPY --chown=ml-user --from=builder /opt/app .
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,11 @@ It will handle docker start up, migration, service starting and testing. Be sure
If you want to run functional tests locally utilizing the [ml-core-test-harness](https://github.com/mojaloop/ml-core-test-harness), you can run the following commands:

```bash
docker build -t mojaloop/central-ledger:local .
export NODE_VERSION="$(cat .nvmrc)-alpine"
docker build \
--build-arg NODE_VERSION=$NODE_VERSION \
-t mojaloop/central-ledger:local \
.
```

```bash
Expand Down
4 changes: 3 additions & 1 deletion audit-ci.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
"GHSA-w5p7-h5w8-2hfq", // tap-spec>tap-out>trim
"GHSA-p9pc-299p-vxgp", // widdershins>yargs>yargs-parser
"GHSA-f5x3-32g6-xq36", // https://github.com/advisories/GHSA-f5x3-32g6-xq36
"GHSA-cgfm-xwp7-2cvr" // https://github.com/advisories/GHSA-cgfm-xwp7-2cvr
"GHSA-cgfm-xwp7-2cvr", // https://github.com/advisories/GHSA-cgfm-xwp7-2cvr
"GHSA-ghr5-ch3p-vcr6" // https://github.com/advisories/GHSA-ghr5-ch3p-vcr6

]
}
16 changes: 16 additions & 0 deletions docker/config-modifier/configs/central-ledger.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ module.exports = {
DATABASE: 'mlos'
},
KAFKA: {
EVENT_TYPE_ACTION_TOPIC_MAP: {
POSITION: {
PREPARE: 'topic-transfer-position-batch',
BULK_PREPARE: null,
COMMIT: 'topic-transfer-position-batch',
BULK_COMMIT: null,
RESERVE: 'topic-transfer-position-batch'
}
},
CONSUMER: {
BULK: {
PREPARE: {
Expand Down Expand Up @@ -72,6 +81,13 @@ module.exports = {
'metadata.broker.list': 'kafka:29092'
}
}
},
POSITION_BATCH: {
config: {
rdkafkaConf: {
'metadata.broker.list': 'kafka:29092'
}
}
}
},
ADMIN: {
Expand Down
95 changes: 65 additions & 30 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
"async-retry": "1.3.3",
"audit-ci": "^6.6.1",
"get-port": "5.1.1",
"jsdoc": "4.0.2",
"jsdoc": "4.0.3",
"jsonpath": "1.1.1",
"nodemon": "3.1.0",
"npm-check-updates": "16.14.20",
Expand Down
22 changes: 10 additions & 12 deletions src/domain/position/fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@ const processPositionFulfilBin = async (
for (const binItems of commitReserveFulfilBins) {
if (binItems && binItems.length > 0) {
for (const binItem of binItems) {
let reason
const transferId = binItem.message.value.content.uriParams.id
const payeeFsp = binItem.message.value.from
const payerFsp = binItem.message.value.to
const transfer = binItem.decodedPayload

// Inform payee dfsp if transfer is not in RECEIVED_FULFIL state, skip making any transfer state changes
if (accumulatedTransferStates[transferId] !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL) {
const resultMessage = _handleIncorrectTransferState(binItem, payeeFsp, transferId, accumulatedTransferStates)
Expand All @@ -62,16 +61,16 @@ const processPositionFulfilBin = async (
const positionChangeIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone)
const positionChangeToBeProcessed = cyrilResult.positionChanges[positionChangeIndex]
if (positionChangeToBeProcessed.isFxTransferStateChange) {
const { participantPositionChange, fxTransferStateChange, transferStateId, updatedRunningPosition }
= _handleParticipantPositionChangeFx(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.commitRequestId, accumulatedPositionReservedValue)
const { participantPositionChange, fxTransferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChangeFx(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.commitRequestId, accumulatedPositionReservedValue)
runningPosition = updatedRunningPosition
participantPositionChanges.push(participantPositionChange)
fxTransferStateChanges.push(fxTransferStateChange)
accumulatedFxTransferStatesCopy[positionChangeToBeProcessed.commitRequestId] = transferStateId
// TODO: Send required FX PATCH notifications
} else {
const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition }
= _handleParticipantPositionChange(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.transferId, accumulatedPositionReservedValue)
const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChange(runningPosition, positionChangeToBeProcessed.amount, positionChangeToBeProcessed.transferId, accumulatedPositionReservedValue)
runningPosition = updatedRunningPosition
participantPositionChanges.push(participantPositionChange)
transferStateChanges.push(transferStateChange)
Expand All @@ -93,13 +92,13 @@ const processPositionFulfilBin = async (
followupMessage.content.context = binItem.message.value.content.context
followupMessages.push({ binItem, messageKey: participantCurrencyId.toString(), message: followupMessage })
}
} else {
} else {
const transferAmount = transferInfoList[transferId].amount

const resultMessage = _constructTransferFulfilResultMessage(binItem, transferId, payerFsp, payeeFsp, transfer, reservedActionTransfers)
const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition }
= _handleParticipantPositionChange(runningPosition, transferAmount, transferId, accumulatedPositionReservedValue)

const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChange(runningPosition, transferAmount, transferId, accumulatedPositionReservedValue)
runningPosition = updatedRunningPosition
binItem.result = { success: true }
participantPositionChanges.push(participantPositionChange)
Expand All @@ -123,7 +122,6 @@ const processPositionFulfilBin = async (
notifyMessages: resultMessages, // array of objects containing bin item and result message. {binItem, message}
followupMessages // array of objects containing bin item, message key and followup message. {binItem, messageKey, message}
}

}

const _handleIncorrectTransferState = (binItem, payeeFsp, transferId, accumulatedTransferStates) => {
Expand Down
2 changes: 1 addition & 1 deletion src/domain/position/fx-fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const Logger = require('@mojaloop/central-services-logger')
*/
const processPositionFxFulfilBin = async (
binItems,
accumulatedFxTransferStates,
accumulatedFxTransferStates
) => {
const fxTransferStateChanges = []
const resultMessages = []
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/positions/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ const positions = async (error, messages) => {
: (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED
? Enum.Events.ActionLetter.bulkTimeoutReserved
: (action === Enum.Events.Event.Action.BULK_ABORT
? Enum.Events.ActionLetter.bulkAbort
: Enum.Events.ActionLetter.unknown)))))))))
? Enum.Events.ActionLetter.bulkAbort
: Enum.Events.ActionLetter.unknown)))))))))
const params = { message, kafkaTopic, decodedPayload: payload, span, consumer: Consumer, producer: Producer }
const eventDetail = { action }
if (![Enum.Events.Event.Action.BULK_PREPARE, Enum.Events.Event.Action.BULK_COMMIT, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED, Enum.Events.Event.Action.BULK_ABORT].includes(action)) {
Expand Down
12 changes: 11 additions & 1 deletion src/handlers/positions/handlerBatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,17 @@ const positions = async (error, messages) => {
// Produce position message and audit message
const action = item.binItem.message?.value.metadata.event.action
const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.POSITION, action, item.message, eventStatus, item.messageKey, item.binItem.span)
await Kafka.produceGeneralMessage(
Config.KAFKA_CONFIG,
Producer,
Enum.Events.Event.Type.POSITION,
action,
item.message,
eventStatus,
item.messageKey,
item.binItem.span,
Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT
)
}
histTimerEnd({ success: true })
} catch (err) {
Expand Down
9 changes: 7 additions & 2 deletions src/handlers/transfers/FxFulfilService.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,18 @@ class FxFulfilService {
await this.kafkaProceed({
consumerCommit,
eventDetail,
messageKey: cyrilOutput.counterPartyFspSourceParticipantCurrencyId.toString()
messageKey: cyrilOutput.counterPartyFspSourceParticipantCurrencyId.toString(),
topicNameOverride: this.Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT
})
return true
}

async kafkaProceed(kafkaOpts) {
return this.Kafka.proceed(this.Config.KAFKA_CONFIG, this.params, kafkaOpts)
return this.Kafka.proceed(
this.Config.KAFKA_CONFIG,
this.params,
kafkaOpts
)
}

validateFulfilCondition(fulfilment, condition) {
Expand Down
Loading

0 comments on commit 13c95ae

Please sign in to comment.