From 810a3114b3bc779226bb2a2967143695d61e94f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Gawry=C5=9B?= Date: Tue, 7 Apr 2020 22:27:05 +0200 Subject: [PATCH] Add seekToBeginning option, reuse instances of kafka (de)serializers (#179) * Add seekToBeginning option, reuse instances of kafka (de)serializers * disable 2.13 in ci * Change ObservableSeekOnStart to ADT, scalafmt --- build.sbt | 2 +- .../main/resources/monix/kafka/default.conf | 5 +- .../src/main/scala/monix/kafka/Commit.scala | 1 + .../main/scala/monix/kafka/Deserializer.scala | 3 +- .../monix/kafka/KafkaConsumerConfig.scala | 4 +- .../monix/kafka/KafkaConsumerObservable.scala | 3 +- .../KafkaConsumerObservableManualCommit.scala | 2 + .../scala/monix/kafka/KafkaProducerSink.scala | 28 +++++---- .../main/scala/monix/kafka/Serializer.scala | 3 +- .../kafka/config/ObservableSeekOnStart.scala | 60 +++++++++++++++++++ .../kafka/MergeByCommitCallbackTest.scala | 9 +-- .../monix/kafka/MonixKafkaTopicListTest.scala | 4 +- .../scala/monix/kafka/SerializationTest.scala | 1 + .../main/resources/monix/kafka/default.conf | 5 +- .../src/main/scala/monix/kafka/Commit.scala | 1 + .../main/scala/monix/kafka/Deserializer.scala | 3 +- .../monix/kafka/KafkaConsumerConfig.scala | 4 +- .../monix/kafka/KafkaConsumerObservable.scala | 3 +- .../KafkaConsumerObservableManualCommit.scala | 2 + .../scala/monix/kafka/KafkaProducerSink.scala | 28 +++++---- .../main/scala/monix/kafka/Serializer.scala | 3 +- .../kafka/config/ObservableSeekOnStart.scala | 60 +++++++++++++++++++ .../kafka/MergeByCommitCallbackTest.scala | 9 +-- .../monix/kafka/MonixKafkaTopicListTest.scala | 4 +- .../scala/monix/kafka/SerializationTest.scala | 1 + .../main/resources/monix/kafka/default.conf | 5 +- .../src/main/scala/monix/kafka/Commit.scala | 1 + .../main/scala/monix/kafka/Deserializer.scala | 3 +- .../monix/kafka/KafkaConsumerConfig.scala | 4 +- .../monix/kafka/KafkaConsumerObservable.scala | 3 +- .../KafkaConsumerObservableManualCommit.scala | 2 + .../scala/monix/kafka/KafkaProducerSink.scala | 28 +++++---- .../main/scala/monix/kafka/Serializer.scala | 3 +- .../kafka/config/ObservableSeekOnStart.scala | 60 +++++++++++++++++++ .../scala/monix/kafka/MonixKafkaTest.scala | 8 +-- .../main/resources/monix/kafka/default.conf | 5 +- .../src/main/scala/monix/kafka/Commit.scala | 1 + .../main/scala/monix/kafka/Deserializer.scala | 3 +- .../monix/kafka/KafkaConsumerConfig.scala | 4 +- .../monix/kafka/KafkaConsumerObservable.scala | 3 +- .../KafkaConsumerObservableManualCommit.scala | 2 + .../scala/monix/kafka/KafkaProducerSink.scala | 16 +++-- .../main/scala/monix/kafka/Serializer.scala | 3 +- .../kafka/config/ObservableSeekOnStart.scala | 60 +++++++++++++++++++ .../kafka/MergeByCommitCallbackTest.scala | 9 +-- .../monix/kafka/MonixKafkaTopicListTest.scala | 4 +- .../scala/monix/kafka/SerializationTest.scala | 1 + 47 files changed, 372 insertions(+), 104 deletions(-) create mode 100644 kafka-0.10.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala create mode 100644 kafka-0.11.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala create mode 100644 kafka-0.9.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala create mode 100644 kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala diff --git a/build.sbt b/build.sbt index f7f16a0b..f34eab0b 100644 --- a/build.sbt +++ b/build.sbt @@ -32,7 +32,7 @@ lazy val warnUnusedImport = Seq( lazy val sharedSettings = warnUnusedImport ++ Seq( organization := "io.monix", scalaVersion := 
"2.12.10", - crossScalaVersions := Seq("2.11.12", "2.12.10", "2.13.0"), + crossScalaVersions := Seq("2.11.12", "2.12.10", "2.13.1"), scalacOptions ++= Seq( // warnings diff --git a/kafka-0.10.x/src/main/resources/monix/kafka/default.conf b/kafka-0.10.x/src/main/resources/monix/kafka/default.conf index 4d97d73c..182c8300 100644 --- a/kafka-0.10.x/src/main/resources/monix/kafka/default.conf +++ b/kafka-0.10.x/src/main/resources/monix/kafka/default.conf @@ -70,8 +70,9 @@ kafka { # Number of requests that KafkaProducerSink # can push in parallel monix.producer.sink.parallelism = 100 - # Triggers a seekToEnd when the observable starts - monix.observable.seekEnd.onStart = false + # Triggers either seekToEnd or seektoBeginning when the observable starts + # Possible values: end, beginning, no-seek + monix.observable.seek.onStart = "no-seek" # Possible values: sync, async monix.observable.commit.type = "sync" # Possible values: before-ack, after-ack or no-ack diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala b/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala index 74354265..eb666b7b 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala @@ -33,6 +33,7 @@ private[kafka] object Commit { val empty: Commit = new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task.unit } diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/Deserializer.scala b/kafka-0.10.x/src/main/scala/monix/kafka/Deserializer.scala index 04194f9e..66ddfef4 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/Deserializer.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/Deserializer.scala @@ -49,7 +49,8 @@ object Deserializer { implicit def fromKafkaDeserializer[A](implicit des: KafkaDeserializer[A]): Deserializer[A] = Deserializer[A]( className = des.getClass.getName, - classType = des.getClass + classType = des.getClass, + constructor = _ => des ) /** Alias for the function that provides an instance of diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala index e4d94c4e..72e0adf3 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -243,7 +243,7 @@ final case class KafkaConsumerConfig( retryBackoffTime: FiniteDuration, observableCommitType: ObservableCommitType, observableCommitOrder: ObservableCommitOrder, - observableSeekToEndOnStart: Boolean, + observableSeekOnStart: ObservableSeekOnStart, properties: Map[String, String]) { def toMap: Map[String, String] = properties ++ Map( @@ -428,7 +428,7 @@ object KafkaConsumerConfig { retryBackoffTime = config.getInt("retry.backoff.ms").millis, observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")), observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")), - observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"), + observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")), properties = Map.empty ) } diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index 26fd9cb9..f2360315 100644 --- 
a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -64,7 +64,8 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { implicit val s = scheduler val feedTask = consumer.flatMap { c => // Skipping all available messages on all partitions - if (config.observableSeekToEndOnStart) c.seekToEnd(Nil.asJavaCollection) + if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) + else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection) // A task to execute on both cancellation and normal termination val onCancel = cancelTask(c) runLoop(c, out).guarantee(onCancel) diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 648a0cfb..53160c6f 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -43,10 +43,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis case class CommitWithConsumer(consumer: KafkaConsumer[K, V]) extends Commit { + override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava)))) + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task { blocking(consumer.synchronized(consumer.commitAsync(batch.map { diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index dac7d333..ec57f1df 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -38,17 +38,17 @@ import scala.util.{Failure, Success} * - An error with `Task.raiseError` which will finish the stream with an error. 
*/ final class KafkaProducerSink[K, V] private ( - producer: Coeval[KafkaProducer[K, V]], - shouldTerminate: Boolean, - parallelism: Int, - onSendError: Throwable => Task[Ack]) - extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable { + producer: Coeval[KafkaProducer[K, V]], + shouldTerminate: Boolean, + parallelism: Int, + onSendError: Throwable => Task[Ack]) + extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable { require(parallelism >= 1, "parallelism >= 1") def createSubscriber( - cb: Callback[Throwable, Unit], - s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = { + cb: Callback[Throwable, Unit], + s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = { val out = new Subscriber[Seq[ProducerRecord[K, V]]] { self => implicit val scheduler: Scheduler = s private[this] val p = producer.memoize @@ -102,10 +102,11 @@ final class KafkaProducerSink[K, V] private ( object KafkaProducerSink extends StrictLogging { - private[this] val onSendErrorDefault = (ex: Throwable) => Task { - logger.error("Unexpected error in KafkaProducerSink", ex) - Continue - } + private[this] val onSendErrorDefault = (ex: Throwable) => + Task { + logger.error("Unexpected error in KafkaProducerSink", ex) + Continue + } /** Builder for [[KafkaProducerSink]]. */ def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( @@ -126,6 +127,9 @@ object KafkaProducerSink extends StrictLogging { apply(producer, parallelism, onSendErrorDefault) /** Builder for [[KafkaProducerSink]]. */ - def apply[K, V](producer: Coeval[KafkaProducer[K, V]], parallelism: Int, onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] = + def apply[K, V]( + producer: Coeval[KafkaProducer[K, V]], + parallelism: Int, + onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] = new KafkaProducerSink(producer, shouldTerminate = false, parallelism = parallelism, onSendError) } diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/Serializer.scala b/kafka-0.10.x/src/main/scala/monix/kafka/Serializer.scala index 964ebdf9..ba3644a5 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/Serializer.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/Serializer.scala @@ -49,7 +49,8 @@ object Serializer { implicit def fromKafkaSerializer[A](implicit ser: KafkaSerializer[A]): Serializer[A] = Serializer[A]( className = ser.getClass.getName, - classType = ser.getClass + classType = ser.getClass, + constructor = _ => ser ) /** Alias for the function that provides an instance of diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala b/kafka-0.10.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala new file mode 100644 index 00000000..1e61d9df --- /dev/null +++ b/kafka-0.10.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala @@ -0,0 +1,60 @@ +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** Specifies whether to call `seekToEnd` or `seekToBeginning` when starting + * [[monix.kafka.KafkaConsumerObservable KafkaConsumerObservable]] + * + * Available options: + * + * - [[ObservableSeekOnStart.End]] + * - [[ObservableSeekOnStart.Beginning]] + * - [[ObservableSeekOnStart.NoSeek]] + */ +sealed trait ObservableSeekOnStart extends Serializable { + def id: String + + def isSeekBeginning: Boolean = + this match { + case ObservableSeekOnStart.Beginning => true + case _ => false + } + + def isSeekEnd: Boolean = + this match { + case 
ObservableSeekOnStart.End => true + case _ => false + } +} + +object ObservableSeekOnStart { + + @throws(classOf[BadValue]) + def apply(id: String): ObservableSeekOnStart = + id match { + case End.id => End + case Beginning.id => Beginning + case NoSeek.id => NoSeek + case _ => + throw new BadValue("kafka.monix.observable.seek.onStart", s"Invalid value: $id") + } + + /** Calls `consumer.seekToEnd()` when starting the consumer. + */ + case object End extends ObservableSeekOnStart { + val id = "end" + } + + /** Calls `consumer.seekToBeginning()` when starting the consumer. + */ + case object Beginning extends ObservableSeekOnStart { + val id = "beginning" + } + + /** Calls neither `consumer.seekToEnd()` nor `consumer.seekToBeginning()` + * when starting the consumer. + */ + case object NoSeek extends ObservableSeekOnStart { + val id = "no-seek" + } +}
diff --git a/kafka-0.10.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala b/kafka-0.10.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala index b4ff3a1a..6a03f5b4 100644 --- a/kafka-0.10.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala +++ b/kafka-0.10.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala @@ -18,6 +18,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe val commitCallbacks: List[Commit] = List.fill(4)(new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task.unit }) @@ -33,9 +34,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe val partitions = offsets.map(_.topicPartition) val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets) - received.foreach { batch => - partitions should contain allElementsOf batch.offsets.keys - } + received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys } received.size should be <= 4 } @@ -64,9 +63,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe .mergeMap(i => createConsumer(i.toInt, topicName).take(500)) .bufferTumbling(2000) .map(CommittableOffsetBatch.mergeByCommitCallback) - .map { offsetBatches => - assert(offsetBatches.length == 4) - } + .map { offsetBatches => assert(offsetBatches.length == 4) } .completedL Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
diff --git a/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala b/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala index 66022e2b..e160b417 100644 --- a/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala +++ b/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala @@ -127,9 +127,7 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit { val listT = consumer .executeOn(io) .bufferTumbling(count) - .map { messages => - messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) - } + .map { messages => messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) } .mapEval { case (values, batch) => Task.shift *> batch.commitSync().map(_ => values -> batch.offsets) } .headL
diff --git a/kafka-0.10.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-0.10.x/src/test/scala/monix/kafka/SerializationTest.scala index acb9d846..e1941115 100644 --- a/kafka-0.10.x/src/test/scala/monix/kafka/SerializationTest.scala +++ 
b/kafka-0.10.x/src/test/scala/monix/kafka/SerializationTest.scala @@ -127,6 +127,7 @@ class AFailingSerializer extends ASerializer { } class AHalfFailingSerializer extends ASerializer { + override def serialize(topic: String, data: A): Array[Byte] = { if (data.value.toInt % 2 == 0) super.serialize(topic, data) else throw new RuntimeException("fail")
diff --git a/kafka-0.11.x/src/main/resources/monix/kafka/default.conf b/kafka-0.11.x/src/main/resources/monix/kafka/default.conf index 045f9a9b..7c9ea91b 100644 --- a/kafka-0.11.x/src/main/resources/monix/kafka/default.conf +++ b/kafka-0.11.x/src/main/resources/monix/kafka/default.conf @@ -72,8 +72,9 @@ kafka { # Number of requests that KafkaProducerSink # can push in parallel monix.producer.sink.parallelism = 100 - # Triggers a seekToEnd when the observable starts - monix.observable.seekEnd.onStart = false + # Triggers either seekToEnd or seekToBeginning when the observable starts + # Possible values: end, beginning, no-seek + monix.observable.seek.onStart = "no-seek" # Possible values: sync, async monix.observable.commit.type = "sync" # Possible values: before-ack, after-ack or no-ack
diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala b/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala index 74354265..eb666b7b 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala @@ -33,6 +33,7 @@ private[kafka] object Commit { val empty: Commit = new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task.unit }
diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/Deserializer.scala b/kafka-0.11.x/src/main/scala/monix/kafka/Deserializer.scala index 04194f9e..66ddfef4 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/Deserializer.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/Deserializer.scala @@ -49,7 +49,8 @@ object Deserializer { implicit def fromKafkaDeserializer[A](implicit des: KafkaDeserializer[A]): Deserializer[A] = Deserializer[A]( className = des.getClass.getName, - classType = des.getClass + classType = des.getClass, + constructor = _ => des ) /** Alias for the function that provides an instance of
diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala index 3faede6b..d2cff138 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -248,7 +248,7 @@ final case class KafkaConsumerConfig( retryBackoffTime: FiniteDuration, observableCommitType: ObservableCommitType, observableCommitOrder: ObservableCommitOrder, - observableSeekToEndOnStart: Boolean, + observableSeekOnStart: ObservableSeekOnStart, properties: Map[String, String]) { def toMap: Map[String, String] = properties ++ Map( @@ -435,7 +435,7 @@ object KafkaConsumerConfig { retryBackoffTime = config.getInt("retry.backoff.ms").millis, observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")), observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")), - observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"), + observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")), properties = Map.empty ) }
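`ObservableSeekOnStart(...)` above is how `KafkaConsumerConfig.apply` resolves the config string. A behaviour sketch based on the `apply` method in the new file (shown earlier for the kafka-0.10.x module and duplicated per Kafka version):

```scala
import com.typesafe.config.ConfigException.BadValue
import monix.kafka.config.ObservableSeekOnStart

val beginning = ObservableSeekOnStart("beginning")
assert(beginning == ObservableSeekOnStart.Beginning)
assert(beginning.isSeekBeginning && !beginning.isSeekEnd)

assert(!ObservableSeekOnStart("no-seek").isSeekEnd) // NoSeek seeks nowhere

// Unknown ids fail fast with Typesafe Config's BadValue:
try ObservableSeekOnStart("latest")
catch { case _: BadValue => println("rejected: only end/beginning/no-seek are valid") }
```

diff --git 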
a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index 26fd9cb9..f2360315 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -64,7 +64,8 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { implicit val s = scheduler val feedTask = consumer.flatMap { c => // Skipping all available messages on all partitions - if (config.observableSeekToEndOnStart) c.seekToEnd(Nil.asJavaCollection) + if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) + else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection) // A task to execute on both cancellation and normal termination val onCancel = cancelTask(c) runLoop(c, out).guarantee(onCancel) diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 648a0cfb..53160c6f 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -43,10 +43,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis case class CommitWithConsumer(consumer: KafkaConsumer[K, V]) extends Commit { + override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava)))) + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task { blocking(consumer.synchronized(consumer.commitAsync(batch.map { diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index dac7d333..ec57f1df 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -38,17 +38,17 @@ import scala.util.{Failure, Success} * - An error with `Task.raiseError` which will finish the stream with an error. 
*/ final class KafkaProducerSink[K, V] private ( - producer: Coeval[KafkaProducer[K, V]], - shouldTerminate: Boolean, - parallelism: Int, - onSendError: Throwable => Task[Ack]) - extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable { + producer: Coeval[KafkaProducer[K, V]], + shouldTerminate: Boolean, + parallelism: Int, + onSendError: Throwable => Task[Ack]) + extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable { require(parallelism >= 1, "parallelism >= 1") def createSubscriber( - cb: Callback[Throwable, Unit], - s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = { + cb: Callback[Throwable, Unit], + s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = { val out = new Subscriber[Seq[ProducerRecord[K, V]]] { self => implicit val scheduler: Scheduler = s private[this] val p = producer.memoize @@ -102,10 +102,11 @@ final class KafkaProducerSink[K, V] private ( object KafkaProducerSink extends StrictLogging { - private[this] val onSendErrorDefault = (ex: Throwable) => Task { - logger.error("Unexpected error in KafkaProducerSink", ex) - Continue - } + private[this] val onSendErrorDefault = (ex: Throwable) => + Task { + logger.error("Unexpected error in KafkaProducerSink", ex) + Continue + } /** Builder for [[KafkaProducerSink]]. */ def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( @@ -126,6 +127,9 @@ object KafkaProducerSink extends StrictLogging { apply(producer, parallelism, onSendErrorDefault) /** Builder for [[KafkaProducerSink]]. */ - def apply[K, V](producer: Coeval[KafkaProducer[K, V]], parallelism: Int, onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] = + def apply[K, V]( + producer: Coeval[KafkaProducer[K, V]], + parallelism: Int, + onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] = new KafkaProducerSink(producer, shouldTerminate = false, parallelism = parallelism, onSendError) } diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/Serializer.scala b/kafka-0.11.x/src/main/scala/monix/kafka/Serializer.scala index 964ebdf9..ba3644a5 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/Serializer.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/Serializer.scala @@ -49,7 +49,8 @@ object Serializer { implicit def fromKafkaSerializer[A](implicit ser: KafkaSerializer[A]): Serializer[A] = Serializer[A]( className = ser.getClass.getName, - classType = ser.getClass + classType = ser.getClass, + constructor = _ => ser ) /** Alias for the function that provides an instance of diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala b/kafka-0.11.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala new file mode 100644 index 00000000..1e61d9df --- /dev/null +++ b/kafka-0.11.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala @@ -0,0 +1,60 @@ +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** Specifies whether to call `seekToEnd` or `seekToBeginning` when starting + * [[monix.kafka.KafkaConsumerObservable KafkaConsumerObservable]] + * + * Available options: + * + * - [[ObservableSeekOnStart.End]] + * - [[ObservableSeekOnStart.Beginning]] + * - [[ObservableSeekOnStart.NoSeek]] + */ +sealed trait ObservableSeekOnStart extends Serializable { + def id: String + + def isSeekBeginning: Boolean = + this match { + case ObservableSeekOnStart.Beginning => true + case _ => false + } + + def isSeekEnd: Boolean = + this match { + case 
ObservableSeekOnStart.End => true + case _ => false + } +} + +object ObservableSeekOnStart { + + @throws(classOf[BadValue]) + def apply(id: String): ObservableSeekOnStart = + id match { + case End.id => End + case Beginning.id => Beginning + case NoSeek.id => NoSeek + case _ => + throw new BadValue("kafka.monix.observable.seek.onStart", s"Invalid value: $id") + } + + /** Calls `consumer.seekToEnd()` when starting the consumer. + */ + case object End extends ObservableSeekOnStart { + val id = "end" + } + + /** Calls `consumer.seekToBeginning()` when starting the consumer. + */ + case object Beginning extends ObservableSeekOnStart { + val id = "beginning" + } + + /** Calls neither `consumer.seekToEnd()` nor `consumer.seekToBeginning()` + * when starting the consumer. + */ + case object NoSeek extends ObservableSeekOnStart { + val id = "no-seek" + } +}
diff --git a/kafka-0.11.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala b/kafka-0.11.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala index b4ff3a1a..6a03f5b4 100644 --- a/kafka-0.11.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala +++ b/kafka-0.11.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala @@ -18,6 +18,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe val commitCallbacks: List[Commit] = List.fill(4)(new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task.unit }) @@ -33,9 +34,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe val partitions = offsets.map(_.topicPartition) val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets) - received.foreach { batch => - partitions should contain allElementsOf batch.offsets.keys - } + received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys } received.size should be <= 4 } @@ -64,9 +63,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe .mergeMap(i => createConsumer(i.toInt, topicName).take(500)) .bufferTumbling(2000) .map(CommittableOffsetBatch.mergeByCommitCallback) - .map { offsetBatches => - assert(offsetBatches.length == 4) - } + .map { offsetBatches => assert(offsetBatches.length == 4) } .completedL Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
diff --git a/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala b/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala index 6e2e2359..ec5ee286 100644 --- a/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala +++ b/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala @@ -124,9 +124,7 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit { val listT = consumer .executeOn(io) .bufferTumbling(count) - .map { messages => - messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) - } + .map { messages => messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) } .mapEval { case (values, batch) => Task.shift *> batch.commitSync().map(_ => values -> batch.offsets) } .headL
diff --git a/kafka-0.11.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-0.11.x/src/test/scala/monix/kafka/SerializationTest.scala index acb9d846..e1941115 100644 --- a/kafka-0.11.x/src/test/scala/monix/kafka/SerializationTest.scala +++ 
b/kafka-0.11.x/src/test/scala/monix/kafka/SerializationTest.scala @@ -127,6 +127,7 @@ class AFailingSerializer extends ASerializer { } class AHalfFailingSerializer extends ASerializer { + override def serialize(topic: String, data: A): Array[Byte] = { if (data.value.toInt % 2 == 0) super.serialize(topic, data) else throw new RuntimeException("fail")
diff --git a/kafka-0.9.x/src/main/resources/monix/kafka/default.conf b/kafka-0.9.x/src/main/resources/monix/kafka/default.conf index b264eed4..b995b27d 100644 --- a/kafka-0.9.x/src/main/resources/monix/kafka/default.conf +++ b/kafka-0.9.x/src/main/resources/monix/kafka/default.conf @@ -65,8 +65,9 @@ kafka { # Number of requests that KafkaProducerSink # can push in parallel monix.producer.sink.parallelism = 100 - # Triggers a seekToEnd when the observable starts - monix.observable.seekEnd.onStart = false + # Triggers either seekToEnd or seekToBeginning when the observable starts + # Possible values: end, beginning, no-seek + monix.observable.seek.onStart = "no-seek" # Possible values: sync, async monix.observable.commit.type = "sync" # Possible values: before-ack, after-ack or no-ack
diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala b/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala index 74354265..eb666b7b 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala @@ -33,6 +33,7 @@ private[kafka] object Commit { val empty: Commit = new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task.unit }
diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/Deserializer.scala b/kafka-0.9.x/src/main/scala/monix/kafka/Deserializer.scala index 32a202fb..0d0b150d 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/Deserializer.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/Deserializer.scala @@ -47,7 +47,8 @@ object Deserializer { implicit def fromKafkaDeserializer[A](implicit des: KafkaDeserializer[A]): Deserializer[A] = Deserializer[A]( className = des.getClass.getName, - classType = des.getClass + classType = des.getClass, + constructor = _ => des ) /** Alias for the function that provides an instance of
diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala index ff080101..6ad52e7f 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -237,7 +237,7 @@ final case class KafkaConsumerConfig( retryBackoffTime: FiniteDuration, observableCommitType: ObservableCommitType, observableCommitOrder: ObservableCommitOrder, - observableSeekToEndOnStart: Boolean, + observableSeekOnStart: ObservableSeekOnStart, properties: Map[String, String]) { def toMap: Map[String, String] = properties ++ Map( @@ -418,7 +418,7 @@ object KafkaConsumerConfig { retryBackoffTime = config.getInt("retry.backoff.ms").millis, observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")), observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")), - observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"), + observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")), properties = Map.empty ) }
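The `fromKafkaDeserializer` change repeated above (`constructor = _ => des`) is the (de)serializer-reuse half of this PR: an implicitly supplied Kafka deserializer is now captured and reused instead of being re-instantiated reflectively from `classType`. A sketch under that assumption (`constructor` is taken to be the public case-class field named in the hunk):

```scala
import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer, StringDeserializer}
import monix.kafka.Deserializer

implicit val underlying: KafkaDeserializer[String] = new StringDeserializer

// The derived monix wrapper now closes over `underlying`:
val wrapped: Deserializer[String] = Deserializer.fromKafkaDeserializer[String]

// Every invocation yields the same instance, not a fresh reflective copy.
assert(wrapped.constructor(wrapped) eq underlying)
```

diff --git 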
a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index c65d38a3..06cb3ac7 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -61,7 +61,8 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { implicit val s = scheduler val feedTask = consumer.flatMap { c => // Skipping all available messages on all partitions - if (config.observableSeekToEndOnStart) c.seekToEnd() + if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd() + else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning() // A task to execute on both cancellation and normal termination val onCancel = cancelTask(c) runLoop(c, out).guarantee(onCancel) diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 648a0cfb..53160c6f 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -43,10 +43,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis case class CommitWithConsumer(consumer: KafkaConsumer[K, V]) extends Commit { + override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava)))) + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task { blocking(consumer.synchronized(consumer.commitAsync(batch.map { diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index dac7d333..ec57f1df 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -38,17 +38,17 @@ import scala.util.{Failure, Success} * - An error with `Task.raiseError` which will finish the stream with an error. 
*/ final class KafkaProducerSink[K, V] private ( - producer: Coeval[KafkaProducer[K, V]], - shouldTerminate: Boolean, - parallelism: Int, - onSendError: Throwable => Task[Ack]) - extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable { + producer: Coeval[KafkaProducer[K, V]], + shouldTerminate: Boolean, + parallelism: Int, + onSendError: Throwable => Task[Ack]) + extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable { require(parallelism >= 1, "parallelism >= 1") def createSubscriber( - cb: Callback[Throwable, Unit], - s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = { + cb: Callback[Throwable, Unit], + s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = { val out = new Subscriber[Seq[ProducerRecord[K, V]]] { self => implicit val scheduler: Scheduler = s private[this] val p = producer.memoize @@ -102,10 +102,11 @@ final class KafkaProducerSink[K, V] private ( object KafkaProducerSink extends StrictLogging { - private[this] val onSendErrorDefault = (ex: Throwable) => Task { - logger.error("Unexpected error in KafkaProducerSink", ex) - Continue - } + private[this] val onSendErrorDefault = (ex: Throwable) => + Task { + logger.error("Unexpected error in KafkaProducerSink", ex) + Continue + } /** Builder for [[KafkaProducerSink]]. */ def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( @@ -126,6 +127,9 @@ object KafkaProducerSink extends StrictLogging { apply(producer, parallelism, onSendErrorDefault) /** Builder for [[KafkaProducerSink]]. */ - def apply[K, V](producer: Coeval[KafkaProducer[K, V]], parallelism: Int, onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] = + def apply[K, V]( + producer: Coeval[KafkaProducer[K, V]], + parallelism: Int, + onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] = new KafkaProducerSink(producer, shouldTerminate = false, parallelism = parallelism, onSendError) } diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/Serializer.scala b/kafka-0.9.x/src/main/scala/monix/kafka/Serializer.scala index 21bc4892..cc103a63 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/Serializer.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/Serializer.scala @@ -47,7 +47,8 @@ object Serializer { implicit def fromKafkaSerializer[A](implicit ser: KafkaSerializer[A]): Serializer[A] = Serializer[A]( className = ser.getClass.getName, - classType = ser.getClass + classType = ser.getClass, + constructor = _ => ser ) /** Alias for the function that provides an instance of diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala b/kafka-0.9.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala new file mode 100644 index 00000000..1e61d9df --- /dev/null +++ b/kafka-0.9.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala @@ -0,0 +1,60 @@ +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** Specifies whether to call `seekToEnd` or `seekToBeginning` when starting + * [[monix.kafka.KafkaConsumerObservable KafkaConsumerObservable]] + * + * Available options: + * + * - [[ObservableSeekOnStart.End]] + * - [[ObservableSeekOnStart.Beginning]] + * - [[ObservableSeekOnStart.NoSeek]] + */ +sealed trait ObservableSeekOnStart extends Serializable { + def id: String + + def isSeekBeginning: Boolean = + this match { + case ObservableSeekOnStart.Beginning => true + case _ => false + } + + def isSeekEnd: Boolean = + this match { + case 
ObservableSeekOnStart.End => true + case _ => false + } +} + +object ObservableSeekOnStart { + + @throws(classOf[BadValue]) + def apply(id: String): ObservableSeekOnStart = + id match { + case End.id => End + case Beginning.id => Beginning + case NoSeek.id => NoSeek + case _ => + throw new BadValue("kafka.monix.observable.seek.onStart", s"Invalid value: $id") + } + + /** Calls `consumer.seekToEnd()` when starting the consumer. + */ + case object End extends ObservableSeekOnStart { + val id = "end" + } + + /** Calls `consumer.seekToBeginning()` when starting the consumer. + */ + case object Beginning extends ObservableSeekOnStart { + val id = "beginning" + } + + /** Calls neither `consumer.seekToEnd()` nor `consumer.seekToBeginning()` + * when starting the consumer. + */ + case object NoSeek extends ObservableSeekOnStart { + val id = "no-seek" + } +}
diff --git a/kafka-0.9.x/src/test/scala/monix/kafka/MonixKafkaTest.scala b/kafka-0.9.x/src/test/scala/monix/kafka/MonixKafkaTest.scala index 6e4de986..0571e4ec 100644 --- a/kafka-0.9.x/src/test/scala/monix/kafka/MonixKafkaTest.scala +++ b/kafka-0.9.x/src/test/scala/monix/kafka/MonixKafkaTest.scala @@ -117,9 +117,7 @@ class MonixKafkaTest extends FunSuite { val listT = consumer .executeOn(io) .bufferTumbling(count) - .map { messages => - messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) - } + .map { messages => messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) } .mapEval { case (values, batch) => Task.shift *> batch.commitSync().map(_ => values -> batch.offsets) } .headL @@ -184,9 +182,7 @@ class MonixKafkaTest extends FunSuite { .merge .bufferTumbling(count) .map(CommittableOffsetBatch.mergeByCommitCallback) - .map { offsetBatches => - assert(offsetBatches.length == 2) - } + .map { offsetBatches => assert(offsetBatches.length == 2) } .headL Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) }
diff --git a/kafka-1.0.x/src/main/resources/monix/kafka/default.conf b/kafka-1.0.x/src/main/resources/monix/kafka/default.conf index e0831583..91aefa9c 100644 --- a/kafka-1.0.x/src/main/resources/monix/kafka/default.conf +++ b/kafka-1.0.x/src/main/resources/monix/kafka/default.conf @@ -71,8 +71,9 @@ kafka { # Number of requests that KafkaProducerSink # can push in parallel monix.producer.sink.parallelism = 100 - # Triggers a seekToEnd when the observable starts - monix.observable.seekEnd.onStart = false + # Triggers either seekToEnd or seekToBeginning when the observable starts + # Possible values: end, beginning, no-seek + monix.observable.seek.onStart = "no-seek" # Possible values: sync, async monix.observable.commit.type = "sync" # Possible values: before-ack, after-ack or no-ack
diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala index 74354265..eb666b7b 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala @@ -33,6 +33,7 @@ private[kafka] object Commit { val empty: Commit = new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task.unit }
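The extra `commitBatchAsync` above rounds out the manual-commit `Commit` interface. For context, a condensed sketch of the committable-batch pattern the refactored tests exercise (the topic and group id are illustrative, and it assumes the library's `manualCommit` builder and `KafkaConsumerConfig.default` loader):

```scala
import monix.eval.Task
import monix.kafka.{CommittableOffsetBatch, KafkaConsumerConfig, KafkaConsumerObservable}

val cfg = KafkaConsumerConfig.default.copy(groupId = "batch-commit-group")

// Buffer records, then commit each buffer's offsets as one synchronous batch.
val processed: Task[Unit] =
  KafkaConsumerObservable
    .manualCommit[String, String](cfg, List("my-topic"))
    .bufferTumbling(100)
    .mapEval { messages =>
      val batch = CommittableOffsetBatch(messages.map(_.committableOffset))
      batch.commitSync()
    }
    .completedL
```

diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/Deserializer.scala b/kafka-1.0.x/src/main/scala/monix/kafka/Deserializer.scala index 04194f9e..66ddfef4 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/Deserializer.scala +++ 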
b/kafka-1.0.x/src/main/scala/monix/kafka/Deserializer.scala @@ -49,7 +49,8 @@ object Deserializer { implicit def fromKafkaDeserializer[A](implicit des: KafkaDeserializer[A]): Deserializer[A] = Deserializer[A]( className = des.getClass.getName, - classType = des.getClass + classType = des.getClass, + constructor = _ => des ) /** Alias for the function that provides an instance of diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala index 4c69bedb..3b4d1746 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -254,7 +254,7 @@ final case class KafkaConsumerConfig( retryBackoffTime: FiniteDuration, observableCommitType: ObservableCommitType, observableCommitOrder: ObservableCommitOrder, - observableSeekToEndOnStart: Boolean, + observableSeekOnStart: ObservableSeekOnStart, properties: Map[String, String]) { def toMap: Map[String, String] = properties ++ Map( @@ -443,7 +443,7 @@ object KafkaConsumerConfig { retryBackoffTime = config.getInt("retry.backoff.ms").millis, observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")), observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")), - observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"), + observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")), properties = Map.empty ) } diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index 6429f384..7392a2fe 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -63,7 +63,8 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { implicit val s = scheduler val feedTask = consumer.flatMap { c => // Skipping all available messages on all partitions - if (config.observableSeekToEndOnStart) c.seekToEnd(Nil.asJavaCollection) + if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) + else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection) // A task to execute on both cancellation and normal termination val onCancel = cancelTask(c) runLoop(c, out).guarantee(onCancel) diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 648a0cfb..53160c6f 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -43,10 +43,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis case class CommitWithConsumer(consumer: KafkaConsumer[K, V]) extends Commit { + override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava)))) + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task { blocking(consumer.synchronized(consumer.commitAsync(batch.map { diff --git 
a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index cadec84f..ec57f1df 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -62,7 +62,7 @@ final class KafkaProducerSink[K, V] private ( if (parallelism == 1) Task.traverse(list)(p.value().send(_)) else { - Task.wanderN(parallelism)(list)(r => p.value().send(r)) + Task.wanderN(parallelism)(list)(r => p.value().send(r)) } val recovered = sendTask.redeemWith( @@ -102,10 +102,11 @@ final class KafkaProducerSink[K, V] private ( object KafkaProducerSink extends StrictLogging { - private[this] val onSendErrorDefault = (ex: Throwable) => Task { - logger.error("Unexpected error in KafkaProducerSink", ex) - Continue - } + private[this] val onSendErrorDefault = (ex: Throwable) => + Task { + logger.error("Unexpected error in KafkaProducerSink", ex) + Continue + } /** Builder for [[KafkaProducerSink]]. */ def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( @@ -126,6 +127,9 @@ object KafkaProducerSink extends StrictLogging { apply(producer, parallelism, onSendErrorDefault) /** Builder for [[KafkaProducerSink]]. */ - def apply[K, V](producer: Coeval[KafkaProducer[K, V]], parallelism: Int, onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] = + def apply[K, V]( + producer: Coeval[KafkaProducer[K, V]], + parallelism: Int, + onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] = new KafkaProducerSink(producer, shouldTerminate = false, parallelism = parallelism, onSendError) } diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/Serializer.scala b/kafka-1.0.x/src/main/scala/monix/kafka/Serializer.scala index 964ebdf9..ba3644a5 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/Serializer.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/Serializer.scala @@ -49,7 +49,8 @@ object Serializer { implicit def fromKafkaSerializer[A](implicit ser: KafkaSerializer[A]): Serializer[A] = Serializer[A]( className = ser.getClass.getName, - classType = ser.getClass + classType = ser.getClass, + constructor = _ => ser ) /** Alias for the function that provides an instance of diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala b/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala new file mode 100644 index 00000000..1e61d9df --- /dev/null +++ b/kafka-1.0.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala @@ -0,0 +1,60 @@ +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** Specifies whether to call `seekToEnd` or `seekToBeginning` when starting + * [[monix.kafka.KafkaConsumerObservable KafkaConsumerObservable]] + * + * Available options: + * + * - [[ObservableSeekOnStart.End]] + * - [[ObservableSeekOnStart.Beginning]] + * - [[ObservableSeekOnStart.NoSeek]] + */ +sealed trait ObservableSeekOnStart extends Serializable { + def id: String + + def isSeekBeginning: Boolean = + this match { + case ObservableSeekOnStart.Beginning => true + case _ => false + } + + def isSeekEnd: Boolean = + this match { + case ObservableSeekOnStart.End => true + case _ => false + } +} + +object ObservableSeekOnStart { + + @throws(classOf[BadValue]) + def apply(id: String): ObservableSeekOnStart = + id match { + case End.id => End + case Beginning.id => Beginning + case NoSeek.id => NoSeek + case _ => + throw new BadValue("kafka.monix.observable.seek.onStart", s"Invalid value: $id") + } + + 
/** Calls `consumer.seekToEnd()` when starting the consumer. + */ + case object End extends ObservableSeekOnStart { + val id = "end" + } + + /** Calls `consumer.seekToBeginning()` when starting the consumer. + */ + case object Beginning extends ObservableSeekOnStart { + val id = "beginning" + } + + /** Calls neither `consumer.seekToEnd()` nor `consumer.seekToBeginning()` + * when starting the consumer. + */ + case object NoSeek extends ObservableSeekOnStart { + val id = "no-seek" + } +}
diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala index b4ff3a1a..6a03f5b4 100644 --- a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala +++ b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala @@ -18,6 +18,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe val commitCallbacks: List[Commit] = List.fill(4)(new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task.unit }) @@ -33,9 +34,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe val partitions = offsets.map(_.topicPartition) val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets) - received.foreach { batch => - partitions should contain allElementsOf batch.offsets.keys - } + received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys } received.size should be <= 4 } @@ -64,9 +63,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe .mergeMap(i => createConsumer(i.toInt, topicName).take(500)) .bufferTumbling(2000) .map(CommittableOffsetBatch.mergeByCommitCallback) - .map { offsetBatches => - assert(offsetBatches.length == 4) - } + .map { offsetBatches => assert(offsetBatches.length == 4) } .completedL Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds)
diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala index 0d869625..8dea38d0 100644 --- a/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala +++ b/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala @@ -126,9 +126,7 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit { val listT = consumer .executeOn(io) .bufferTumbling(count) - .map { messages => - messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) - } + .map { messages => messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) } .mapEval { case (values, batch) => Task.shift *> batch.commitSync().map(_ => values -> batch.offsets) } .headL
diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala index acb9d846..e1941115 100644 --- a/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala +++ b/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala @@ -127,6 +127,7 @@ class AFailingSerializer extends ASerializer { } class AHalfFailingSerializer extends ASerializer { + override def serialize(topic: String, data: A): Array[Byte] = { if (data.value.toInt % 2 == 0) super.serialize(topic, data) else throw new RuntimeException("fail")
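Putting the pieces together, a hedged end-to-end sketch of the headline feature (broker and topic names are invented for illustration, and `KafkaConsumerConfig.default` is assumed from the library):

```scala
import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}
import monix.kafka.config.ObservableSeekOnStart
import monix.execution.Scheduler.Implicits.global

val cfg = KafkaConsumerConfig.default.copy(
  bootstrapServers = List("127.0.0.1:9092"),
  groupId = "replay-group",
  // New in this patch: rewind to the earliest offsets on start.
  observableSeekOnStart = ObservableSeekOnStart.Beginning)

// Replays the first ten records currently retained on the topic.
val firstTen =
  KafkaConsumerObservable[String, String](cfg, List("my-topic"))
    .take(10)
    .map(_.value())
    .toListL
    .runToFuture
```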