Skip to content

Commit

Permalink
feat(mojaloop/#3636): batching implementation for position prepare me…
Browse files Browse the repository at this point in the history
…ssages (#968)

feat(mojaloop/#3636): batching implementation for position prepare messages - mojaloop/project#3636
- Added prepare-batch handler for `prepare` messages
- Updated metrics
- Replaced `uuidv4` with built-in `randomUUID` from `crypto` nodejs library
- `bulk-prepare` messages are routed to non-batch prepare handler
- Added tests (unit, integration)
  • Loading branch information
vijayg10 authored Dec 5, 2023
1 parent bdff6c7 commit 538616d
Show file tree
Hide file tree
Showing 27 changed files with 7,483 additions and 1,422 deletions.
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,48 @@ NOTE: Only POSITION.PREPARE is supported at this time, with additional event-typ
}
```

### Batch Processing Configuration Guide

Batch processing can be enabled in the transfer execution flow. Follow the steps below to enable batch processing for a more efficient transfer execution:

- **Step 1:** **Create a New Kafka Topic**

Create a new Kafka topic named `topic-transfer-position-batch` to handle batch processing events.
- **Step 2:** **Configure Action Type Mapping**

Point the prepare handler to the newly created topic for the action type `prepare` using the `KAFKA.EVENT_TYPE_ACTION_TOPIC_MAP` configuration as shown below:
```
"KAFKA": {
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": "topic-transfer-position-batch",
"BULK_PREPARE": "topic-transfer-position"
}
}
}
```
_NOTE_: `BULK_PREPARE` configuration property is added to aid routing of `bulk-prepare` events to non-batch handler since the batch handler does not support `bulk-prepare` events.

- **Step 3:** **Run Batch Processing Handlers**

Run the position batch handler along with the existing position handler using the following configuration options:
- **Configure Event Type Action Topic Map for Batch Handler:**

Configure the `EVENT_TYPE_ACTION_TOPIC_MAP` parameter for the position batch handler to consume events from the new topic (topic-transfer-position-batch).

- **Set Batch Size:**

Adjust the batch size using the parameter `KAFKA.CONSUMER.TRANSFER.POSITION_BATCH.config.options.batchSize`. This parameter determines the number of messages fetched in each batch.

- **Set Consume Timeout:**

Configure the consume timeout using the parameter `KAFKA.CONSUMER.TRANSFER.POSITION_BATCH.config.options.consumeTimeout`. This parameter specifies the number of milliseconds to wait for a batch of messages to be fetched if the specified batch size of messages is not immediately available.

Example Command to Run Handlers:
```
node src/handlers/index.js handler --positionbatch
```

## API

For endpoint documentation, see the [API documentation](API.md).
Expand Down Expand Up @@ -187,6 +229,25 @@ If you want to run integration tests in a repetitive manner, you can startup the

If you want to run override position topic tests you can repeat the above and use `npm run test:int-override` after configuring settings found [here](#kafka-position-event-type-action-topic-map)

#### For running integration tests for batch processing interactively
- Run dependecies
```
docker-compose up -d mysql kafka init-kafka kafka-debug-console
npm run wait-4-docker
```
- Run central-ledger services
```
nvm use
npm run migrate
env "CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch" npm start
```
- Additionally, run position batch handler in a new terminal
```
env "CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch" "CLEDG_HANDLERS__API__DISABLED=true" node src/handlers/index.js handler --positionbatch
```
- Run tests using `npx tape 'test/integration-override/**/handlerBatch.test.js'`


If you want to just run all of the integration suite non-interactively then use npm run `test:integration`.
It will handle docker start up, migration, service starting and testing. Be sure to exit any previously ran handlers or docker commands.

Expand Down
33 changes: 31 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@
"KAFKA": {
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": null
"PREPARE": null,
"BULK_PREPARE": null
}
},
"TOPIC_TEMPLATES": {
Expand Down Expand Up @@ -289,7 +290,35 @@
"group.id": "cl-group-transfer-position",
"metadata.broker.list": "localhost:9092",
"socket.keepalive.enable": true,
"allow.auto.create.topics": true
"allow.auto.create.topics": true,
"partition.assignment.strategy": "cooperative-sticky",
"enable.auto.commit": false
},
"topicConf": {
"auto.offset.reset": "earliest"
}
}
},
"POSITION_BATCH": {
"config": {
"options": {
"mode": 2,
"batchSize": 10,
"pollFrequency": 10,
"recursiveTimeout": 100,
"messageCharset": "utf8",
"messageAsJSON": true,
"sync": true,
"consumeTimeout": 10
},
"rdkafkaConf": {
"client.id": "cl-con-transfer-position-batch",
"group.id": "cl-group-transfer-position-batch",
"metadata.broker.list": "localhost:9092",
"socket.keepalive.enable": true,
"allow.auto.create.topics": true,
"partition.assignment.strategy": "cooperative-sticky",
"enable.auto.commit": false
},
"topicConf": {
"auto.offset.reset": "earliest"
Expand Down
Loading

0 comments on commit 538616d

Please sign in to comment.