From 3e61ec040e19c41a77c92b11114cfd9723346a33 Mon Sep 17 00:00:00 2001 From: soundvibe Date: Sat, 7 Sep 2024 01:26:11 +0300 Subject: [PATCH 1/3] 3.8.0 --- pom.xml | 22 ++-- .../kafka/config/AbstractConfigBuilder.java | 68 ++++++++++ .../consumer/ConsumerConfigBuilder.java | 20 +++ .../producer/ProducerConfigBuilder.java | 64 +++++++++ .../config/streams/StreamsConfigBuilder.java | 123 ++++++++++++++++-- src/main/resources/logback.xml | 11 ++ .../config/admin/AdminConfigBuilderTest.java | 5 + .../consumer/ConsumerConfigBuilderTest.java | 2 + .../config/consumer/TestSecurityProvider.java | 12 +- .../producer/ProducerConfigBuilderTest.java | 7 +- .../streams/StreamsConfigBuilderTest.java | 20 ++- .../config/streams/TestPartitionGrouper.java | 13 -- .../streams/TestRocksDBConfigSetter.java | 5 + 13 files changed, 337 insertions(+), 35 deletions(-) create mode 100644 src/main/resources/logback.xml delete mode 100644 src/test/java/net/soundvibe/kafka/config/streams/TestPartitionGrouper.java diff --git a/pom.xml b/pom.xml index 4767c32..76908b1 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.soundvibe kafka-config - 2.7.0 + 3.8.0 jar kafka-config Typesafe configuration for Kafka clients @@ -34,9 +34,9 @@ - 1.8 + 11 UTF-8 - 2.7.0 + 3.8.0 @@ -65,10 +65,16 @@ provided + + ch.qos.logback + logback-classic + 1.4.14 + + org.junit.jupiter junit-jupiter - 5.6.2 + 5.10.0 test @@ -104,7 +110,7 @@ maven-source-plugin - 3.2.1 + 3.3.0 attach-sources @@ -117,7 +123,7 @@ maven-release-plugin - 2.5.3 + 3.0.1 deploy false @@ -128,7 +134,7 @@ org.jacoco jacoco-maven-plugin - 0.8.5 + 0.8.10 @@ -148,7 +154,7 @@ org.sonatype.plugins nexus-staging-maven-plugin - 1.6.8 + 1.7.0 true ossrh diff --git a/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java index c45fd68..522ce05 100644 --- a/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java @@ -67,6 +67,46 @@ public T withClientDNSLookup(ClientDnsLookup clientDNSLookup) { return (T) this; } + /** + * Default inner class of list serde for key that implements the org.apache.kafka.common.serialization.Serde interface. + * This configuration will be read if and only if default.key.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde + */ + public T withDefaultListKeySerdeInnerClass(String defaultListKeySerdeInnerClass) { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, defaultListKeySerdeInnerClass); + return (T) this; + } + + /** + * Default inner class of list serde for value that implements the org.apache.kafka.common.serialization.Serde interface. + * This configuration will be read if and only if default.value.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde + */ + public T withDefaultListValueSerdeInnerClass(String defaultListValueSerdeInnerClass) { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, defaultListValueSerdeInnerClass); + return (T) this; + } + + /** + * Default class for key that implements the java.util.List interface. + * This configuration will be read if and only if default.key.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde + * Note when list serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via + * `DEFAULT_LIST_KEY_SERDE_INNER_CLASS' + */ + public T withDefaultListKeySerdeTypeClass(String defaultListKeySerdeTypeClass) { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, defaultListKeySerdeTypeClass); + return (T) this; + } + + /** + * Default class for value that implements the java.util.List interface. + * This configuration will be read if and only if default.value.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde + * Note when list serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via + * `DEFAULT_LIST_VALUE_SERDE_INNER_CLASS' + */ + public T withDefaultListValueSerdeTypeClass(String defaultListValueSerdeTypeClass) { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, defaultListValueSerdeTypeClass); + return (T) this; + } + /** * The period of time after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. */ @@ -83,6 +123,17 @@ public T withSendBufferBytes(int sendBufferBytes) { return (T) this; } + /** + * The maximum allowed time for each worker to join the group + * once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to " + * flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed " + * from the group, which will cause offset commit failures. + */ + public T withRebalanceTimeout(Duration rebalanceTimeout) { + props.put(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG, rebalanceTimeout.toMillis()); + return (T) this; + } + /** * The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. */ @@ -119,6 +170,23 @@ public T withRetryBackoff(Duration retryBackoff) { return (T) this; } + /** + * Controls how the client recovers when none of the brokers known to it is available. + * If set to none, the client fails. If set to rebootstrap, + * the client repeats the bootstrap process using bootstrap.servers. + * Rebootstrapping is useful when a client communicates with brokers so infrequently + * that the set of brokers may change entirely before the client refreshes metadata. + * Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. + * Brokers appear unavailable when disconnected and no current retry attempt is in-progress. + * Consider increasing reconnect.backoff.ms and reconnect.backoff.max.ms and + * decreasing socket.connection.setup.timeout.ms and socket.connection.setup.timeout.max.ms + * for the client. + */ + public T withMetadataRecoveryStrategy(MetadataRecoveryStrategy metadataRecoveryStrategy) { + props.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, metadataRecoveryStrategy.name); + return (T) this; + } + /** * The window of time a metrics sample is computed over. */ diff --git a/src/main/java/net/soundvibe/kafka/config/consumer/ConsumerConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/consumer/ConsumerConfigBuilder.java index b8a73b8..294c917 100644 --- a/src/main/java/net/soundvibe/kafka/config/consumer/ConsumerConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/consumer/ConsumerConfigBuilder.java @@ -38,6 +38,26 @@ public ConsumerConfigBuilder withGroupInstanceId(String groupInstanceId) { return this; } + /** + * The group protocol consumer should use. We currently + * support "classic" or "consumer". If "consumer" is specified, then the consumer group protocol will be + * used. Otherwise, the classic group protocol will be used. + */ + public ConsumerConfigBuilder withGroupProtocol(GroupProtocol groupProtocol) { + props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)); + return this; + } + + /** + * The server-side assignor to use. If no assignor is specified, + * the group coordinator will pick one. This configuration is applied only if group.protocol is + * set to "consumer". + */ + public ConsumerConfigBuilder withGroupRemoteAssignor(String groupRemoteAssignor) { + props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupRemoteAssignor); + return this; + } + /** * The maximum number of records returned in a single call to poll(). */ diff --git a/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java index c138a0e..62ca495 100644 --- a/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java @@ -65,6 +65,16 @@ public ProducerConfigBuilder withLinger(Duration linger) { return this; } + /** + * Controls how long the producer will cache metadata for a topic that's idle. If the elapsed + * time since a topic was last produced to exceeds the metadata idle duration, then the topic's + * metadata is forgotten and the next access to it will force a metadata fetch request. + */ + public ProducerConfigBuilder withMetadataMaxIdle(Duration metadataMaxIdle) { + props.put(ProducerConfig.METADATA_MAX_IDLE_CONFIG, metadataMaxIdle.toMillis()); + return this; + } + /** * This should be larger than replica.lag.time.max.ms (a broker configuration) * to reduce the possibility of message duplication due to unnecessary producer retries. @@ -89,6 +99,36 @@ public ProducerConfigBuilder withDeliveryTimeout(Duration deliveryTimeout) { return this; } + /** + * When set to 'true', the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers. + * If 'false', producer will try to distribute messages uniformly. Note: this setting has no effect if a custom partitioner is used. + */ + public ProducerConfigBuilder withPartitionerAdaptivePartitioningEnable(boolean partitionerAdaptivePartitioningEnable) { + props.put(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, partitionerAdaptivePartitioningEnable); + return this; + } + + /** + * If a broker cannot process produce requests from a partition for PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG time, + * the partitioner treats that partition as not available. If the value is 0, this logic is disabled. + * Note: this setting has no effect if a custom partitioner is used or PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG + * is set to 'false' + */ + public ProducerConfigBuilder withPartitionerAvailabilityTimeout(Duration partitionerAvailabilityTimeout) { + props.put(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, partitionerAvailabilityTimeout.toMillis()); + return this; + } + + /** + * When set to 'true' the producer won't use record keys to choose a partition. + * If 'false', producer would choose a partition based on a hash of the key when a key is present. + * Note: this setting has no effect if a custom partitioner is used. + */ + public ProducerConfigBuilder withPartitionerIgnoreKeys(boolean partitionerIgnoreKeys) { + props.put(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG, partitionerIgnoreKeys); + return this; + } + /** * The maximum size of a request in bytes. This setting will limit the number of record * batches the producer will send in a single request to avoid sending huge requests. @@ -137,6 +177,30 @@ public ProducerConfigBuilder withCompressionType(CompressionType compressionType return this; } + /** + * The compression level to use if `withCompressionType` is set to gzip. + */ + public ProducerConfigBuilder withCompressionGZipLevel(int compressionGZipLevel) { + props.put(ProducerConfig.COMPRESSION_GZIP_LEVEL_CONFIG, compressionGZipLevel); + return this; + } + + /** + * The compression level to use if `withCompressionType` is set to lz4. + */ + public ProducerConfigBuilder withCompressionZ4Level(int compressionLZ4Level) { + props.put(ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG, compressionLZ4Level); + return this; + } + + /** + * The compression level to use if `withCompressionType` is set to zstd. + */ + public ProducerConfigBuilder withCompressionZstdLevel(int compressionZstdLevel) { + props.put(ProducerConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, compressionZstdLevel); + return this; + } + /** * The maximum number of unacknowledged requests the client will send on a single connection before blocking. * Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of diff --git a/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java index 6e9e7d3..4260dee 100644 --- a/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java @@ -2,13 +2,17 @@ import net.soundvibe.kafka.config.AbstractConfigBuilder; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.*; import org.apache.kafka.streams.processor.*; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.state.DslStoreSuppliers; import org.apache.kafka.streams.state.RocksDBConfigSetter; import java.nio.file.Path; import java.time.Duration; +import java.util.List; public final class StreamsConfigBuilder extends AbstractConfigBuilder { @@ -27,6 +31,14 @@ public StreamsConfigBuilder withApplicationId(String applicationId) { return this; } + /** + * Version of the built-in metrics to use. + */ + public StreamsConfigBuilder withBuiltinMetricsVersion(String builtinMetricsVersion) { + props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtinMetricsVersion); + return this; + } + /** * The replication factor for change log topics and repartition topics created by the stream processing application. */ @@ -43,6 +55,14 @@ public StreamsConfigBuilder withStateDir(Path stateDir) { return this; } + /** + * Maximum number of memory bytes to be used for statestore cache across all threads. + */ + public StreamsConfigBuilder withStateStoreCacheMaxBytes(long stateStoreCacheMaxBytes) { + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, stateStoreCacheMaxBytes); + return this; + } + /** * Maximum number of memory bytes to be used for buffering across all threads */ @@ -60,6 +80,16 @@ public StreamsConfigBuilder withClientId(String clientId) { return super.withClientId(clientId); } + /** + * The frequency in milliseconds with which to delete fully consumed records from repartition topics. + * Purging will occur after at least this value since the last purge, but may be delayed until later. + * (Note, unlike commit.interval.ms, the default for this value remains unchanged when processing.guarantee is set to EXACTLY_ONCE_V2). + */ + public StreamsConfigBuilder withRepartitionPurgeInterval(Duration repartitionPurgeInterval) { + props.put(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, repartitionPurgeInterval.toMillis()); + return this; + } + /** * Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface. */ @@ -104,6 +134,14 @@ public StreamsConfigBuilder withDefaultTimestampExtractor(Classorg.apache.kafka.streams.state.DslStoreSuppliers interface. + */ + public StreamsConfigBuilder withDSLStoreSupplierClass(Class dslStoreSupplierClass) { + props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, dslStoreSupplierClass); + return this; + } + /** * The number of standby replicas for each task. */ @@ -139,6 +177,34 @@ public StreamsConfigBuilder withProcessingGuarantee(ProcessingGuarantee processi return this; } + /** + * List of client tag keys used to distribute standby replicas across Kafka Streams instances. + * When configured, Kafka Streams will make a best-effort to distribute + * the standby tasks over each client tag dimension. + */ + public StreamsConfigBuilder withRackAwareAssignmentTags(List rackAwareAssignmentTags) { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags); + return this; + } + + /** + * The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. + * For a timeout of 0ms, a task would raise an error for the first internal error. + * For any timeout larger than 0ms, a task will retry at least once before an error is raised. + */ + public StreamsConfigBuilder withTaskTimeout(Duration taskTimeout) { + props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, taskTimeout.toMillis()); + return this; + } + + /** + * Sets window size for the deserializer in order to calculate window end times. + */ + public StreamsConfigBuilder withWindowSize(Duration windowSize) { + props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, windowSize.toMillis()); + return this; + } + /** * A configuration telling Kafka Streams if it should optimize the topology, disabled by default */ @@ -173,16 +239,6 @@ public StreamsConfigBuilder withCommitInterval(Duration commitInterval) { return this; } - /** - * Partition grouper class that implements the org.apache.kafka.streams.processor.PartitionGrouper interface." - * WARNING: This config is deprecated and will be removed in 3.0.0 release. - */ - @Deprecated - public StreamsConfigBuilder withPartitionGrouper(Class partitionGrouper) { - props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, partitionGrouper); - return this; - } - /** * The amount of time to block waiting for input. */ @@ -218,6 +274,53 @@ public StreamsConfigBuilder withUpgradeFrom(UpgradeFrom upgradeFrom) { return this; } + /** + * Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface. + */ + public StreamsConfigBuilder withDefaultClientSupplier(Class defaultClientSupplier) { + props.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, defaultClientSupplier); + return this; + } + + /** + * The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning + * tasks to minimize cross rack traffic. Valid settings are : "RACK_AWARE_ASSIGNMENT_STRATEGY_NONE (default), which will disable rack aware assignment; RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, + * which will compute minimum cross rack traffic assignment; "RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY", which will compute minimum cross rack traffic and try to balance the tasks of same subtopolgies across different clients + */ + public StreamsConfigBuilder withRackAwareAssignmentStrategy(String rackAwareAssignmentStrategy) { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareAssignmentStrategy); + return this; + } + + /** + * Cost associated with cross rack traffic. This config and rack.aware.assignment.non_overlap_cost controls whether the + * optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value RackAwareTaskAssignor.class.getName() will + * optimize for minimizing cross rack traffic. The default value is null which means it will use default traffic cost values in different assignors. + */ + public StreamsConfigBuilder withRackAwareAssignmentTrafficCost(int rackAwareAssignmentTrafficCost) { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, rackAwareAssignmentTrafficCost); + return this; + } + + /** + * Cost associated with moving tasks from existing assignment. This config and RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG controls whether the + * optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value RackAwareTaskAssignor.class.getName() will + * optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors. + */ + public StreamsConfigBuilder withRackAwareAssignmentNonOverlapCost(int rackAwareAssignmentNonOverlapCost) { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, rackAwareAssignmentNonOverlapCost); + return this; + } + + /** + * A task assignor class or class name implementing the TaskAssignor.class.getName() interface. Defaults to the HighAvailabilityTaskAssignor class. + */ + public StreamsConfigBuilder withTaskAssignorClass(Class taskAssignorClass) { + props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, taskAssignorClass.getName()); + return this; + } + + /** * Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day */ diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..6eea430 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java b/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java index 008ab11..c62927c 100644 --- a/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java +++ b/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java @@ -1,6 +1,7 @@ package net.soundvibe.kafka.config.admin; import org.apache.kafka.clients.ClientDnsLookup; +import org.apache.kafka.clients.MetadataRecoveryStrategy; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.metrics.Sensor; import org.junit.jupiter.api.*; @@ -21,6 +22,7 @@ void should_build_all_properties() { .withBootstrapServers(BOOTSTRAP_SERVERS) .withRequestTimeout(Duration.ofSeconds(30)) .withClientId("clientId") + .withRebalanceTimeout(Duration.ofSeconds(30)) .withClientDNSLookup(ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) .withClientRack("rack") .withConnectionsMaxIdle(Duration.ofSeconds(30)) @@ -35,6 +37,9 @@ void should_build_all_properties() { .withCustomMap(new HashMap<>()) .withSocketConnectionSetupTimeout(Duration.ofSeconds(15)) .withSocketConnectionSetupTimeoutMax(Duration.ofSeconds(30)) + .withMetadataRecoveryStrategy(MetadataRecoveryStrategy.NONE) + .withDefaultListKeySerdeInnerClass("org.apache.kafka.common.serialization.Serdes.ListSerde") + .withDefaultListValueSerdeInnerClass("org.apache.kafka.common.serialization.Serdes.ListSerde") .buildProperties(); assertEquals(BOOTSTRAP_SERVERS, adminProps.getProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); diff --git a/src/test/java/net/soundvibe/kafka/config/consumer/ConsumerConfigBuilderTest.java b/src/test/java/net/soundvibe/kafka/config/consumer/ConsumerConfigBuilderTest.java index b72b851..379e927 100644 --- a/src/test/java/net/soundvibe/kafka/config/consumer/ConsumerConfigBuilderTest.java +++ b/src/test/java/net/soundvibe/kafka/config/consumer/ConsumerConfigBuilderTest.java @@ -46,10 +46,12 @@ void should_set_all_props() { Properties consumerProps = ConsumerConfigBuilder.create() .withBootstrapServers(BOOTSTRAP_SERVERS) .withGroupId("test-group") + .withGroupRemoteAssignor(null) .withAllowAutoCreateTopics(true) .withAutoOffsetReset(OffsetResetStrategy.EARLIEST) .withCheckCRCs(true) .withEnableAutoCommit(false) + .withGroupProtocol(GroupProtocol.CONSUMER) .withDefaultApiTimeout(Duration.ofMinutes(1)) .withAutoCommitInterval(Duration.ofSeconds(30)) .withExcludeInternalTopics(false) diff --git a/src/test/java/net/soundvibe/kafka/config/consumer/TestSecurityProvider.java b/src/test/java/net/soundvibe/kafka/config/consumer/TestSecurityProvider.java index bcc995e..3a7be49 100644 --- a/src/test/java/net/soundvibe/kafka/config/consumer/TestSecurityProvider.java +++ b/src/test/java/net/soundvibe/kafka/config/consumer/TestSecurityProvider.java @@ -1,13 +1,21 @@ package net.soundvibe.kafka.config.consumer; import org.apache.kafka.common.security.auth.SecurityProviderCreator; -import sun.security.pkcs11.SunPKCS11; import java.security.Provider; public class TestSecurityProvider implements SecurityProviderCreator { @Override public Provider getProvider() { - return new SunPKCS11(); + return new FooProvider(); } } + + +class FooProvider extends Provider { + + public FooProvider() { + super("foo", "v1", "info"); + } + +} \ No newline at end of file diff --git a/src/test/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilderTest.java b/src/test/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilderTest.java index 69de9d3..b150c09 100644 --- a/src/test/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilderTest.java +++ b/src/test/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilderTest.java @@ -22,7 +22,8 @@ void should_build_all_properties() { .withBootstrapServers(BOOTSTRAP_SERVERS) .withAcks(Acks.ALL) .withBatchSize(10) - .withCompressionType(CompressionType.SNAPPY) + .withCompressionType(CompressionType.ZSTD) + .withCompressionZstdLevel(3) .withEnableIdempotence(true) .withBufferMemory(1024L * 1000L) .withDeliveryTimeout(Duration.ofMinutes(10)) @@ -39,6 +40,10 @@ void should_build_all_properties() { .withClientId("clientId") .withMaxInFlightRequestsPerConnection(5) .withInterceptorClasses(TestProducerInterceptor.class) + .withMetadataMaxIdle(Duration.ofSeconds(30)) + .withPartitionerAdaptivePartitioningEnable(false) + .withPartitionerAvailabilityTimeout(Duration.ofSeconds(30)) + .withPartitionerIgnoreKeys(true) .buildMap(); assertEquals(BOOTSTRAP_SERVERS, producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); diff --git a/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java b/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java index afb5dea..7b38f5e 100644 --- a/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java +++ b/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java @@ -5,14 +5,22 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; +import org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor; +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; import org.junit.jupiter.api.*; import org.junit.jupiter.api.condition.DisabledOnOs; import java.io.IOException; import java.nio.file.Files; import java.time.Duration; +import java.util.List; +import java.util.Locale; import java.util.Properties; +import static org.apache.kafka.streams.StreamsConfig.METRICS_LATEST; +import static org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE; import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.condition.OS.WINDOWS; @@ -52,7 +60,6 @@ void should_build_all_properties() throws IOException { .withMaxTaskIdle(Duration.ofSeconds(60)) .withNumStandbyReplicas(3) .withNumStreamThreads(4) - .withPartitionGrouper(TestPartitionGrouper.class) .withPoll(Duration.ofSeconds(30)) .withProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE) .withReplicationFactor(3) @@ -65,6 +72,17 @@ void should_build_all_properties() throws IOException { .withAcceptableRecoveryLag(10_000L) .withMaxWarmupReplicas(2) .withProbingRebalanceInterval(Duration.ofMinutes(2)) + .withBuiltinMetricsVersion(METRICS_LATEST) + .withStateStoreCacheMaxBytes(1024) + .withRepartitionPurgeInterval(Duration.ofSeconds(60)) + .withDSLStoreSupplierClass(BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class) + .withTaskTimeout(Duration.ofMinutes(1)) + .withWindowSize(Duration.ofMinutes(5)) + .withDefaultClientSupplier(DefaultKafkaClientSupplier.class) + .withRackAwareAssignmentStrategy(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE) + .withRackAwareAssignmentTrafficCost(20) + .withRackAwareAssignmentNonOverlapCost(10) + .withTaskAssignorClass(StickyTaskAssignor.class) .buildProperties(); assertEquals(BOOTSTRAP_SERVERS, streamProps.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); diff --git a/src/test/java/net/soundvibe/kafka/config/streams/TestPartitionGrouper.java b/src/test/java/net/soundvibe/kafka/config/streams/TestPartitionGrouper.java deleted file mode 100644 index 1ecd1a1..0000000 --- a/src/test/java/net/soundvibe/kafka/config/streams/TestPartitionGrouper.java +++ /dev/null @@ -1,13 +0,0 @@ -package net.soundvibe.kafka.config.streams; - -import org.apache.kafka.common.*; -import org.apache.kafka.streams.processor.*; - -import java.util.*; - -public class TestPartitionGrouper implements PartitionGrouper { - @Override - public Map> partitionGroups(Map> topicGroups, Cluster metadata) { - return new HashMap<>(); - } -} diff --git a/src/test/java/net/soundvibe/kafka/config/streams/TestRocksDBConfigSetter.java b/src/test/java/net/soundvibe/kafka/config/streams/TestRocksDBConfigSetter.java index c3a27a5..282f8ba 100644 --- a/src/test/java/net/soundvibe/kafka/config/streams/TestRocksDBConfigSetter.java +++ b/src/test/java/net/soundvibe/kafka/config/streams/TestRocksDBConfigSetter.java @@ -10,4 +10,9 @@ public class TestRocksDBConfigSetter implements RocksDBConfigSetter { public void setConfig(String storeName, Options options, Map configs) { // } + + @Override + public void close(String storeName, Options options) { + // + } } From 4881d1a0280740d3e55ab85321cc27a3fc4c4187 Mon Sep 17 00:00:00 2001 From: "Linas N." Date: Sat, 7 Sep 2024 01:28:32 +0300 Subject: [PATCH 2/3] Update maven.yml --- .github/workflows/maven.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 3dabc7a..ceee660 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - java: [ 8, 9, 10, 11, 12, 13, 14, 15 ] + java: [ 11, 12, 13, 14, 15 ] name: Java ${{ matrix.java }} run tests steps: - uses: actions/checkout@v2 From 886e5ba15473e9081167fbad30d1ded2a47aee88 Mon Sep 17 00:00:00 2001 From: soundvibe Date: Sat, 7 Sep 2024 01:30:20 +0300 Subject: [PATCH 3/3] updated README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index fd20051..0babd67 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ # kafka-config Easy to use type-safe builders for kafka clients. -Supports Java >= 8. +Supports Java >= 11. kafka-config version aligns with kafka-client version. ## Motivation @@ -105,7 +105,7 @@ Binaries and dependency information for Maven, Ivy, Gradle and others can be fou Example for Gradle: ```groovy -compile 'net.soundvibe:kafka-config:2.7.0' +compile 'net.soundvibe:kafka-config:3.8.0' ``` and for Maven: @@ -114,7 +114,7 @@ and for Maven: net.soundvibe kafka-config - 2.7.0 + 3.8.0 ```