Adding documentation for DltAwareProcessor
sobychacko authored and omercelikceng committed Sep 14, 2023
1 parent ffa640c commit 385307c
docs/src/main/asciidoc/kafka/kafka-streams.adoc
Unlike the support for deserialization exception handlers described above, the binder does not provide such first-class mechanisms for handling production exceptions.
However, you can still configure production exception handlers using the `StreamsBuilderFactoryBean` customizer, which is described in more detail in a subsequent section.

==== Runtime Error Handling

Errors that arise from the application code itself, i.e. from the business logic execution, are usually up to the application to handle, since the Kafka Streams binder has no way to interfere with the application code.
However, to make this a bit easier, the binder provides a convenient `DltAwareProcessor` with which you can dictate how application-level errors are handled.

Consider the following code.

```
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.map(...);
}
```

If the business logic inside your `map` call above throws an exception, it is your responsibility to handle that error.
This is where `DltAwareProcessor` comes in handy.
Let's say that you want to publish the failed record to a dead letter topic (DLT) rather than handling it within the application.
Here is how you can do that.

```
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltPublishingContext) {
return input -> input
.process(() -> new DltAwareProcessor<>((k, v) -> {
throw new RuntimeException("error");
}, "hello-dlt-1", dltPublishingContext));
}
```

The business logic from the original `map` call has now moved into a `KStream#process` method call, which takes a `ProcessorSupplier`.
We then pass in the custom `DltAwareProcessor`, which is capable of publishing to the DLT.
The constructor for `DltAwareProcessor` above takes three parameters: a `BiFunction` that receives the key and value of the input record and contains the business logic in its body, the DLT topic name, and finally a `DltPublishingContext`.
When the `BiFunction`'s lambda expression throws an exception, `DltAwareProcessor` sends the input record to the DLT.
The `DltPublishingContext` provides `DltAwareProcessor` with the necessary publishing infrastructure beans.
`DltPublishingContext` is auto-configured by the binder, so you can inject it directly into the application.

If you want to provide a custom timestamp on the record that gets published to the DLT, you can pass an optional fourth constructor argument, a `Supplier<Long>`.
If provided, this `Supplier` is invoked each time `DltAwareProcessor` publishes to the DLT.
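Putting these pieces together, here is a minimal sketch of the four-argument variant; the wall-clock timestamp `Supplier` shown here is just an illustration.

```
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltPublishingContext) {
	return input -> input
		.process(() -> new DltAwareProcessor<>((k, v) -> {
			throw new RuntimeException("error");
		}, "hello-dlt-1", dltPublishingContext,
		// invoked once for each record published to the DLT
		System::currentTimeMillis));
}
```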

If you do not want the binder to publish failed records to a DLT, you can instead provide your own recoverer as a `BiConsumer`.
Suppose, for example, that you do not want to send the failed record to the DLT, but simply log the message and move on.
For such cases, you can override the recovery logic in `DltAwareProcessor`.
Here is an example of how to do that.

```
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new DltAwareProcessor<>((k, v) -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// log the message
}));
}
```

In this case, when the record fails, instead of using its built-in recoverer that publishes to a DLT, `DltAwareProcessor` uses the user-provided recoverer, a `BiConsumer` that takes the failed record and the thrown exception as arguments.
Here too, you can provide an optional `Supplier<Long>` to dictate the timestamp on the record passed to the `BiConsumer` recoverer.
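As a sketch, the custom recoverer and the timestamp `Supplier` might be combined like this; the placement of the `Supplier` as the argument after the `BiConsumer` is an assumption for illustration.

```
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
	return input -> input
		.process(() -> new DltAwareProcessor<>((k, v) -> {
			throw new RuntimeException("error");
		},
		(record, exception) -> {
			// log the failed record and move on
		},
		// timestamp used on the record passed to the recoverer (assumed argument position)
		System::currentTimeMillis));
}
```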

=== Retrying critical business logic

There are scenarios in which you might want to retry parts of your business logic that are critical to the application.