Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #9 from ralf-ueberfuhr-ars/feature/consumer-fault-…
Browse files Browse the repository at this point in the history
…handling

Consumer Fault Handling
  • Loading branch information
ralf-ueberfuhr-ars authored Jun 28, 2024
2 parents 9465069 + 791a578 commit 79d6e0e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package de.sample.schulung.statistics.kafka;

import org.springframework.kafka.support.serializer.FailedDeserializationInfo;

import java.util.function.Function;

public class CustomDeserializationFailureHandler<T>
implements Function<FailedDeserializationInfo, T> {

@Override
public T apply(FailedDeserializationInfo failedDeserializationInfo) {
// hier Fehler analysieren und eigenes Objekt erzeugen
byte[] payload = failedDeserializationInfo.getData();
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,44 @@

import de.sample.schulung.statistics.domain.Customer;
import de.sample.schulung.statistics.domain.CustomersService;
import jakarta.validation.Valid;
import jakarta.validation.ValidationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

@Validated
@Component
@Slf4j
@RequiredArgsConstructor
public class CustomerEventListener {

private final CustomersService customersService;

// Versuch, danach 1 Versuch nach 1s, danach Versuch nach 2s, danach Versuch nach 4s
@RetryableTopic(
attempts = "4",
backoff = @Backoff(
delay = 1000L,
multiplier = 2
)
)
@KafkaListener(
topics = KafkaConstants.CUSTOMER_EVENTS_TOPIC
)
public void consume(
@Payload CustomerEventRecord record,
@Valid @Payload CustomerEventRecord record,
@Header(KafkaHeaders.RECEIVED_PARTITION) String partition,
@Header(KafkaHeaders.OFFSET) int offset
@Header(KafkaHeaders.OFFSET) int offset,
Acknowledgment acknowledgment
) {
log.info(
"Received record {} {} (Partition: {}, Offset: {})",
Expand All @@ -34,6 +49,7 @@ public void consume(
offset
);
if(record.eventType() == null) {
acknowledgment.acknowledge();
return;
}
switch (record.eventType()) {
Expand All @@ -57,6 +73,7 @@ public void consume(
default:
throw new ValidationException();
}
acknowledgment.acknowledge();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
consumer:
value-deserializer: de.sample.schulung.statistics.kafka.CustomJsonDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
#de.sample.schulung.statistics.kafka.CustomJsonDeserializer
properties:
"[spring.json.use.type.headers]": false
"[spring.json.value.default.type]": de.sample.schulung.statistics.kafka.CustomerEventRecord
"[spring.deserializer.value.delegate.class]": de.sample.schulung.statistics.kafka.CustomJsonDeserializer
"[spring.deserializer.value.function]": de.sample.schulung.statistics.kafka.CustomDeserializationFailureHandler
group-id: customer-statistics
auto-offset-reset: earliest
enable-auto-commit: false
listener:
ack-mode: manual
application:
init-sample-data:
enabled: true

0 comments on commit 79d6e0e

Please sign in to comment.