diff --git a/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerCompressionSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerCompressionSpec.scala
new file mode 100644
index 000000000..bf3f52f0a
--- /dev/null
+++ b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerCompressionSpec.scala
@@ -0,0 +1,23 @@
+package zio.kafka.producer
+
+import org.apache.kafka.common.record.CompressionType
+import zio._
+import zio.test._
+
+object ProducerCompressionSpec extends ZIOSpecDefault {
+
+  override def spec: Spec[TestEnvironment with Scope, Any] =
+    suite("ProducerCompression")(
+      test("all Kafka supported compression codecs have a corresponding ProducerCompression") {
+        val compressions = Seq(
+          ProducerCompression.NoCompression,
+          ProducerCompression.Gzip(),
+          ProducerCompression.Snappy(),
+          ProducerCompression.Lz4(),
+          ProducerCompression.Zstd()
+        )
+        val availableCompressionsCount = CompressionType.values().length
+        assertTrue(availableCompressionsCount == compressions.size)
+      }
+    )
+}
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerCompression.scala b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerCompression.scala
new file mode 100644
index 000000000..cb300759a
--- /dev/null
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerCompression.scala
@@ -0,0 +1,55 @@
+package zio.kafka.producer
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.record.CompressionType
+
+abstract sealed class ProducerCompression(name: String, extra: Option[(String, AnyRef)] = None) {
+  def properties: Map[String, AnyRef] =
+    Map(ProducerConfig.COMPRESSION_TYPE_CONFIG -> name) ++ extra
+}
+
+/**
+ * The compression codecs that Kafka supports while producing records.
+ */
+object ProducerCompression {
+
+  /** Produce kafka records without compression. */
+  case object NoCompression extends ProducerCompression(CompressionType.NONE.name)
+
+  /**
+   * Produce kafka records with GZIP compression.
+   * @param level
+   *   a value between 1 and 9, or -1 (defaults to -1)
+   */
+  case class Gzip(level: Int = CompressionType.GZIP.defaultLevel())
+      extends ProducerCompression(
+        CompressionType.GZIP.name,
+        Some(ProducerConfig.COMPRESSION_GZIP_LEVEL_CONFIG -> Int.box(level))
+      )
+
+  /** Produce kafka records with Snappy compression. */
+  case class Snappy() extends ProducerCompression(CompressionType.SNAPPY.name)
+
+  /**
+   * Produce kafka records with Lz4 compression.
+   * @param level
+   *   a value between 1 and 17 (defaults to 9)
+   */
+  case class Lz4(level: Int = CompressionType.LZ4.defaultLevel())
+      extends ProducerCompression(
+        CompressionType.LZ4.name,
+        Some(ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG -> Int.box(level))
+      )
+
+  /**
+   * Produce kafka records with Zstd compression.
+   *
+   * @param level
+   *   a value between -131072 and 22 (defaults to 3)
+   */
+  case class Zstd(level: Int = CompressionType.ZSTD.defaultLevel())
+      extends ProducerCompression(
+        CompressionType.ZSTD.name,
+        Some(ProducerConfig.COMPRESSION_ZSTD_LEVEL_CONFIG -> Int.box(level))
+      )
+}
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala
index b0d1c27f7..2f5fd1531 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala
@@ -10,7 +10,8 @@ import zio.kafka.security.KafkaCredentialStore
  * To stay source compatible with future releases, you are recommended to construct the settings as follows:
  * {{{
  *   ProducerSettings(bootstrapServers)
- *     .withCloseTimeout(30.seconds)
+ *     .withLinger(500.millis)
+ *     .withCompression(ProducerCompression.Zstd(3))
  *     .... etc.
  * }}}
  */
@@ -43,6 +44,24 @@ final case class ProducerSettings(
   def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings =
     withProperties(credentialsStore.properties)
 
+  /**
+   * @param lingerDuration
+   *   The maximum amount of time a record is allowed to linger in the producer's internal buffer. Higher values allow
+   *   for better batching (especially important when compression is used), lower values reduce latency and memory
+   *   usage.
+   */
+  def withLinger(lingerDuration: Duration): ProducerSettings =
+    withProperty(ProducerConfig.LINGER_MS_CONFIG, lingerDuration.toMillis.toString)
+
+  /**
+   * @param compression
+   *   The compression codec to use when publishing records. Compression is applied to full batches of data, so the
+   *   efficacy of batching will also impact the compression ratio (more batching means better compression). See also
+   *   [[withLinger]].
+   */
+  def withCompression(compression: ProducerCompression): ProducerSettings =
+    withProperties(compression.properties)
+
   /**
    * @param sendBufferSize
    *   The maximum number of record chunks that can queue up while waiting for the underlying producer to become
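Usage sketch for reviewers (not part of the diff): how the new `withLinger` and `withCompression` settings compose when building a producer. `Producer.make`, `Producer.produce`, and `Serde.string` are existing zio-kafka APIs; the bootstrap address and topic name are placeholders.

```scala
import zio._
import zio.kafka.producer.{ Producer, ProducerCompression, ProducerSettings }
import zio.kafka.serde.Serde

object CompressionExample extends ZIOAppDefault {

  // Placeholder bootstrap servers; replace with your broker addresses.
  private val settings: ProducerSettings =
    ProducerSettings(List("localhost:9092"))
      .withLinger(500.millis)                       // let batches fill for up to 500 ms
      .withCompression(ProducerCompression.Zstd(3)) // compress each batch with zstd, level 3

  private val producerLayer: ZLayer[Any, Throwable, Producer] =
    ZLayer.scoped(Producer.make(settings))

  override def run: ZIO[Any, Throwable, Unit] =
    Producer
      .produce("example-topic", "key", "value", Serde.string, Serde.string)
      .unit
      .provideLayer(producerLayer)
}
```

Pairing a larger linger with a codec like zstd trades a little latency for fuller, better-compressing batches, which is why the `withCompression` scaladoc cross-references `withLinger`.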