Skip to content

Commit

Permalink
fix(impl): Remove useless errorStrategy configuration entry
Browse files Browse the repository at this point in the history
Indeed the error strategies are activated in Kafka Streams and Kafka in general with `kafka.error.strategy` configuration.
So a dedicated configuration was defined that had no effect.

The proper fix is to deprecated that config entry, and switch the business logic so it uses `kafka.error.strategy` instead.
  • Loading branch information
edeweerd1A committed Oct 29, 2024
1 parent 67c621f commit c654b9e
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Set;

import jakarta.annotation.Priority;
import jakarta.decorator.Delegate;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

Expand All @@ -44,7 +43,6 @@
import io.quarkiverse.kafkastreamsprocessor.impl.errors.ErrorHandlingStrategy;
import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics;
import io.quarkiverse.kafkastreamsprocessor.spi.SinkToTopicMappingBuilder;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;

Expand All @@ -54,7 +52,7 @@
* Uses a dead-letter sink from the topology, rather than a raw producer, to benefit from the same KStreams guarantees
* (at least once / exactly once).
*/
//@Decorator
// @Decorator
@Priority(ProcessorDecoratorPriorities.DLQ)
@Dependent
public class DlqDecorator extends AbstractProcessorDecorator {
Expand Down Expand Up @@ -105,19 +103,16 @@ public class DlqDecorator extends AbstractProcessorDecorator {
* the enricher of metadata before sending message to the dead letter queue
* @param metrics
* container of all metrics of the framework
* @param kStreamsProcessorConfig
* It contains the configuration for the error strategy configuration property value (default
* {@link ErrorHandlingStrategy#CONTINUE})
* and the configuration Kafka topic to use for dead letter queue (optional)
* @param errorHandlingStrategy
* tells whether DLQ is activated
*/
@Inject
public DlqDecorator(
SinkToTopicMappingBuilder sinkToTopicMappingBuilder, DlqMetadataHandler dlqMetadataHandler,
KafkaStreamsProcessorMetrics metrics,
KStreamsProcessorConfig kStreamsProcessorConfig) { // NOSONAR Optional with microprofile-config
ErrorHandlingStrategy errorHandlingStrategy) { // NOSONAR Optional with microprofile-config
this(sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics,
ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(),
kStreamsProcessorConfig.dlq().topic()));
errorHandlingStrategy.shouldSendToDlq());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@
*/
package io.quarkiverse.kafkastreamsprocessor.impl.errors;

import java.util.Optional;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;

/**
* Constants related to Kafka error handling
*/
@ApplicationScoped
public class ErrorHandlingStrategy {
/**
* Configuration property to check for the Kafka error handling strategy
Expand All @@ -45,28 +51,30 @@ public class ErrorHandlingStrategy {
*/
public static final String FAIL = "fail";

private final String errorStrategy;

private final KStreamsProcessorConfig kStreamsProcessorConfig;

@Inject
public ErrorHandlingStrategy(@ConfigProperty(name = CONFIG_PROPERTY, defaultValue = CONTINUE) String errorStrategy,
KStreamsProcessorConfig config) {
this.errorStrategy = errorStrategy;
this.kStreamsProcessorConfig = config;
}

/**
* Tells whether microservice-specific DLQ is activated and has a dedicated topic
*
* @param errorStrategy
* the error strategy chosen by an application between <code>continue</code>, <code>dead-letter-queue</code>
* and <code>fail</code>, configured with <code>kafka.error.strategy</code>
* @param dlqTopic
* the optional topic that is mandatory if the chosen error strategy is <code>dead-letter-queue</code>
* @return whether DLQ mechanism is activated by the configuration or not
*/
public static boolean shouldSendToDlq(String errorStrategy, Optional<String> dlqTopic) {
if (ErrorHandlingStrategy.DEAD_LETTER_QUEUE.equals(errorStrategy)) {
if (dlqTopic.isPresent()) {
public boolean shouldSendToDlq() {
if (DEAD_LETTER_QUEUE.equals(errorStrategy)) {
if (kStreamsProcessorConfig.dlq().topic().isPresent()) {
return true;
} else {
throw new IllegalStateException("DLQ strategy enabled but dlq.topic configuration property is missing");
}
}
return false;
}

private ErrorHandlingStrategy() {
// Prevent instantiation of utility class
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class LogAndSendToDlqExceptionHandlerDelegate implements DeserializationE
*/
private final KStreamsProcessorConfig kStreamsProcessorConfig;

private final ErrorHandlingStrategy errorHandlingStrategy;

/** True if the dead letter queue strategy is selected and properly configured */
boolean sendToDlq;

Expand All @@ -91,11 +93,13 @@ public class LogAndSendToDlqExceptionHandlerDelegate implements DeserializationE
public LogAndSendToDlqExceptionHandlerDelegate(KafkaClientSupplier kafkaClientSupplier,
KafkaStreamsProcessorMetrics metrics,
DlqMetadataHandler dlqMetadataHandler,
KStreamsProcessorConfig kStreamsProcessorConfig) {
KStreamsProcessorConfig kStreamsProcessorConfig,
ErrorHandlingStrategy errorHandlingStrategy) {
this.clientSupplier = kafkaClientSupplier;
this.metrics = metrics;
this.dlqMetadataHandler = dlqMetadataHandler;
this.kStreamsProcessorConfig = kStreamsProcessorConfig;
this.errorHandlingStrategy = errorHandlingStrategy;
}

/**
Expand Down Expand Up @@ -147,8 +151,7 @@ private void sendToDlq(final ProcessorContext context, final ConsumerRecord<byte
@Override
public void configure(final Map<String, ?> configs) {
// Resolve the DLQ strategy once to fail fast in case of misconfiguration
sendToDlq = ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(),
kStreamsProcessorConfig.dlq().topic());
sendToDlq = errorHandlingStrategy.shouldSendToDlq();
if (sendToDlq) {
Map<String, Object> dlqConfigMap = new HashMap<>(configs);
dlqConfigMap.put(KafkaClientSupplierDecorator.DLQ_PRODUCER, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,60 @@
*/
package io.quarkiverse.kafkastreamsprocessor.impl.errors;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

import java.util.Optional;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import io.quarkiverse.kafkastreamsprocessor.spi.properties.DlqConfig;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
import io.quarkus.test.Mock;

@ExtendWith({ MockitoExtension.class })
class ErrorHandlingStrategyTest {

@Mock
KStreamsProcessorConfig extensionConfiguration;

@Mock
DlqConfig dlqConfig;

@BeforeEach
void setUp() {
when(extensionConfiguration.dlq()).thenReturn(dlqConfig);
}

@Test
void shouldSendToDlqIfRequested() {
when(dlqConfig.topic()).thenReturn(Optional.of("aTopicName"));

assertTrue(
ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, Optional.of("aTopicName")));
new ErrorHandlingStrategy(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, extensionConfiguration).shouldSendToDlq());
}

@Test
void shouldThrowIfDlqRequestedButNoTopic() {
Optional<String> noTopic = Optional.empty();
when(dlqConfig.topic()).thenReturn(noTopic);

assertThrows(IllegalStateException.class,
() -> ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, noTopic));
() -> new ErrorHandlingStrategy(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, extensionConfiguration)
.shouldSendToDlq());
}

@Test
void shouldNotSendToDlqWithOtherStrategy() {
assertFalse(
ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.FAIL, Optional.of("aTopicName")));
assertFalse(
ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.CONTINUE, Optional.of("aTopicName")));
when(dlqConfig.topic()).thenReturn(Optional.of("aTopicName"));

assertTrue(
new ErrorHandlingStrategy(ErrorHandlingStrategy.FAIL, extensionConfiguration).shouldSendToDlq());
assertTrue(
new ErrorHandlingStrategy(ErrorHandlingStrategy.CONTINUE, extensionConfiguration).shouldSendToDlq());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -70,6 +69,9 @@ class LogAndSendToDlqExceptionHandlerDelegateTest {
@Mock
private DlqConfig dlqConfig;

@Mock
private ErrorHandlingStrategy errorHandlingStrategy;

@Mock
private ProcessorContext context;

Expand Down Expand Up @@ -109,14 +111,14 @@ void shouldNotBlockAndSendToDlqIfPossible() {
when(record.partition()).thenReturn(PARTITION);
when(record.headers()).thenReturn(headers);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
when(errorHandlingStrategy.shouldSendToDlq()).thenReturn(true);
RecordHeaders headersWithMetadata = new RecordHeaders();
when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
.thenReturn(headersWithMetadata);
when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);

handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
kStreamsProcessorConfig, errorHandlingStrategy);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
Expand All @@ -131,12 +133,12 @@ void shouldNotBlockAndSendToDlqIfPossible() {

@Test
void shouldOnlyContinueIfDefaultErrorStrategy() {
when(kStreamsProcessorConfig.errorStrategy()).thenReturn("continue");
when(errorHandlingStrategy.shouldSendToDlq()).thenReturn(false);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
kStreamsProcessorConfig, errorHandlingStrategy);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
Expand All @@ -146,14 +148,4 @@ void shouldOnlyContinueIfDefaultErrorStrategy() {
assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
}

@Test
void shouldFailFastIfDlqStrategyWithoutTopic() {
when(dlqConfig.topic()).thenReturn(Optional.empty());
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);

assertThrows(IllegalStateException.class, () -> handler.configure(Collections.emptyMap()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ public interface KStreamsProcessorConfig {

/**
* Kafka error handling strategy
*
* @deprecated Has actually no effect whatsoever, as it is rather the kafka config entry
* <code>kafka.error.strategy</code> that activates in KafkaStreams the DLQ mechanism.
*/
@WithDefault("continue")
@Deprecated(forRemoval = true, since = "3.0.2")
String errorStrategy();

/**
Expand Down

0 comments on commit c654b9e

Please sign in to comment.