Add seekToBeginning option, reuse instances of kafka (de)serializers (#179)

* Add seekToBeginning option, reuse instances of kafka (de)serializers

* disable 2.13 in ci

* Change ObservableSeekOnStart to ADT, scalafmt
Avasil authored Apr 7, 2020
1 parent 4caa1c4 commit 810a311
Showing 47 changed files with 372 additions and 104 deletions.
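
For orientation, a hedged sketch (not code from this commit) of the headline feature: a consumer observable configured to rewind to the beginning of its assigned partitions when it starts. The bootstrap servers, group id and topic are placeholders, and KafkaConsumerConfig.default / KafkaConsumerObservable.apply are assumed to be the library's existing builders.

import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}
import monix.kafka.config.ObservableSeekOnStart

// Placeholder connection settings; observableSeekOnStart is the new field.
val consumerCfg = KafkaConsumerConfig.default.copy(
  bootstrapServers = List("localhost:9092"),
  groupId = "replay-group",
  observableSeekOnStart = ObservableSeekOnStart.Beginning
)

// Re-reads every record still retained on the subscribed topic.
val replayAll =
  KafkaConsumerObservable[String, String](consumerCfg, List("my-topic"))
    .map(_.value())
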
2 changes: 1 addition & 1 deletion build.sbt
@@ -32,7 +32,7 @@ lazy val warnUnusedImport = Seq(
lazy val sharedSettings = warnUnusedImport ++ Seq(
organization := "io.monix",
scalaVersion := "2.12.10",
crossScalaVersions := Seq("2.11.12", "2.12.10", "2.13.0"),
crossScalaVersions := Seq("2.11.12", "2.12.10", "2.13.1"),

scalacOptions ++= Seq(
// warnings
5 changes: 3 additions & 2 deletions kafka-0.10.x/src/main/resources/monix/kafka/default.conf
@@ -70,8 +70,9 @@ kafka {
# Number of requests that KafkaProducerSink
# can push in parallel
monix.producer.sink.parallelism = 100
# Triggers a seekToEnd when the observable starts
monix.observable.seekEnd.onStart = false
# Triggers either seekToEnd or seekToBeginning when the observable starts
# Possible values: end, beginning, no-seek
monix.observable.seek.onStart = "no-seek"
# Possible values: sync, async
monix.observable.commit.type = "sync"
# Possible values: before-ack, after-ack or no-ack
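
The boolean monix.observable.seekEnd.onStart key is replaced by the three-valued monix.observable.seek.onStart key shown above. A minimal sketch of how the string value maps onto the new ADT, using an ad-hoc Config built just for illustration:

import com.typesafe.config.ConfigFactory
import monix.kafka.config.ObservableSeekOnStart

val conf = ConfigFactory.parseString(
  """monix.observable.seek.onStart = "beginning" """
)

// Mirrors what KafkaConsumerConfig now does for observableSeekOnStart.
val seekOnStart = ObservableSeekOnStart(conf.getString("monix.observable.seek.onStart"))
// seekOnStart == ObservableSeekOnStart.Beginning
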
1 change: 1 addition & 0 deletions kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala
@@ -33,6 +33,7 @@ private[kafka] object Commit {

val empty: Commit = new Commit {
override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task.unit
}
3 changes: 2 additions & 1 deletion kafka-0.10.x/src/main/scala/monix/kafka/Deserializer.scala
@@ -49,7 +49,8 @@ object Deserializer {
implicit def fromKafkaDeserializer[A](implicit des: KafkaDeserializer[A]): Deserializer[A] =
Deserializer[A](
className = des.getClass.getName,
classType = des.getClass
classType = des.getClass,
constructor = _ => des
)

/** Alias for the function that provides an instance of
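
The added constructor = _ => des argument is the instance-reuse half of this commit: a Deserializer derived from an already-constructed Kafka deserializer now hands back that same instance instead of instantiating its class reflectively. A hedged sketch with a hypothetical custom type (the Serializer change below mirrors it):

import java.nio.charset.StandardCharsets
import java.util
import monix.kafka.Deserializer
import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer}

final case class UserId(value: String)

// A stateless Kafka deserializer constructed once by the application.
implicit val userIdKafkaDes: KafkaDeserializer[UserId] = new KafkaDeserializer[UserId] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def deserialize(topic: String, data: Array[Byte]): UserId =
    UserId(new String(data, StandardCharsets.UTF_8))
  override def close(): Unit = ()
}

// The derived wrapper now closes over userIdKafkaDes itself, so consumers
// no longer create fresh deserializer instances via reflection.
val monixDes: Deserializer[UserId] = Deserializer.fromKafkaDeserializer[UserId]
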
@@ -243,7 +243,7 @@ final case class KafkaConsumerConfig(
retryBackoffTime: FiniteDuration,
observableCommitType: ObservableCommitType,
observableCommitOrder: ObservableCommitOrder,
observableSeekToEndOnStart: Boolean,
observableSeekOnStart: ObservableSeekOnStart,
properties: Map[String, String]) {

def toMap: Map[String, String] = properties ++ Map(
@@ -428,7 +428,7 @@ object KafkaConsumerConfig {
retryBackoffTime = config.getInt("retry.backoff.ms").millis,
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")),
properties = Map.empty
)
}
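
Because observableSeekToEndOnStart: Boolean becomes the typed observableSeekOnStart: ObservableSeekOnStart field, callers that used to flip the boolean now set an ADT value. A small migration sketch, where cfg stands for whatever KafkaConsumerConfig the application already builds:

import monix.kafka.KafkaConsumerConfig
import monix.kafka.config.ObservableSeekOnStart

// Before: cfg.copy(observableSeekToEndOnStart = true)
def skipBacklog(cfg: KafkaConsumerConfig): KafkaConsumerConfig =
  cfg.copy(observableSeekOnStart = ObservableSeekOnStart.End)
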
@@ -64,7 +64,8 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
implicit val s = scheduler
val feedTask = consumer.flatMap { c =>
// Skipping all available messages on all partitions
if (config.observableSeekToEndOnStart) c.seekToEnd(Nil.asJavaCollection)
if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection)
else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection)
// A task to execute on both cancellation and normal termination
val onCancel = cancelTask(c)
runLoop(c, out).guarantee(onCancel)
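
The same dispatch written against the ADT directly, as a standalone sketch rather than code from this commit; passing an empty collection asks the Kafka consumer to seek on all of its currently assigned partitions:

import java.util.Collections
import monix.kafka.config.ObservableSeekOnStart
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def seekOnStart[K, V](consumer: KafkaConsumer[K, V], mode: ObservableSeekOnStart): Unit =
  mode match {
    case ObservableSeekOnStart.End       => consumer.seekToEnd(Collections.emptyList[TopicPartition]())
    case ObservableSeekOnStart.Beginning => consumer.seekToBeginning(Collections.emptyList[TopicPartition]())
    case ObservableSeekOnStart.NoSeek    => () // keep committed offsets / auto.offset.reset behaviour
  }
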
@@ -43,10 +43,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis

case class CommitWithConsumer(consumer: KafkaConsumer[K, V]) extends Commit {

override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] =
Task(blocking(consumer.synchronized(consumer.commitSync(batch.map {
case (k, v) => k -> new OffsetAndMetadata(v)
}.asJava))))

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task {
blocking(consumer.synchronized(consumer.commitAsync(batch.map {
28 changes: 16 additions & 12 deletions kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
@@ -38,17 +38,17 @@ import scala.util.{Failure, Success}
* - An error with `Task.raiseError` which will finish the stream with an error.
*/
final class KafkaProducerSink[K, V] private (
producer: Coeval[KafkaProducer[K, V]],
shouldTerminate: Boolean,
parallelism: Int,
onSendError: Throwable => Task[Ack])
extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable {
producer: Coeval[KafkaProducer[K, V]],
shouldTerminate: Boolean,
parallelism: Int,
onSendError: Throwable => Task[Ack])
extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable {

require(parallelism >= 1, "parallelism >= 1")

def createSubscriber(
cb: Callback[Throwable, Unit],
s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = {
cb: Callback[Throwable, Unit],
s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = {
val out = new Subscriber[Seq[ProducerRecord[K, V]]] { self =>
implicit val scheduler: Scheduler = s
private[this] val p = producer.memoize
@@ -102,10 +102,11 @@ final class KafkaProducerSink[K, V] private (

object KafkaProducerSink extends StrictLogging {

private[this] val onSendErrorDefault = (ex: Throwable) => Task {
logger.error("Unexpected error in KafkaProducerSink", ex)
Continue
}
private[this] val onSendErrorDefault = (ex: Throwable) =>
Task {
logger.error("Unexpected error in KafkaProducerSink", ex)
Continue
}

/** Builder for [[KafkaProducerSink]]. */
def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(
@@ -126,6 +127,9 @@
apply(producer, parallelism, onSendErrorDefault)

/** Builder for [[KafkaProducerSink]]. */
def apply[K, V](producer: Coeval[KafkaProducer[K, V]], parallelism: Int, onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] =
def apply[K, V](
producer: Coeval[KafkaProducer[K, V]],
parallelism: Int,
onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] =
new KafkaProducerSink(producer, shouldTerminate = false, parallelism = parallelism, onSendError)
}
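
With the builder reformatted above taking an explicit error handler, the log-and-continue default can be swapped for stricter behaviour. A hedged construction sketch; the connection settings are placeholders and KafkaProducerConfig.default / KafkaProducer.apply are assumed to be the library's existing builders:

import monix.eval.{Coeval, Task}
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import monix.kafka.{KafkaProducer, KafkaProducerConfig, KafkaProducerSink}

val producerCfg = KafkaProducerConfig.default.copy(
  bootstrapServers = List("localhost:9092")
)

val producer: Coeval[KafkaProducer[String, String]] =
  Coeval(KafkaProducer[String, String](producerCfg, global))

// Finish the stream with an error on send failures instead of logging and continuing.
val failFast: Throwable => Task[Ack] = e => Task.raiseError(e)

val sink: KafkaProducerSink[String, String] =
  KafkaProducerSink(producer, parallelism = 4, onSendError = failFast)
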
3 changes: 2 additions & 1 deletion kafka-0.10.x/src/main/scala/monix/kafka/Serializer.scala
@@ -49,7 +49,8 @@ object Serializer {
implicit def fromKafkaSerializer[A](implicit ser: KafkaSerializer[A]): Serializer[A] =
Serializer[A](
className = ser.getClass.getName,
classType = ser.getClass
classType = ser.getClass,
constructor = _ => ser
)

/** Alias for the function that provides an instance of
@@ -0,0 +1,60 @@
package monix.kafka.config

import com.typesafe.config.ConfigException.BadValue

/** Specifies whether to call `seekToEnd` or `seekToBeginning` when starting
* [[monix.kafka.KafkaConsumerObservable KafkaConsumerObservable]]
*
* Available options:
*
* - [[ObservableSeekOnStart.End]]
* - [[ObservableSeekOnStart.Beginning]]
* - [[ObservableSeekOnStart.NoSeek]]
*/
sealed trait ObservableSeekOnStart extends Serializable {
def id: String

def isSeekBeginning: Boolean =
this match {
case ObservableSeekOnStart.Beginning => true
case _ => false
}

def isSeekEnd: Boolean =
this match {
case ObservableSeekOnStart.End => true
case _ => false
}
}

object ObservableSeekOnStart {

@throws(classOf[BadValue])
def apply(id: String): ObservableSeekOnStart =
id match {
case End.id => End
case Beginning.id => Beginning
case NoSeek.id => NoSeek
case _ =>
throw new BadValue("kafka.monix.observable.seek.onStart", s"Invalid value: $id")
}

/** Calls `consumer.seekToEnd()` when starting consumer.
*/
case object End extends ObservableSeekOnStart {
val id = "end"
}

/** Calls `consumer.seekToBeginning()` when starting consumer.
*/
case object Beginning extends ObservableSeekOnStart {
val id = "beginning"
}

/** Calls neither `consumer.seekToEnd()` nor `consumer.seekToBeginning()`
* when starting the consumer.
*/
case object NoSeek extends ObservableSeekOnStart {
val id = "no-seek"
}
}
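
A short usage sketch for the new ADT: the accepted ids, the predicates consumed by KafkaConsumerObservable, and the failure mode for unknown values:

import com.typesafe.config.ConfigException.BadValue
import monix.kafka.config.ObservableSeekOnStart

ObservableSeekOnStart("end")                       // ObservableSeekOnStart.End
ObservableSeekOnStart("beginning").isSeekBeginning // true
ObservableSeekOnStart("no-seek").isSeekEnd         // false

// Anything else is rejected while the configuration is being parsed.
try ObservableSeekOnStart("latest")
catch { case _: BadValue => println("invalid monix.observable.seek.onStart value") }
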
@@ -18,6 +18,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe

val commitCallbacks: List[Commit] = List.fill(4)(new Commit {
override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task.unit
})
@@ -33,9 +34,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe
val partitions = offsets.map(_.topicPartition)
val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets)

received.foreach { batch =>
partitions should contain allElementsOf batch.offsets.keys
}
received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys }

received.size should be <= 4
}
@@ -64,9 +63,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe
.mergeMap(i => createConsumer(i.toInt, topicName).take(500))
.bufferTumbling(2000)
.map(CommittableOffsetBatch.mergeByCommitCallback)
.map { offsetBatches =>
assert(offsetBatches.length == 4)
}
.map { offsetBatches => assert(offsetBatches.length == 4) }
.completedL

Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
@@ -127,9 +127,7 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit {
val listT = consumer
.executeOn(io)
.bufferTumbling(count)
.map { messages =>
messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset))
}
.map { messages => messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) }
.mapEval { case (values, batch) => Task.shift *> batch.commitSync().map(_ => values -> batch.offsets) }
.headL

@@ -127,6 +127,7 @@ class AFailingSerializer extends ASerializer {
}

class AHalfFailingSerializer extends ASerializer {

override def serialize(topic: String, data: A): Array[Byte] = {
if (data.value.toInt % 2 == 0) super.serialize(topic, data)
else throw new RuntimeException("fail")
5 changes: 3 additions & 2 deletions kafka-0.11.x/src/main/resources/monix/kafka/default.conf
@@ -72,8 +72,9 @@ kafka {
# Number of requests that KafkaProducerSink
# can push in parallel
monix.producer.sink.parallelism = 100
# Triggers a seekToEnd when the observable starts
monix.observable.seekEnd.onStart = false
# Triggers either seekToEnd or seekToBeginning when the observable starts
# Possible values: end, beginning, no-seek
monix.observable.seek.onStart = "no-seek"
# Possible values: sync, async
monix.observable.commit.type = "sync"
# Possible values: before-ack, after-ack or no-ack
1 change: 1 addition & 0 deletions kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala
@@ -33,6 +33,7 @@ private[kafka] object Commit {

val empty: Commit = new Commit {
override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task.unit
}
3 changes: 2 additions & 1 deletion kafka-0.11.x/src/main/scala/monix/kafka/Deserializer.scala
@@ -49,7 +49,8 @@ object Deserializer {
implicit def fromKafkaDeserializer[A](implicit des: KafkaDeserializer[A]): Deserializer[A] =
Deserializer[A](
className = des.getClass.getName,
classType = des.getClass
classType = des.getClass,
constructor = _ => des
)

/** Alias for the function that provides an instance of
@@ -248,7 +248,7 @@ final case class KafkaConsumerConfig(
retryBackoffTime: FiniteDuration,
observableCommitType: ObservableCommitType,
observableCommitOrder: ObservableCommitOrder,
observableSeekToEndOnStart: Boolean,
observableSeekOnStart: ObservableSeekOnStart,
properties: Map[String, String]) {

def toMap: Map[String, String] = properties ++ Map(
@@ -435,7 +435,7 @@ object KafkaConsumerConfig {
retryBackoffTime = config.getInt("retry.backoff.ms").millis,
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")),
properties = Map.empty
)
}
@@ -64,7 +64,8 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
implicit val s = scheduler
val feedTask = consumer.flatMap { c =>
// Skipping all available messages on all partitions
if (config.observableSeekToEndOnStart) c.seekToEnd(Nil.asJavaCollection)
if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection)
else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection)
// A task to execute on both cancellation and normal termination
val onCancel = cancelTask(c)
runLoop(c, out).guarantee(onCancel)
@@ -43,10 +43,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis

case class CommitWithConsumer(consumer: KafkaConsumer[K, V]) extends Commit {

override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] =
Task(blocking(consumer.synchronized(consumer.commitSync(batch.map {
case (k, v) => k -> new OffsetAndMetadata(v)
}.asJava))))

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task {
blocking(consumer.synchronized(consumer.commitAsync(batch.map {
28 changes: 16 additions & 12 deletions kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
@@ -38,17 +38,17 @@ import scala.util.{Failure, Success}
* - An error with `Task.raiseError` which will finish the stream with an error.
*/
final class KafkaProducerSink[K, V] private (
producer: Coeval[KafkaProducer[K, V]],
shouldTerminate: Boolean,
parallelism: Int,
onSendError: Throwable => Task[Ack])
extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable {
producer: Coeval[KafkaProducer[K, V]],
shouldTerminate: Boolean,
parallelism: Int,
onSendError: Throwable => Task[Ack])
extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable {

require(parallelism >= 1, "parallelism >= 1")

def createSubscriber(
cb: Callback[Throwable, Unit],
s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = {
cb: Callback[Throwable, Unit],
s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = {
val out = new Subscriber[Seq[ProducerRecord[K, V]]] { self =>
implicit val scheduler: Scheduler = s
private[this] val p = producer.memoize
@@ -102,10 +102,11 @@ final class KafkaProducerSink[K, V] private (

object KafkaProducerSink extends StrictLogging {

private[this] val onSendErrorDefault = (ex: Throwable) => Task {
logger.error("Unexpected error in KafkaProducerSink", ex)
Continue
}
private[this] val onSendErrorDefault = (ex: Throwable) =>
Task {
logger.error("Unexpected error in KafkaProducerSink", ex)
Continue
}

/** Builder for [[KafkaProducerSink]]. */
def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(
@@ -126,6 +127,9 @@
apply(producer, parallelism, onSendErrorDefault)

/** Builder for [[KafkaProducerSink]]. */
def apply[K, V](producer: Coeval[KafkaProducer[K, V]], parallelism: Int, onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] =
def apply[K, V](
producer: Coeval[KafkaProducer[K, V]],
parallelism: Int,
onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] =
new KafkaProducerSink(producer, shouldTerminate = false, parallelism = parallelism, onSendError)
}
3 changes: 2 additions & 1 deletion kafka-0.11.x/src/main/scala/monix/kafka/Serializer.scala
@@ -49,7 +49,8 @@ object Serializer {
implicit def fromKafkaSerializer[A](implicit ser: KafkaSerializer[A]): Serializer[A] =
Serializer[A](
className = ser.getClass.getName,
classType = ser.getClass
classType = ser.getClass,
constructor = _ => ser
)

/** Alias for the function that provides an instance of