Remove deprecated code for Micronaut Framework 5 and document breaking changes. #988

Draft · wants to merge 1 commit into base: 6.0.x
27 changes: 27 additions & 0 deletions config/accepted-api-changes.json
@@ -0,0 +1,27 @@
[
{
"type": "io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder",
"member": "Constructor io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder(java.util.Properties)",
"reason": "Removed deprecated code for Micronaut Framework 5."
},
{
"type": "io.micronaut.configuration.kafka.bind.ConsumerRecordBinderRegistry",
"member": "Constructor io.micronaut.configuration.kafka.bind.ConsumerRecordBinderRegistry(io.micronaut.configuration.kafka.bind.ConsumerRecordBinder[])",
"reason": "Removed deprecated code for Micronaut Framework 5."
},
{
"type": "io.micronaut.configuration.kafka.exceptions.DefaultKafkaListenerExceptionHandler",
"member": "Constructor io.micronaut.configuration.kafka.exceptions.DefaultKafkaListenerExceptionHandler()",
"reason": "Removed deprecated code for Micronaut Framework 5."
},
{
"type": "io.micronaut.configuration.kafka.health.KafkaHealthIndicator",
"member": "Constructor io.micronaut.configuration.kafka.health.KafkaHealthIndicator(org.apache.kafka.clients.admin.AdminClient,io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration)",
"reason": "Removed deprecated code for Micronaut Framework 5."
},
{
"type": "io.micronaut.configuration.kafka.KafkaHeaders",
"member": "Constructor io.micronaut.configuration.kafka.KafkaHeaders(org.apache.kafka.common.header.Headers)",
"reason": "Removed deprecated code for Micronaut Framework 5."
}
]
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,4 +1,4 @@
projectVersion=5.4.0-SNAPSHOT
projectVersion=6.0.0-SNAPSHOT
projectGroup=io.micronaut.kafka

title=Micronaut Kafka

@@ -36,17 +36,6 @@ public class ConfiguredStreamBuilder extends StreamsBuilder implements Named {

private final Duration closeTimeout;

/**
* Default constructor.
*
* @param configuration The configuration
* @deprecated Use {@link #ConfiguredStreamBuilder(Properties, String, Duration)}
*/
@Deprecated(since = "5.4.0")
public ConfiguredStreamBuilder(Properties configuration) {
this(configuration, AbstractKafkaStreamsConfiguration.DEFAULT_NAME, AbstractKafkaStreamsConfiguration.DEFAULT_CLOSE_TIMEOUT);
}

/**
* Default constructor.
*

@@ -19,14 +19,20 @@
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.messaging.MessageHeaders;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

/**
* A {@link MessageHeaders} implementation for Kafka.
*
@@ -38,16 +44,6 @@ public class KafkaHeaders implements MessageHeaders {
private final Headers headers;
private final ConversionService conversionService;

/**
* Constructs a new instance for the given headers.
*
* @param headers The kafka headers
*/
@Deprecated
public KafkaHeaders(Headers headers) {
this(headers, ConversionService.SHARED);
}

/**
* Constructs a new instance for the given headers.
*

@@ -21,7 +21,6 @@
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArrayUtils;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@@ -44,24 +43,12 @@ public class ConsumerRecordBinderRegistry implements ArgumentBinderRegistry<Cons
private final Map<Class<? extends Annotation>, ConsumerRecordBinder<?>> byAnnotation = new HashMap<>();
private final Map<Integer, ConsumerRecordBinder<?>> byType = new HashMap<>();

/**
* Creates the registry for the given binders.
*
* @param binders The binders
* @deprecated Use conversion service constructor instead.
*/
@Deprecated
public ConsumerRecordBinderRegistry(ConsumerRecordBinder<?>... binders) {
this(ConversionService.SHARED, binders);
}

/**
* Creates the registry for the given binders.
*
* @param conversionService The conversion service
* @param binders The binders
*/
@Inject
public ConsumerRecordBinderRegistry(ConversionService conversionService, ConsumerRecordBinder<?>... binders) {
if (ArrayUtils.isNotEmpty(binders)) {
for (ConsumerRecordBinder<?> binder : binders) {

@@ -16,9 +16,8 @@
package io.micronaut.configuration.kafka.exceptions;

import io.micronaut.configuration.kafka.config.DefaultKafkaListenerExceptionHandlerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaListenerExceptionHandlerConfigurationProperties;
import io.micronaut.context.annotation.Primary;
import jakarta.inject.Inject;
import io.micronaut.core.annotation.NonNull;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -28,7 +27,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micronaut.core.annotation.NonNull;
import java.util.Collections;
import java.util.Optional;
import java.util.regex.Matcher;
@@ -55,20 +53,11 @@ public class DefaultKafkaListenerExceptionHandler implements KafkaListenerExcept
*
* @param config The default Kafka listener exception handler configuration
*/
@Inject
public DefaultKafkaListenerExceptionHandler(DefaultKafkaListenerExceptionHandlerConfiguration config) {
skipRecordOnDeserializationFailure = config.isSkipRecordOnDeserializationFailure();
commitRecordOnDeserializationFailure = config.isCommitRecordOnDeserializationFailure();
}

/**
* @deprecated Use {@link DefaultKafkaListenerExceptionHandler#DefaultKafkaListenerExceptionHandler(DefaultKafkaListenerExceptionHandlerConfiguration)}
*/
@Deprecated(since = "5.1.0", forRemoval = true)
public DefaultKafkaListenerExceptionHandler() {
this(new DefaultKafkaListenerExceptionHandlerConfigurationProperties());
}

@Override
public void handle(KafkaListenerException exception) {
final Throwable cause = exception.getCause();

@@ -27,7 +27,6 @@
import io.micronaut.health.HealthStatus;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.AdminClient;
@@ -44,7 +43,11 @@
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import static org.apache.kafka.clients.NetworkClientUtils.awaitReady;
@@ -84,7 +87,6 @@ public class KafkaHealthIndicator implements HealthIndicator, ClusterResourceLis
* @param networkClientCreator Functional interface to create a {@link NetworkClient}.
* @param kafkaHealthConfiguration Kafka Health indicator configuration
*/
@Inject
public KafkaHealthIndicator(BeanContext beanContext,
KafkaDefaultConfiguration defaultConfiguration,
NetworkClientCreator networkClientCreator,
@@ -95,22 +97,6 @@ public KafkaHealthIndicator(BeanContext beanContext,
this.kafkaHealthConfiguration = kafkaHealthConfiguration;
}

/**
* Constructs a new Kafka health indicator for the given arguments.
*
* @param adminClient The admin client
* @param defaultConfiguration The default configuration
* @deprecated Use {@link KafkaHealthIndicator(BeanContext, KafkaDefaultConfiguration, NetworkClientCreator, KafkaHealthConfiguration)} instead.
*/
@Deprecated(forRemoval = true)
public KafkaHealthIndicator(AdminClient adminClient,
KafkaDefaultConfiguration defaultConfiguration) {
this.adminClientSupplier = () -> adminClient;
this.defaultConfiguration = defaultConfiguration;
this.networkClientSupplier = SupplierUtil.memoized(() -> new DefaultNetworkClientCreator(defaultConfiguration).create(this));
this.kafkaHealthConfiguration = new KafkaHealthConfigurationProperties();
}

@Override
public void onUpdate(ClusterResource clusterResource) {
this.clusterId = Optional.ofNullable(clusterResource).map(ClusterResource::clusterId).orElse(null);

@@ -5,10 +5,10 @@ import io.micronaut.configuration.kafka.annotation.ErrorStrategy
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.configuration.kafka.retry.DefaultConditionalRetryBehaviourHandler
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler
import io.micronaut.configuration.kafka.retry.ConditionalRetryBehaviourHandler
import io.micronaut.configuration.kafka.retry.DefaultConditionalRetryBehaviourHandler
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.Blocking
@@ -23,8 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger

import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.NONE
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RESUME_AT_NEXT_RECORD
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_CONDITIONALLY_ON_ERROR
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_CONDITIONALLY_ON_ERROR
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_ON_ERROR
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
@@ -335,27 +335,6 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
myConsumer.times[1] - myConsumer.times[0] >= 5_000
}

/**
* @deprecated This test is deprecated as the poll next strategy is default to ensure backwards
* compatibility with existing (broken) functionality that people may have workarounds for with
* custom error handlers.
*/
@Deprecated
void "test an exception that is thrown is not committed with default error strategy"() {
when:"A consumer throws an exception"
PollNextErrorClient myClient = context.getBean(PollNextErrorClient)
myClient.sendMessage("One")
myClient.sendMessage("Two")

PollNextErrorCausingConsumer myConsumer = context.getBean(PollNextErrorCausingConsumer)

then:"The message is re-delivered and eventually handled"
conditions.eventually {
myConsumer.received.size() == 2
myConsumer.count.get() == 3
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = SYNC, errorStrategy = @ErrorStrategy(value = RESUME_AT_NEXT_RECORD))
static class ResumeAtNextRecordErrorCausingConsumer {
21 changes: 21 additions & 0 deletions src/main/docs/guide/breaks.adoc
@@ -0,0 +1,21 @@
This section documents breaking changes between Micronaut Kafka versions:

=== Micronaut Kafka 6.0.0

- The previously deprecated constructor `io.micronaut.configuration.kafka.bind.ConsumerRecordBinderRegistry(ConsumerRecordBinder<?>...)` has been removed.
Use `ConsumerRecordBinderRegistry(ConversionService, ConsumerRecordBinder<?>...)` instead.

- The previously deprecated constructor `io.micronaut.configuration.kafka.exceptions.DefaultKafkaListenerExceptionHandler()` has been removed.
Use `DefaultKafkaListenerExceptionHandler(DefaultKafkaListenerExceptionHandlerConfiguration)` instead.

- The previously deprecated constructor `io.micronaut.configuration.kafka.health.KafkaHealthIndicator(AdminClient, KafkaDefaultConfiguration)` has been removed.
Use `KafkaHealthIndicator(BeanContext, KafkaDefaultConfiguration, NetworkClientCreator, KafkaHealthConfiguration)` instead.

- The previously deprecated constructor `io.micronaut.configuration.kafka.KafkaHeaders(Headers)` has been removed.
Use `KafkaHeaders(Headers, ConversionService)` instead (see the migration sketch below).

- The previously deprecated constructor `io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder(Properties)` has been removed.
Use `ConfiguredStreamBuilder(Properties, String, Duration)` instead (see the migration sketch below).


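For reference (not part of the change set above), here is a minimal migration sketch for code that called the removed `KafkaHeaders(Headers)` and `ConfiguredStreamBuilder(Properties)` constructors directly. The stream name and close timeout are illustrative placeholder values, and `RecordHeaders` is used only to provide a concrete `Headers` instance:

[source,java]
----
import io.micronaut.configuration.kafka.KafkaHeaders;
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.core.convert.ConversionService;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.time.Duration;
import java.util.Properties;

class MigrationSketch {

    void migrate() {
        // Before 6.0.0: new KafkaHeaders(new RecordHeaders())
        // The conversion service must now be passed explicitly.
        KafkaHeaders headers = new KafkaHeaders(new RecordHeaders(), ConversionService.SHARED);

        // Before 6.0.0: new ConfiguredStreamBuilder(props)
        // The stream name and close timeout must now be supplied as well
        // ("my-stream" and 10 seconds are placeholder values).
        Properties props = new Properties();
        ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(props, "my-stream", Duration.ofSeconds(10));
    }
}
----

The other removed constructors (`ConsumerRecordBinderRegistry`, `DefaultKafkaListenerExceptionHandler`, `KafkaHealthIndicator`) belong to `@Singleton` beans that are normally created by the dependency-injection container, so applications are only affected if they instantiate these classes manually.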
1 change: 1 addition & 0 deletions src/main/docs/guide/toc.yml
@@ -37,4 +37,5 @@ kafkaStreams:
kafkaStreamHealth: Kafka Stream Health Checks
kafkaStreamExceptions: Handling Uncaught Exceptions
kafkaGuides: Guides
breaks: Breaking Changes
repository: Repository