Commit e2a0a84 (cescoffier, Aug 20, 2021; parent 2be9adf)

Document a set of Kafka integration patterns:

* Kafka with Hibernate
* Kafka with Hibernate Reactive
* Kafka to SSE
* HTTP to Kafka

1 changed file: docs/src/main/asciidoc/kafka.adoc (362 additions)

include::smallrye-kafka-outgoing.adoc[]

== Integrating with Kafka - Common patterns

=== Writing to Kafka from an HTTP endpoint

To send messages to Kafka from an HTTP endpoint, inject an `Emitter` (or a `MutinyEmitter`) in your endpoint:

[source, java]
----
package org.acme;

import java.util.concurrent.CompletionStage;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<String> emitter;          // <1>

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) { // <2>
        return emitter.send(payload);                   // <3>
    }
}
----
<1> Inject an `Emitter<String>`
<2> The HTTP method receives the payload and returns a `CompletionStage` completed when the message is written to Kafka
<3> Send the message to Kafka; the `send` method returns a `CompletionStage`

The endpoint sends the passed payload (from a `POST` HTTP request) to the emitter.
The emitter's channel is mapped to a Kafka topic in the `application.properties` file:

[source, properties]
----
mp.messaging.outgoing.kafka.connector=smallrye-kafka
mp.messaging.outgoing.kafka.topic=my-topic
mp.messaging.outgoing.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
----

The endpoint returns a `CompletionStage`, indicating the asynchronous nature of the method.
The `emitter.send` method returns a `CompletionStage<Void>`, which is completed when the message has been written to Kafka.
If the write fails, the returned `CompletionStage` is completed exceptionally.

If the endpoint does not return a `CompletionStage`, the HTTP response may be written before the message is sent to Kafka, and so failures won't be reported to the user.
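These completion semantics can be sketched with plain `CompletableFuture` objects (a standalone illustration, not SmallRye code; the `writeToKafka` helper is a hypothetical stand-in for `emitter.send`):

[source, java]
----
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class CompletionSemantics {

    // Hypothetical stand-in for emitter.send: the stage completes when the
    // "write" succeeds, or completes exceptionally when it fails.
    static CompletionStage<Void> writeToKafka(String payload, boolean fail) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new RuntimeException("broker unreachable"));
        } else {
            future.complete(null); // message acknowledged
        }
        return future;
    }

    public static void main(String[] args) {
        // Success: the dependent action runs once the write is acknowledged.
        writeToKafka("apple", false)
                .thenRun(() -> System.out.println("written"));

        // Failure: the exception is propagated to the caller.
        writeToKafka("banana", true)
                .exceptionally(t -> {
                    System.out.println("failed: " + t.getMessage());
                    return null;
                });
    }
}
----

Returning such a stage from the JAX-RS method is what delays the HTTP response until the outcome of the write is known.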

If you need to send a Kafka record, use:

[source, java]
----
package org.acme;

import java.util.concurrent.CompletionStage;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Record<String, String>> emitter; // <1>

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {
        return emitter.send(Record.of("my-key", payload));     // <2>
    }
}
----
<1> Note the usage of an `Emitter<Record<K, V>>`
<2> Create the record using `Record.of(k, v)`
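Because a `Record` carries both a key and a value, the connector needs a serializer for each. A possible configuration, reusing the channel and topic names from above (both key and value are `String` here):

[source, properties]
----
mp.messaging.outgoing.kafka.connector=smallrye-kafka
mp.messaging.outgoing.kafka.topic=my-topic
mp.messaging.outgoing.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
----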

=== Persisting Kafka messages with Hibernate with Panache

To persist objects received from Kafka into a database, you can use Hibernate with Panache.

NOTE: If you use Hibernate Reactive, look at <<persisting-kafka-messages-with-hibernate-reactive>>.

Let's imagine you receive `Fruit` objects.
For simplicity, our `Fruit` class is minimal:

[source, java]
----
package org.acme;

import javax.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

@Entity
public class Fruit extends PanacheEntity {
    public String name;
}
----

To consume `Fruit` instances stored on a Kafka topic, and persist them into a database, you can use the following approach:

[source, java]
----
package org.acme;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class FruitConsumer {

    @Incoming("fruits")                      // <1>
    @Transactional                           // <2>
    @Blocking                                // <3>
    public void persistFruits(Fruit fruit) { // <4>
        fruit.persist();                     // <5>
    }
}
----
<1> Configuring the incoming channel. This channel reads from Kafka.
<2> As we are writing in a database, we must be in a transaction. This annotation starts a new transaction and commits it when the method returns.
<3> Writing to a database using classic Hibernate is blocking. So, you must tell Quarkus to invoke the method on a worker thread that can be blocked (and not on an I/O thread).
<4> The method receives each `Fruit`. Note that you would need a deserializer to reconstruct the `Fruit` instances from the Kafka records.
<5> Persist the received `fruit` object.

As mentioned in <4>, you need a deserializer that can create a `Fruit` from the record.
This can be done using a Jackson deserializer:

[source, java]
----
package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}
----
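Conceptually, the deserializer turns the record's `byte[]` payload (UTF-8 JSON such as `{"name":"apple"}`) back into a `Fruit`. `ObjectMapperDeserializer` delegates this work to Jackson; the hand-rolled sketch below only illustrates the `byte[]` -> object contract and is not production code (the `fruitFromBytes` helper and the simplified `Fruit` class are hypothetical):

[source, java]
----
import java.nio.charset.StandardCharsets;

public class FruitBytesSketch {

    // Simplified stand-in for the entity, for illustration only.
    public static class Fruit {
        public String name;
    }

    // What a value deserializer does: turn the record's byte[] into a domain object.
    // Naive extraction of the "name" field; Jackson handles this robustly.
    static Fruit fruitFromBytes(byte[] data) {
        String json = new String(data, StandardCharsets.UTF_8);
        int start = json.indexOf("\"name\":\"") + "\"name\":\"".length();
        int end = json.indexOf('"', start);
        Fruit fruit = new Fruit();
        fruit.name = json.substring(start, end);
        return fruit;
    }

    public static void main(String[] args) {
        byte[] record = "{\"name\":\"apple\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(fruitFromBytes(record).name); // prints "apple"
    }
}
----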

The associated configuration would be:

[source, properties]
----
mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer
----

Check <<jackson-serialization>> for more detail about the usage of Jackson with Kafka.
You can also use Avro.

[#persisting-kafka-messages-with-hibernate-reactive]
=== Persisting Kafka messages with Hibernate Reactive

To persist objects received from Kafka into a database, you can use Hibernate Reactive with Panache.

Let's imagine you receive `Fruit` objects.
For simplicity, our `Fruit` class is minimal:

[source, java]
----
package org.acme;

import javax.persistence.Entity;

import io.quarkus.hibernate.reactive.panache.PanacheEntity; // <1>

@Entity
public class Fruit extends PanacheEntity {
    public String name;
}
----
<1> Make sure to use the reactive variant

To consume `Fruit` instances stored on a Kafka topic, and persist them into a database, you can use the following approach:

[source, java]
----
package org.acme;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class FruitStore {

    @Incoming("fruits")
    public Uni<Void> persist(Fruit fruit) {
        return Panache.withTransaction(() ->     // <1>
                fruit.persist()                  // <2>
                     .map(persisted -> null)     // <3>
        );
    }
}
----
<1> Instruct Panache to run the given (asynchronous) action in a transaction. The transaction completes when the action completes.
<2> Persist the entity. It returns a `Uni<Fruit>`.
<3> Switch back to a `Uni<Void>`.

Unlike with _classic_ Hibernate, you can't use `@Transactional`.
Instead, we use `Panache.withTransaction` and persist our entity.
The `map` is used to return a `Uni<Void>` and not a `Uni<Fruit>`.

You need a deserializer that can create a `Fruit` from the record.
This can be done using a Jackson deserializer:

[source, java]
----
package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}
----

The associated configuration would be:

[source, properties]
----
mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer
----

Check <<jackson-serialization>> for more detail about the usage of Jackson with Kafka.
You can also use Avro.

=== Writing entities managed by Hibernate to Kafka

Let's imagine the following process:

1. You receive an HTTP request with a payload,
2. You create a Hibernate entity instance from this payload,
3. You persist that entity into a database,
4. You send the entity to a Kafka topic

Because we write to a database, we must run this method in a transaction.
Yet, sending the entity to Kafka happens asynchronously: the `send` operation returns a `CompletionStage` (or a `Uni` if you use a `MutinyEmitter`) reporting when the operation completes.
We must make sure the transaction is still running until the object is written.
Otherwise, the object may be accessed outside the transaction, which is not allowed.

To implement this process, you need the following approach:

[source, java]
----
package org.acme;

import java.util.concurrent.CompletionStage;

import javax.transaction.Transactional;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Fruit> emitter;

    @POST
    @Path("/fruits")
    @Transactional                                                  // <1>
    public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) { // <2>
        fruit.persist();
        return emitter.send(fruit);                                 // <3>
    }
}
----
<1> As we are writing to the database, make sure we run inside a transaction
<2> The method receives the fruit instance to persist. It returns a `CompletionStage` which is used for the transaction demarcation. The transaction is committed when the returned `CompletionStage` completes. In our case, it's when the message is written to Kafka.
<3> Send the managed instance to Kafka. Make sure we wait for the message to complete before closing the transaction.

=== Streaming Kafka topics as server-sent events

Streaming a Kafka topic as server-sent events (SSE) is straightforward:

1. You inject the channel representing the Kafka topic in your HTTP endpoint
2. You return that channel as a `Publisher` or a `Multi` from the HTTP method

The following code provides an example:

[source, java]
----
@Channel("fruits")
Multi<Fruit> fruits;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Fruit> stream() {
    return fruits;
}
----

Some environments cut the SSE connection when there is not enough activity.
The workaround consists of sending _ping_ messages (or empty objects) periodically.

[source, java]
----
@Channel("fruits")
Multi<Fruit> fruits;
@Inject
ObjectMapper mapper;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
return Multi.createBy().merging()
.streams(
fruits.map(this::toJson),
getPingStream()
);
}
Multi<String> emitAPeriodicPing() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(10))
.onItem().transform(x -> "{}");
}
private String toJson(Fruit f) {
try {
return mapper.writeValueAsString(f);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
----

This makes the code a bit more complex: besides forwarding the fruits coming from Kafka, we need to send pings periodically.
To achieve this, we merge the stream coming from Kafka with a periodic stream emitting `{}` every 10 seconds.
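The merge itself can be illustrated outside of Mutiny with a scheduled executor feeding a single queue: data items and periodic `{}` heartbeats end up interleaved in one outgoing stream (a standalone sketch; the class and method names are made up for the example):

[source, java]
----
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MergedStreamSketch {

    private final BlockingQueue<String> out = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Periodic "{}" heartbeat, mirroring the ticks() stream above.
    void startHeartbeat(long heartbeatMillis) {
        scheduler.scheduleAtFixedRate(() -> out.offer("{}"),
                heartbeatMillis, heartbeatMillis, TimeUnit.MILLISECONDS);
    }

    // Data items (e.g. fruits serialized to JSON) share the same output stream.
    void emit(String json) {
        out.offer(json);
    }

    String next() throws InterruptedException {
        return out.take();
    }

    void stop() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        MergedStreamSketch sse = new MergedStreamSketch();
        sse.startHeartbeat(50);
        sse.emit("{\"name\":\"apple\"}");
        System.out.println(sse.next()); // the fruit arrives first
        System.out.println(sse.next()); // then a heartbeat "{}"
        sse.stop();
    }
}
----

In the Mutiny version, `Multi.createBy().merging()` does this interleaving for us and handles backpressure.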

== Going further

This guide has shown how you can interact with Kafka using Quarkus.