Skip to content

Commit

Permalink
Merge pull request #11 from soundvibe/v3.8.0
Browse files Browse the repository at this point in the history
3.8.0
  • Loading branch information
soundvibe authored Sep 6, 2024
2 parents 783e6af + 75b6cad commit 07a46cb
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
java: [ 8, 9, 10, 11, 12, 13, 14, 15 ]
java: [ 11, 12, 13, 14, 15 ]
name: Java ${{ matrix.java }} run tests
steps:
- uses: actions/checkout@v2
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# kafka-config

Easy to use type-safe builders for kafka clients.
Supports Java >= 8.
Supports Java >= 11.
kafka-config version aligns with kafka-client version.

## Motivation
Expand Down Expand Up @@ -105,7 +105,7 @@ Binaries and dependency information for Maven, Ivy, Gradle and others can be fou
Example for Gradle:

```groovy
compile 'net.soundvibe:kafka-config:2.7.0'
compile 'net.soundvibe:kafka-config:3.8.0'
```

and for Maven:
Expand All @@ -114,7 +114,7 @@ and for Maven:
<dependency>
<groupId>net.soundvibe</groupId>
<artifactId>kafka-config</artifactId>
<version>2.7.0</version>
<version>3.8.0</version>
</dependency>
```

Expand Down
22 changes: 14 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>net.soundvibe</groupId>
<artifactId>kafka-config</artifactId>
<version>2.7.0</version>
<version>3.8.0</version>
<packaging>jar</packaging>
<name>kafka-config</name>
<description>Typesafe configuration for Kafka clients</description>
Expand Down Expand Up @@ -34,9 +34,9 @@
</developers>

<properties>
<java.version>1.8</java.version>
<java.version>11</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.7.0</kafka.version>
<kafka.version>3.8.0</kafka.version>
</properties>

<distributionManagement>
Expand Down Expand Up @@ -65,10 +65,16 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.14</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.6.2</version>
<version>5.10.0</version>
<scope>test</scope>
</dependency>

Expand Down Expand Up @@ -104,7 +110,7 @@

<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<version>3.3.0</version>
<executions>
<execution>
<id>attach-sources</id>
Expand All @@ -117,7 +123,7 @@

<plugin>
<artifactId>maven-release-plugin</artifactId>
<version>2.5.3</version>
<version>3.0.1</version>
<configuration>
<goals>deploy</goals>
<pushChanges>false</pushChanges>
Expand All @@ -128,7 +134,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.5</version>
<version>0.8.10</version>
<executions>
<execution>
<goals>
Expand All @@ -148,7 +154,7 @@
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.8</version>
<version>1.7.0</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,46 @@ public T withClientDNSLookup(ClientDnsLookup clientDNSLookup) {
return (T) this;
}

/**
 * Sets the default inner serde class used for list-typed keys; the class must implement
 * the <code>org.apache.kafka.common.serialization.Serde</code> interface.
 * The value is stored under {@code CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS} and is
 * read only when <code>default.key.serde</code> is set to
 * <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>.
 */
public T withDefaultListKeySerdeInnerClass(String defaultListKeySerdeInnerClass) {
    props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, defaultListKeySerdeInnerClass);
    @SuppressWarnings("unchecked")
    final T self = (T) this;
    return self;
}

/**
 * Sets the default inner serde class used for list-typed values; the class must implement
 * the <code>org.apache.kafka.common.serialization.Serde</code> interface.
 * The value is stored under {@code CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS} and is
 * read only when <code>default.value.serde</code> is set to
 * <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>.
 */
public T withDefaultListValueSerdeInnerClass(String defaultListValueSerdeInnerClass) {
    props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, defaultListValueSerdeInnerClass);
    @SuppressWarnings("unchecked")
    final T self = (T) this;
    return self;
}

/**
 * Default class for key that implements the <code>java.util.List</code> interface.
 * This configuration will be read if and only if <code>default.key.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>.
 * Note that when the list serde class is used, one also needs to set the inner serde class (a class implementing the
 * <code>org.apache.kafka.common.serialization.Serde</code> interface) via {@code DEFAULT_LIST_KEY_SERDE_INNER_CLASS}
 * — see {@link #withDefaultListKeySerdeInnerClass(String)}.
 */
public T withDefaultListKeySerdeTypeClass(String defaultListKeySerdeTypeClass) {
props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, defaultListKeySerdeTypeClass);
return (T) this;
}

/**
 * Default class for value that implements the <code>java.util.List</code> interface.
 * This configuration will be read if and only if <code>default.value.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>.
 * Note that when the list serde class is used, one also needs to set the inner serde class (a class implementing the
 * <code>org.apache.kafka.common.serialization.Serde</code> interface) via {@code DEFAULT_LIST_VALUE_SERDE_INNER_CLASS}
 * — see {@link #withDefaultListValueSerdeInnerClass(String)}.
 */
public T withDefaultListValueSerdeTypeClass(String defaultListValueSerdeTypeClass) {
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, defaultListValueSerdeTypeClass);
return (T) this;
}

/**
* The period of time after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
*/
Expand All @@ -83,6 +123,17 @@ public T withSendBufferBytes(int sendBufferBytes) {
return (T) this;
}

/**
 * The maximum allowed time for each worker to join the group
 * once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to
 * flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed
 * from the group, which will cause offset commit failures.
 * The {@link Duration} is converted to milliseconds before being stored under
 * {@code CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG}.
 */
public T withRebalanceTimeout(Duration rebalanceTimeout) {
props.put(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG, rebalanceTimeout.toMillis());
return (T) this;
}

/**
* The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
*/
Expand Down Expand Up @@ -119,6 +170,23 @@ public T withRetryBackoff(Duration retryBackoff) {
return (T) this;
}

/**
 * Controls how the client recovers when none of the brokers known to it is available.
 * With <code>none</code> the client fails; with <code>rebootstrap</code> it repeats the bootstrap
 * process using <code>bootstrap.servers</code>. Rebootstrapping is useful when a client talks to
 * brokers so infrequently that the whole broker set may change before the client refreshes metadata.
 * Recovery is triggered when every last-known broker appears unavailable at the same time
 * (disconnected with no retry attempt in progress). Consider raising
 * <code>reconnect.backoff.ms</code>/<code>reconnect.backoff.max.ms</code> and lowering
 * <code>socket.connection.setup.timeout.ms</code>/<code>socket.connection.setup.timeout.max.ms</code>.
 */
public T withMetadataRecoveryStrategy(MetadataRecoveryStrategy metadataRecoveryStrategy) {
    // The strategy's lowercase config name is what Kafka expects as the config value.
    final String strategyName = metadataRecoveryStrategy.name;
    props.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, strategyName);
    @SuppressWarnings("unchecked")
    final T self = (T) this;
    return self;
}

/**
* The window of time a metrics sample is computed over.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,26 @@ public ConsumerConfigBuilder withGroupInstanceId(String groupInstanceId) {
return this;
}

/**
 * The group protocol the consumer should use — currently either
 * "classic" or "consumer". With "consumer" the consumer group protocol is used;
 * otherwise the classic group protocol applies.
 * The enum constant name is lowercased with {@code Locale.ROOT} before being stored
 * under {@code ConsumerConfig.GROUP_PROTOCOL_CONFIG}.
 */
public ConsumerConfigBuilder withGroupProtocol(GroupProtocol groupProtocol) {
    final String protocolName = groupProtocol.name().toLowerCase(Locale.ROOT);
    props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, protocolName);
    return this;
}

/**
 * The server-side assignor to use. When no assignor is specified the group coordinator
 * picks one. This configuration only applies when <code>group.protocol</code> is set
 * to "consumer". Stored under {@code ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG}.
 */
public ConsumerConfigBuilder withGroupRemoteAssignor(String groupRemoteAssignor) {
    props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupRemoteAssignor);
    return this;
}

/**
* The maximum number of records returned in a single call to poll().
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ public ProducerConfigBuilder withLinger(Duration linger) {
return this;
}

/**
 * Controls how long the producer caches metadata for an idle topic. Once the time since
 * a topic was last produced to exceeds this duration, the topic's metadata is forgotten
 * and the next access forces a metadata fetch request.
 * The {@link Duration} is converted to milliseconds and stored under
 * {@code ProducerConfig.METADATA_MAX_IDLE_CONFIG}.
 */
public ProducerConfigBuilder withMetadataMaxIdle(Duration metadataMaxIdle) {
    final long idleMillis = metadataMaxIdle.toMillis();
    props.put(ProducerConfig.METADATA_MAX_IDLE_CONFIG, idleMillis);
    return this;
}

/**
* This should be larger than <code>replica.lag.time.max.ms</code> (a broker configuration)
* to reduce the possibility of message duplication due to unnecessary producer retries.
Expand All @@ -89,6 +99,36 @@ public ProducerConfigBuilder withDeliveryTimeout(Duration deliveryTimeout) {
return this;
}

/**
 * When 'true', the producer adapts to broker performance and produces more messages to
 * partitions hosted on faster brokers; when 'false', it distributes messages uniformly.
 * Has no effect when a custom partitioner is used.
 * NOTE: the referenced constant name ("ADPATIVE") is misspelled in Kafka's own API and is
 * intentionally reproduced here.
 */
public ProducerConfigBuilder withPartitionerAdaptivePartitioningEnable(boolean partitionerAdaptivePartitioningEnable) {
    props.put(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, partitionerAdaptivePartitioningEnable);
    return this;
}

/**
 * If a broker cannot process produce requests from a partition for this long, the
 * partitioner treats that partition as unavailable; a value of 0 disables the logic.
 * Has no effect when a custom partitioner is used, or when
 * <code>PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG</code> is 'false'.
 * The {@link Duration} is converted to milliseconds and stored under
 * {@code ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG}.
 */
public ProducerConfigBuilder withPartitionerAvailabilityTimeout(Duration partitionerAvailabilityTimeout) {
    final long timeoutMillis = partitionerAvailabilityTimeout.toMillis();
    props.put(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, timeoutMillis);
    return this;
}

/**
 * When 'true' the producer does not use record keys to choose a partition; when 'false'
 * it chooses a partition based on a hash of the key whenever a key is present.
 * Has no effect when a custom partitioner is used.
 * Stored under {@code ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG}.
 */
public ProducerConfigBuilder withPartitionerIgnoreKeys(boolean partitionerIgnoreKeys) {
    props.put(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG, partitionerIgnoreKeys);
    return this;
}

/**
* The maximum size of a request in bytes. This setting will limit the number of record
* batches the producer will send in a single request to avoid sending huge requests.
Expand Down Expand Up @@ -137,6 +177,30 @@ public ProducerConfigBuilder withCompressionType(CompressionType compressionType
return this;
}

/**
 * The compression level applied when {@code withCompressionType} is set to
 * <code>gzip</code>. Stored under {@code ProducerConfig.COMPRESSION_GZIP_LEVEL_CONFIG}.
 */
public ProducerConfigBuilder withCompressionGZipLevel(int compressionGZipLevel) {
    props.put(ProducerConfig.COMPRESSION_GZIP_LEVEL_CONFIG, compressionGZipLevel);
    return this;
}

/**
 * The compression level to use if `withCompressionType` is set to <code>lz4</code>.
 *
 * @deprecated the method name drops the "L" of "LZ4"; use
 *             {@link #withCompressionLZ4Level(int)} instead. Kept as a delegating
 *             alias for backward compatibility.
 */
@Deprecated
public ProducerConfigBuilder withCompressionZ4Level(int compressionLZ4Level) {
    return withCompressionLZ4Level(compressionLZ4Level);
}

/**
 * The compression level to use if `withCompressionType` is set to <code>lz4</code>.
 * Stored under {@code ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG}.
 *
 * @param compressionLZ4Level the LZ4 compression level
 * @return this builder for chaining
 */
public ProducerConfigBuilder withCompressionLZ4Level(int compressionLZ4Level) {
    props.put(ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG, compressionLZ4Level);
    return this;
}

/**
 * The compression level applied when {@code withCompressionType} is set to
 * <code>zstd</code>. Stored under {@code ProducerConfig.COMPRESSION_ZSTD_LEVEL_CONFIG}.
 */
public ProducerConfigBuilder withCompressionZstdLevel(int compressionZstdLevel) {
    props.put(ProducerConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, compressionZstdLevel);
    return this;
}

/**
* The maximum number of unacknowledged requests the client will send on a single connection before blocking.
* Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of
Expand Down
Loading

0 comments on commit 07a46cb

Please sign in to comment.