This tutorial explains how a machine learning model is applied on real-time data. It predicts incoming data as well as the model is retrained when the prediction results decrease. It focuses on simplicity and can be seen as a baseline for similar projects. You can read more about it in my blog article: Apache Kafka and R: Real-Time Prediction and Model (Re)training.
Let's go over the single parts of the data flow. A Kafka Producer produces simulated data of a fish's size measurement
as well as the weight continuously into two Kafka topics: machine-weight
and machine-measurement
.
A Kafka Streams application consumes the machine-measurement
topic and communicates via REST API with R
to predict the weight using linear regression. You can find a unit test for the
topology as well as an integration test for the REST communication here.
In ksqlDB both streams are joined, and the prediction is compared with the actual weight (error).
One connector stores data in MongoDB so that it can be used for retraining the regression. The other connector acts as a trigger to do the retraining once the error exceeds a threshold.
In R the model itself, the prediction function, and the retraining function are stored and accessible via REST API. You can find a test here.
docker-compose up -d
It starts:
- Zookeeper
- Kafka Broker
- Kafka Topics
- creates initial topics
- Kafka Connect
- ksqlDB Server
- ksqlDB Client
- MongoDB
- Kafka Producer
- built docker image executing fat JAR
- Kafka Streams
- built docker image executing fat JAR
- RStudio
Make sure to wait some time until everything is fully started up.
First, we start the two Kafka Connectors:
curl -X POST -H "Content-Type: application/json" --data @MongoDBConnector.json http://localhost:8083/connectors | jq
curl -X POST -H "Content-Type: application/json" --data @HTTPConnector.json http://localhost:8083/connectors | jq
Use the client to access ksqlDB:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Run all queries stored in Queries.ksql.
To gain insights of the pipeline, we look at the Stream DIFF_WEIGHT
:
SELECT * FROM DIFF_WEIGHT EMIT CHANGES;
We can also detect when the retrained model is applied because the prediction error decreases, and the model time changes.
In the KTable RETRAIN_WEIGHT
, we see the events that trigger the retraining.
SET 'auto.offset.reset'='earliest';
SELECT * FROM RETRAIN_WEIGHT EMIT CHANGES;