diff --git a/docs/src/main/asciidoc/kafka-streams.adoc b/docs/src/main/asciidoc/kafka-streams.adoc
index 9f016be2..be68b962 100644
--- a/docs/src/main/asciidoc/kafka-streams.adoc
+++ b/docs/src/main/asciidoc/kafka-streams.adoc
@@ -430,213 +430,6 @@ public Function, KStream> bar() {
 You can compose them as `foo|bar`, but keep in mind that the second function (`bar` in this case) must have a `KTable` as input since the first function (`foo`) has `KTable` as output.
 
-==== Imperative programming model.
-
-Starting with `3.1.0` version of the binder, we recommend using the functional programming model described above for Kafka Streams binder based applications.
-The support for `StreamListener` is deprecated starting with `3.1.0` of Spring Cloud Stream.
-Below, we are providing some details on the `StreamListener` based Kafka Streams processors as a reference.
-
-Following is the equivalent of the Word count example using `StreamListener`.
-
-[source]
-----
-@SpringBootApplication
-@EnableBinding(KafkaStreamsProcessor.class)
-public class WordCountProcessorApplication {
-
-    @StreamListener("input")
-    @SendTo("output")
-    public KStream<?, WordCount> process(KStream<Object, String> input) {
-        return input
-                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-                .groupBy((key, value) -> value)
-                .windowedBy(TimeWindows.of(5000))
-                .count(Materialized.as("WordCounts-multi"))
-                .toStream()
-                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
-    }
-
-    public static void main(String[] args) {
-        SpringApplication.run(WordCountProcessorApplication.class, args);
-    }
-----
-
-As you can see, this is a bit more verbose since you need to provide `EnableBinding` and the other extra annotations like `StreamListener` and `SendTo` to make it a complete application.
-`EnableBinding` is where you specify your binding interface that contains your bindings.
-In this case, we are using the stock `KafkaStreamsProcessor` binding interface that has the following contracts.
-
-[source]
-----
-public interface KafkaStreamsProcessor {
-
-    @Input("input")
-    KStream<?, ?> input();
-
-    @Output("output")
-    KStream<?, ?> output();
-
-}
-----
-
-Binder will create bindings for the input `KStream` and output `KStream` since you are using a binding interface that contains those declarations.
-
-In addition to the obvious differences in the programming model offered in the functional style, one particular thing that needs to be mentioned here is that the binding names are what you specify in the binding interface.
-For example, in the above application, since we are using `KafkaStreamsProcessor`, the binding names are `input` and `output`.
-Binding properties need to use those names. For instance `spring.cloud.stream.bindings.input.destination`, `spring.cloud.stream.bindings.output.destination` etc.
-Keep in mind that this is fundamentally different from the functional style since there the binder generates binding names for the application.
-This is because the application does not provide any binding interfaces in the functional model using `EnableBinding`.
-
-Here is another example of a sink where we have two inputs.
-
-[source]
-----
-@EnableBinding(KStreamKTableBinding.class)
-.....
-.....
-@StreamListener
-public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
-                    @Input("inputTable") KTable<Long, Song> songTable) {
-    ....
-    ....
-}
-
-interface KStreamKTableBinding {
-
-    @Input("inputStream")
-    KStream<?, ?> inputStream();
-
-    @Input("inputTable")
-    KTable<?, ?> inputTable();
-}
-
-----
-
-Following is the `StreamListener` equivalent of the same `BiFunction` based processor that we saw above.
-
-
-[source]
-----
-@EnableBinding(KStreamKTableBinding.class)
-....
-....
-
-@StreamListener
-@SendTo("output")
-public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
-                                     @Input("inputTable") KTable<String, String> userRegionsTable) {
-....
-....
-}
-
-interface KStreamKTableBinding extends KafkaStreamsProcessor {
-
-    @Input("inputX")
-    KTable<?, ?> inputTable();
-}
-----
-
-Finally, here is the `StreamListener` equivalent of the application with three inputs and curried functions.
-
-[source]
-----
-@EnableBinding(CustomGlobalKTableProcessor.class)
-...
-...
-    @StreamListener
-    @SendTo("output")
-    public KStream<Long, EnrichedOrder> process(
-            @Input("input-1") KStream<Long, Order> ordersStream,
-            @Input("input-2") GlobalKTable<Long, Customer> customers,
-            @Input("input-3") GlobalKTable<Long, Product> products) {
-
-        KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
-                customers, (orderId, order) -> order.getCustomerId(),
-                (order, customer) -> new CustomerOrder(customer, order));
-
-        return customerOrdersStream.join(products,
-                (orderId, customerOrder) -> customerOrder.productId(),
-                (customerOrder, product) -> {
-                    EnrichedOrder enrichedOrder = new EnrichedOrder();
-                    enrichedOrder.setProduct(product);
-                    enrichedOrder.setCustomer(customerOrder.customer);
-                    enrichedOrder.setOrder(customerOrder.order);
-                    return enrichedOrder;
-                });
-    }
-
-    interface CustomGlobalKTableProcessor {
-
-        @Input("input-1")
-        KStream<?, ?> input1();
-
-        @Input("input-2")
-        GlobalKTable<?, ?> input2();
-
-        @Input("input-3")
-        GlobalKTable<?, ?> input3();
-
-        @Output("output")
-        KStream<?, ?> output();
-    }
-
-----
-
-You might notice that the above two examples are even more verbose since in addition to provide `EnableBinding`, you also need to write your own custom binding interface as well.
-Using the functional model, you can avoid all those ceremonial details.
-
-Before we move on from looking at the general programming model offered by Kafka Streams binder, here is the `StreamListener` version of multiple output bindings.
-
-[source]
-----
-@EnableBinding(KStreamProcessorWithBranches.class)
-public static class WordCountProcessorApplication {
-
-    @Autowired
-    private TimeWindows timeWindows;
-
-    @StreamListener("input")
-    @SendTo({"output1","output2","output3"})
-    public KStream<?, WordCount>[] process(KStream<Object, String> input) {
-
-        Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
-        Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
-        Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
-
-        return input
-            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-            .groupBy((key, value) -> value)
-            .windowedBy(timeWindows)
-            .count(Materialized.as("WordCounts-1"))
-            .toStream()
-            .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
-            .branch(isEnglish, isFrench, isSpanish);
-    }
-
-    interface KStreamProcessorWithBranches {
-
-        @Input("input")
-        KStream<?, ?> input();
-
-        @Output("output1")
-        KStream<?, ?> output1();
-
-        @Output("output2")
-        KStream<?, ?> output2();
-
-        @Output("output3")
-        KStream<?, ?> output3();
-    }
-}
-----
-
-To recap, we have reviewed the various programming model choices when using the Kafka Streams binder.
-
-The binder provides binding capabilities for `KStream`, `KTable` and `GlobalKTable` on the input.
-`KTable` and `GlobalKTable` bindings are only available on the input.
-Binder supports both input and output bindings for `KStream`.
-
-The upshot of the programming model of Kafka Streams binder is that the binder provides you the flexibility of going with a fully functional programming model or using the `StreamListener` based imperative approach.
-
 === Ancillaries to the programming model
 
 ==== Multiple Kafka Streams processors within a single application
@@ -677,7 +470,7 @@ This is also true when you have a single Kafka Streams processor and other types
 Application id is a mandatory property that you need to provide for a Kafka Streams application.
 Spring Cloud Stream Kafka Streams binder allows you to configure this application id in multiple ways.
 
-If you only have one single processor or `StreamListener` in the application, then you can set this at the binder level using the following property:
+If you only have a single processor in the application, then you can set this at the binder level using the following property:
 
 `spring.cloud.stream.kafka.streams.binder.applicationId`.
 
@@ -712,33 +505,6 @@ and
 
 `spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId`
 
-In the case of `StreamListener`, you need to set this on the first input binding on the processor.
-
-For e.g. imagine that you have the following two `StreamListener` based processors.
-
-```
-@StreamListener
-@SendTo("output")
-public KStream<String, String> process(@Input("input") KStream<Object, String> input) {
-    ...
-}
-
-@StreamListener
-@SendTo("anotherOutput")
-public KStream<String, String> anotherProcess(@Input("anotherInput") KStream<Object, String> input) {
-    ...
-}
-```
-
-Then you must set the application id for this using the following binding property.
-
-`spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId`
-
-and
-
-`spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId`
-
-
 This approach of setting the application id at the binding level also works for the function-based model.
 However, setting it per function at the binder level, as we have seen above, is much easier if you are using the functional model.
@@ -749,14 +515,12 @@ If the application does not provide an application ID, then in that case the bin
 This is convenient in development scenarios as it avoids the need for explicitly providing the application ID.
 The generated application ID in this manner will be static over application restarts.
 In the case of the functional model, the generated application ID will be the function bean name followed by the literal `applicationID`, for example `process-applicationID` if `process` is the function bean name.
-In the case of `StreamListener`, instead of using the function bean name, the generated application ID will be use the containing class name followed by the method name followed by the literal `applicationId`.
 
 ====== Summary of setting Application ID
 
-* By default, binder will auto generate the application ID per function or `StreamListener` methods.
+* By default, the binder will auto-generate the application ID per function.
 * If you have a single processor, then you can use `spring.kafka.streams.applicationId`, `spring.application.name` or `spring.cloud.stream.kafka.streams.binder.applicationId`.
 * If you have multiple processors, then application ID can be set per function using the property - `spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId`, as shown in the example below.
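+
+For example, assuming two function beans named `process` and `anotherProcess` (the same names used above), the per-function application IDs could be set as follows; the ID values are only illustrative:
+
+[source]
+----
+spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=process-app-id
+spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId=another-process-app-id
+----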
-In the case of `StreamListener`, this can be done using `spring.cloud.stream.kafka.streams.bindings.input.applicationId`, assuming that the input binding name is `input`.
 
 ==== Overriding the default binding names generated by the binder with the functional style
 
@@ -816,7 +580,7 @@ Keys are always deserialized using native Serdes.
 For values, by default, deserialization on the inbound is natively performed by Kafka.
 Please note that this is a major change in default behavior from previous versions of Kafka Streams binder, where the deserialization was done by the framework.
 
-Kafka Streams binder will try to infer matching `Serde` types by looking at the type signature of `java.util.function.Function|Consumer` or `StreamListener`.
+Kafka Streams binder will try to infer matching `Serde` types by looking at the type signature of `java.util.function.Function|Consumer`.
 Here is the order in which it matches the Serdes.
 
 * If the application provides a bean of type `Serde` and if the return type is parameterized with the actual type of the incoming key or value type, then it will use that `Serde` for inbound deserialization.
@@ -1016,7 +780,7 @@ It is always recommended to explicitly create a DLQ topic for each input binding
 ==== DLQ per input consumer binding
 
 The property `spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler` is applicable for the entire application.
-This implies that if there are multiple functions or `StreamListener` methods in the same application, this property is applied to all of them.
+This implies that if there are multiple functions in the same application, this property is applied to all of them.
 However, if you have multiple processors or multiple input bindings within a single processor, then you can use the finer-grained DLQ control that the binder provides per input consumer binding.
 
 If you have the following processor,
@@ -1061,7 +825,7 @@ If you set a consumer binding's `dlqPartitions` property to a value greater than
 A couple of things to keep in mind when using the exception handling feature in Kafka Streams binder.
 
 * The property `spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler` is applicable for the entire application.
-This implies that if there are multiple functions or `StreamListener` methods in the same application, this property is applied to all of them.
+This implies that if there are multiple functions in the same application, this property is applied to all of them.
 * The exception handling for deserialization works consistently with native deserialization and framework provided message conversion.
 
 ==== Handling Production Exceptions in the Binder
@@ -2100,7 +1864,7 @@ Default: `logAndFail`
 applicationId::
 Convenient way to set the application.id for the Kafka Streams application globally at the binder level.
-If the application contains multiple functions or `StreamListener` methods, then the application id should be set differently.
+If the application contains multiple functions, then the application id should be set differently.
 See above where setting the application id is discussed in detail.
 +
 Default: the binder will generate a static application ID. See the application ID section for more details.
@@ -2164,7 +1928,7 @@ The following properties are available for Kafka Streams consumers and must be p
 For convenience, if there are multiple input bindings and they all require a common value, that can be configured by using the prefix `spring.cloud.stream.kafka.streams.default.consumer.`, as shown below.
 
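+For example, assuming every input binding should share the same deserialization exception handler, a sketch of such a common setting could look like this (the handler value is only illustrative):
+
+[source]
+----
+spring.cloud.stream.kafka.streams.default.consumer.deserializationExceptionHandler=logAndContinue
+----
+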
 applicationId::
-Setting application.id per input binding. This is only preferred for `StreamListener` based processors, for function based processors see other approaches outlined above.
+Setting application.id per input binding.
 +
 Default: See above.
 
@@ -2238,7 +2002,7 @@ In Kafka Streams, you can control of the number of threads a processor can creat
 You can do this using the various `configuration` options described above at the binder, functions, producer or consumer level.
 You can also use the `concurrency` property that core Spring Cloud Stream provides for this purpose.
 When using this property, you need to set it on the consumer.
-When you have more than one input bindings either in a function or `StreamListener`, set this on the first input binding.
+When you have more than one input binding, set this on the first input binding.
 For example, when setting `spring.cloud.stream.bindings.process-in-0.consumer.concurrency`, it will be translated as `num.stream.threads` by the binder.
 If you have multiple processors and one processor defines binding-level concurrency, but not the others, those with no binding-level concurrency will default back to the binder-wide property specified through `spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads`.
diff --git a/docs/src/main/asciidoc/overview.adoc b/docs/src/main/asciidoc/overview.adoc
index 096eccf6..b3dca239 100644
--- a/docs/src/main/asciidoc/overview.adoc
+++ b/docs/src/main/asciidoc/overview.adoc
@@ -364,8 +364,6 @@ Starting with version 3.0, when `spring.cloud.stream.binding..consumer.bat
 Otherwise, the method will be called with one record at a time.
 The size of the batch is controlled by Kafka consumer properties `max.poll.records`, `fetch.min.bytes`, `fetch.max.wait.ms`; refer to the Kafka documentation for more information.
 
-Bear in mind that batch mode is not supported with `@StreamListener` - it only works with the newer functional programming model.
-
 IMPORTANT: Retry within the binder is not supported when using batch mode, so `maxAttempts` will be overridden to 1.
 You can configure a `SeekToCurrentBatchErrorHandler` (using a `ListenerContainerCustomizer`) to achieve similar functionality to retry in the binder.
 You can also use a manual `AckMode` and call `Acknowledgment.nack(index, sleep)` to commit the offsets for a partial batch and have the remaining records redelivered.
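+
+For example, a batch-mode listener using the functional model could look like the following sketch; the `Person` payload type and the bean name are assumptions for illustration:
+
+[source]
+----
+@Bean
+public Consumer<List<Person>> batchListener() {
+    // With batch mode enabled, the whole polled batch is delivered in a single invocation
+    return people -> people.forEach(person -> System.out.println(person));
+}
+----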
diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionCompositionTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionCompositionTests.java
index f3716d41..cebe3121 100644
--- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionCompositionTests.java
+++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionCompositionTests.java
@@ -224,6 +224,7 @@ public void testChainedFunctionsAsComposed() throws InterruptedException {
 		try (ConfigurableApplicationContext context = app.run(
 				"--server.port=0",
 				"--spring.jmx.enabled=false",
+				"--spring.cloud.stream.kafka.streams.binder.applicationId=my-app-id",
 				"--spring.cloud.stream.function.definition=fooBiFunc|anotherFooFunc|yetAnotherFooFunc|lastFunctionInChain",
 				"--spring.cloud.stream.function.bindings.fooBiFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-in-0=input1",
 				"--spring.cloud.stream.function.bindings.fooBiFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-in-1=input2",
@@ -266,6 +267,7 @@ public void testFirstFunctionCurriedThenComposeWithOtherFunctions() throws Inter
 		try (ConfigurableApplicationContext context = app.run(
 				"--server.port=0",
 				"--spring.jmx.enabled=false",
+				"--spring.cloud.stream.kafka.streams.binder.applicationId=my-app-id-xyz",
 				"--spring.cloud.stream.function.definition=curriedFunc|anotherFooFunc|yetAnotherFooFunc|lastFunctionInChain",
 				"--spring.cloud.stream.function.bindings.curriedFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-in-0=input1",
 				"--spring.cloud.stream.function.bindings.curriedFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-in-1=input2",
diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java
index b8d03225..c32de113 100644
--- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java
+++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java
@@ -28,7 +28,6 @@
 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.cloud.stream.annotation.StreamMessageConverter;
 import org.springframework.cloud.stream.binder.Binder;
 import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
 import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
@@ -141,7 +140,6 @@ ProducerListener producerListener() {
 	}
 
 	@Bean
-	@StreamMessageConverter
 	@ConditionalOnMissingBean(KafkaNullConverter.class)
 	MessageConverter kafkaNullConverter() {
 		return new KafkaNullConverter();