Skip to content

Commit

Permalink
Make it easy to configure compression and linger
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Jan 19, 2025
1 parent 9133254 commit cc605df
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package zio.kafka.producer

import org.apache.kafka.common.record.CompressionType
import zio._
import zio.test._

/**
 * Guards against Kafka gaining a compression codec that this library does not model: the number of
 * codecs the Kafka client knows about must equal the number of `ProducerCompression` cases.
 */
object ProducerCompressionSpec extends ZIOSpecDefault {

  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("ProducerCompression")(
      test("all Kafka supported compression codes have a corresponding ProducerCompression") {
        // One entry per ProducerCompression case defined by this library.
        val modeledCompressions = List(
          ProducerCompression.NoCompression,
          ProducerCompression.Gzip(),
          ProducerCompression.Snappy(),
          ProducerCompression.Lz4(),
          ProducerCompression.Zstd()
        )
        // Every codec the Kafka client library itself supports.
        val kafkaCodecCount = CompressionType.values().length
        assertTrue(kafkaCodecCount == modeledCompressions.size)
      }
    )
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package zio.kafka.producer

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.record.CompressionType

/**
 * Base type for the producer compression configuration.
 *
 * @param name
 *   the value for Kafka's `compression.type` producer setting
 * @param extra
 *   an optional extra producer property, e.g. a codec-specific compression level
 */
sealed abstract class ProducerCompression(name: String, extra: Option[(String, AnyRef)] = None) {

  /** The producer properties that select (and tune) this compression codec. */
  def properties: Map[String, AnyRef] = {
    val base = Map[String, AnyRef](ProducerConfig.COMPRESSION_TYPE_CONFIG -> name)
    extra.fold(base)(base + _)
  }
}

/**
 * The compression codecs that Kafka supports while producing records.
 */
object ProducerCompression {

  /** Produce kafka records without compression. */
  case object NoCompression extends ProducerCompression(CompressionType.NONE.name)

  /**
   * Produce kafka records with GZIP compression.
   *
   * @param level
   *   a value between 1 and 9 or -1 (defaults to -1)
   */
  case class Gzip(level: Int = CompressionType.GZIP.defaultLevel())
      extends ProducerCompression(
        CompressionType.GZIP.name,
        Some((ProducerConfig.COMPRESSION_GZIP_LEVEL_CONFIG, Integer.valueOf(level)))
      )

  /** Produce kafka records with Snappy compression. */
  case class Snappy() extends ProducerCompression(CompressionType.SNAPPY.name)

  /**
   * Produce kafka records with Lz4 compression.
   *
   * @param level
   *   a value between 1 and 17 (defaults to 9)
   */
  case class Lz4(level: Int = CompressionType.LZ4.defaultLevel())
      extends ProducerCompression(
        CompressionType.LZ4.name,
        Some((ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG, Integer.valueOf(level)))
      )

  /**
   * Produce kafka records with Zstd compression.
   *
   * @param level
   *   a value between -131072 and 22 (defaults to 3)
   */
  case class Zstd(level: Int = CompressionType.ZSTD.defaultLevel())
      extends ProducerCompression(
        CompressionType.ZSTD.name,
        Some((ProducerConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, Integer.valueOf(level)))
      )
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import zio.kafka.security.KafkaCredentialStore
* To stay source compatible with future releases, you are recommended to construct the settings as follows:
* {{{
* ProducerSettings(bootstrapServers)
* .withCloseTimeout(30.seconds)
* .withLinger(500.millis)
* .withCompression(ProducerCompression.Zstd(3))
* .... etc.
* }}}
*/
Expand Down Expand Up @@ -43,6 +44,24 @@ final case class ProducerSettings(
/** Adds the properties required to connect using the given credentials store. */
def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings = {
  val securityProperties = credentialsStore.properties
  withProperties(securityProperties)
}

/**
 * Sets Kafka's `linger.ms` producer configuration.
 *
 * @param lingerDuration
 *   The maximum amount of time a record is allowed to linger in the producer's internal buffer.
 *   Higher values allow for better batching (especially important when compression is used),
 *   lower values reduce latency and memory usage.
 */
def withLinger(lingerDuration: Duration): ProducerSettings = {
  val lingerMs = lingerDuration.toMillis
  withProperty(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
}

/**
 * Sets Kafka's `compression.type` producer configuration (plus the codec's level, when set).
 *
 * @param compression
 *   The compression codec to use when publishing records. Compression is of full batches of data,
 *   so the efficacy of batching will also impact the compression ratio (more batching means better
 *   compression). See also [[withLinger]].
 */
def withCompression(compression: ProducerCompression): ProducerSettings = {
  val codecProperties = compression.properties
  withProperties(codecProperties)
}

/**
* @param sendBufferSize
* The maximum number of record chunks that can queue up while waiting for the underlying producer to become
Expand Down

0 comments on commit cc605df

Please sign in to comment.