From 865d23a4bac7abe9202682b2fe739ea9f8b80f5f Mon Sep 17 00:00:00 2001 From: Ralf Ueberfuhr Date: Fri, 28 Jun 2024 13:57:06 +0200 Subject: [PATCH 1/3] Use acknowlegdment --- .../schulung/statistics/kafka/CustomerEventListener.java | 6 +++++- .../src/main/resources/application.yml | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java index ba3e3d0..3331763 100644 --- a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java @@ -6,6 +6,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; @@ -24,7 +25,8 @@ public class CustomerEventListener { public void consume( @Payload CustomerEventRecord record, @Header(KafkaHeaders.RECEIVED_PARTITION) String partition, - @Header(KafkaHeaders.OFFSET) int offset + @Header(KafkaHeaders.OFFSET) int offset, + Acknowledgment acknowledgment ) { log.info( "Received record {} {} (Partition: {}, Offset: {})", @@ -34,6 +36,7 @@ public void consume( offset ); if(record.eventType() == null) { + acknowledgment.acknowledge(); return; } switch (record.eventType()) { @@ -57,6 +60,7 @@ public void consume( default: throw new ValidationException(); } + acknowledgment.acknowledge(); } } diff --git a/statistics-service-provider/src/main/resources/application.yml b/statistics-service-provider/src/main/resources/application.yml index b9a76f2..35475d5 100644 --- a/statistics-service-provider/src/main/resources/application.yml +++ b/statistics-service-provider/src/main/resources/application.yml @@ -25,6 +25,9 @@ spring: "[spring.json.value.default.type]": de.sample.schulung.statistics.kafka.CustomerEventRecord group-id: customer-statistics auto-offset-reset: earliest + enable-auto-commit: false + listener: + ack-mode: manual application: init-sample-data: enabled: true \ No newline at end of file From e16f18d0ee9603e7aa6af2a7f1ae6b32692e959f Mon Sep 17 00:00:00 2001 From: Ralf Ueberfuhr Date: Fri, 28 Jun 2024 14:08:51 +0200 Subject: [PATCH 2/3] Use ErrorHandlingDeserializer --- .../CustomDeserializationFailureHandler.java | 17 +++++++++++++++++ .../src/main/resources/application.yml | 5 ++++- 2 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomDeserializationFailureHandler.java diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomDeserializationFailureHandler.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomDeserializationFailureHandler.java new file mode 100644 index 0000000..e00c88f --- /dev/null +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomDeserializationFailureHandler.java @@ -0,0 +1,17 @@ +package de.sample.schulung.statistics.kafka; + +import org.springframework.kafka.support.serializer.FailedDeserializationInfo; + +import java.util.function.Function; + +public class CustomDeserializationFailureHandler + implements Function { + + @Override + public T apply(FailedDeserializationInfo failedDeserializationInfo) { + // hier Fehler analysieren und eigenes Objekt erzeugen + byte[] payload = failedDeserializationInfo.getData(); + return null; + } + +} diff --git a/statistics-service-provider/src/main/resources/application.yml b/statistics-service-provider/src/main/resources/application.yml index 35475d5..b6eb66a 100644 --- a/statistics-service-provider/src/main/resources/application.yml +++ b/statistics-service-provider/src/main/resources/application.yml @@ -19,10 +19,13 @@ spring: kafka: bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} consumer: - value-deserializer: de.sample.schulung.statistics.kafka.CustomJsonDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + #de.sample.schulung.statistics.kafka.CustomJsonDeserializer properties: "[spring.json.use.type.headers]": false "[spring.json.value.default.type]": de.sample.schulung.statistics.kafka.CustomerEventRecord + "[spring.deserializer.value.delegate.class]": de.sample.schulung.statistics.kafka.CustomJsonDeserializer + "[spring.deserializer.value.function]": de.sample.schulung.statistics.kafka.CustomDeserializationFailureHandler group-id: customer-statistics auto-offset-reset: earliest enable-auto-commit: false From 791a578b7a17c5d6927feb52fbb7b62bdc77a2a8 Mon Sep 17 00:00:00 2001 From: Ralf Ueberfuhr Date: Fri, 28 Jun 2024 14:29:15 +0200 Subject: [PATCH 3/3] Use Retry --- .../statistics/kafka/CustomerEventListener.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java index 3331763..dd30993 100644 --- a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java @@ -2,16 +2,21 @@ import de.sample.schulung.statistics.domain.Customer; import de.sample.schulung.statistics.domain.CustomersService; +import jakarta.validation.Valid; import jakarta.validation.ValidationException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.RetryableTopic; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.retry.annotation.Backoff; import org.springframework.stereotype.Component; +import org.springframework.validation.annotation.Validated; +@Validated @Component @Slf4j @RequiredArgsConstructor @@ -19,11 +24,19 @@ public class CustomerEventListener { private final CustomersService customersService; + // Versuch, danach 1 Versuch nach 1s, danach Versuch nach 2s, danach Versuch nach 4s + @RetryableTopic( + attempts = "4", + backoff = @Backoff( + delay = 1000L, + multiplier = 2 + ) + ) @KafkaListener( topics = KafkaConstants.CUSTOMER_EVENTS_TOPIC ) public void consume( - @Payload CustomerEventRecord record, + @Valid @Payload CustomerEventRecord record, @Header(KafkaHeaders.RECEIVED_PARTITION) String partition, @Header(KafkaHeaders.OFFSET) int offset, Acknowledgment acknowledgment