diff --git a/README.md b/README.md
index f30b3e26..ed06fd58 100644
--- a/README.md
+++ b/README.md
@@ -18,32 +18,53 @@ Add the following to your `build.sbt`:
 libraryDependencies += "uk.sky" %% "kafka-topic-loader" % ""
 ```
 
+## Examples
+
+### Simple load
+
 ```scala
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Sink
+import cats.data.NonEmptyList
+import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
 import uk.sky.kafka.topicloader.{LoadAll, TopicLoader}
-import org.apache.kafka.common.serialization.Deserializer}
 
-implicit val as: ActorSystem = ActorSystem()
-implicit val stringDeserializer: Deserializer[String] = new StringDeserializer
+implicit val system: ActorSystem = ActorSystem()
+implicit val stringDeserializer: Deserializer[String] = new StringDeserializer()
 
-val stream = TopicLoader.load[String, String](NonEmptyList.one("topic-to-load"), LoadAll)
-  .mapAsync(1)(_ => ??? /* store records in akka.Actor for example */)
-  .runWith(Sink.ignore)
+val stream = TopicLoader
+  .load[String, String](NonEmptyList.one("topic-to-load"), LoadAll)
+  .mapAsync(1)(_ => ??? /* e.g. store each record in an Akka actor */)
+  .runWith(Sink.ignore)
 ```
 
+### Load and run
+
 `loadAndRun` will load the topics, complete the `Future[Done]` from the materialised value and then carry on running,
 emitting any new records that appear on the topics.
 
 An example use-case for this is a REST API that holds the contents of a Kafka topic in memory. This kind of application
 doesn't need to commit offsets and can use the `Future[Done]` to determine readiness.
 
 ```scala
+import akka.Done
+import akka.actor.ActorSystem
+import akka.kafka.scaladsl.Consumer
+import akka.stream.scaladsl.Sink
+import cats.data.NonEmptyList
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer, StringDeserializer}
+import uk.sky.kafka.topicloader.TopicLoader
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.concurrent.Future
+
 object Main extends App {
-  implicit val system = ActorSystem()
-  implicit val mat = ActorMaterializer()
+  implicit val system: ActorSystem = ActorSystem()
 
   import system.dispatcher
 
-  implicit val keyDeserializer: Deserializer[String] = new StringDeserializer
-  implicit val valueDeserializer: Deserializer[Array[Byte]] = new ByteArrayDeserializer
+  implicit val keyDeserializer: Deserializer[String] = new StringDeserializer()
+  implicit val valueDeserializer: Deserializer[Array[Byte]] = new ByteArrayDeserializer()
 
   val state = new SimplifiedState
 
@@ -54,22 +75,111 @@ object Main extends App {
       .run()
 
   initialLoadingFuture.foreach(_ => state.isAppReady.set(true))
-}
 
-class SimplifiedState {
+  class SimplifiedState {
 
-  /**
-    * API requests may query this state
-    */
-  val store = new ConcurrentHashMap[String, Array[Byte]]()
+    /** API requests may query this state
+      */
+    val store = new ConcurrentHashMap[String, Array[Byte]]()
 
-  /**
-    * A readiness endpoint could be created that queries this
-    */
-  val isAppReady = new AtomicBoolean()
+    /** A readiness endpoint could be created that queries this
+      */
+    val isAppReady = new AtomicBoolean()
+  }
 }
 ```
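+
+### Custom consumer settings
+
+Every loader method also takes an optional `ConsumerSettings[Array[Byte], Array[Byte]]`, used in place of the
+settings built from configuration (see below). A minimal sketch (the `group.id` value is illustrative):
+
+```scala
+import akka.actor.ActorSystem
+import akka.kafka.ConsumerSettings
+import cats.data.NonEmptyList
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer, StringDeserializer}
+import uk.sky.kafka.topicloader.{LoadAll, TopicLoader}
+
+implicit val system: ActorSystem = ActorSystem()
+implicit val stringDeserializer: Deserializer[String] = new StringDeserializer()
+
+// Used in place of the settings that would otherwise be built from configuration
+val customSettings: ConsumerSettings[Array[Byte], Array[Byte]] =
+  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    .withGroupId("my-application")
+
+val source = TopicLoader.load[String, String](
+  NonEmptyList.one("topic-to-load"),
+  LoadAll,
+  maybeConsumerSettings = Some(customSettings)
+)
+```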
+
+### Partitioned load
+
+Data can also be loaded from specific partitions using `partitionedLoad`. By loading from specific partitions, the topic
+loader can be used by multiple application instances with separate streams per set of partitions (see [Alpakka Kafka](https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition) and the example below).
+
+```scala
+import akka.actor.ActorSystem
+import akka.kafka.scaladsl.Consumer
+import akka.kafka.{ConsumerSettings, Subscriptions}
+import akka.stream.scaladsl.Source
+import cats.data.NonEmptyList
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.serialization.{Deserializer, LongDeserializer, StringDeserializer}
+import uk.sky.kafka.topicloader.{LoadAll, TopicLoader}
+
+implicit val system: ActorSystem = ActorSystem()
+
+implicit val keyDeserializer: Deserializer[String] = new StringDeserializer()
+implicit val longDeserializer: LongDeserializer = new LongDeserializer()
+
+val consumerSettings: ConsumerSettings[String, Long] = ???
+
+val stream: Source[ConsumerRecord[String, Long], Consumer.Control] =
+  Consumer
+    .plainPartitionedSource(consumerSettings, Subscriptions.topics("topic-to-load"))
+    .flatMapConcat { case (topicPartition, source) =>
+      TopicLoader
+        .partitionedLoad[String, java.lang.Long](NonEmptyList.one(topicPartition), LoadAll)
+        .flatMapConcat(_ => source)
+    }
+```
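+
+### Partitioned load and run
+
+`partitionedLoadAndRun` mirrors `loadAndRun` for a fixed set of partitions: the materialised `Future[Done]` completes
+once the given partitions have been loaded. A brief sketch (topic name and partition number are illustrative):
+
+```scala
+import akka.Done
+import akka.actor.ActorSystem
+import akka.kafka.scaladsl.Consumer
+import akka.stream.scaladsl.{Sink, Source}
+import cats.data.NonEmptyList
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
+import uk.sky.kafka.topicloader.TopicLoader
+
+import scala.concurrent.Future
+
+implicit val system: ActorSystem = ActorSystem()
+import system.dispatcher
+
+implicit val stringDeserializer: Deserializer[String] = new StringDeserializer()
+
+val partition = new TopicPartition("topic-to-load", 0)
+
+val stream: Source[ConsumerRecord[String, String], (Future[Done], Future[Consumer.Control])] =
+  TopicLoader.partitionedLoadAndRun[String, String](NonEmptyList.one(partition))
+
+val (loaded, _) = stream.to(Sink.ignore).run()
+loaded.foreach(_ => println(s"Loaded partition ${partition.partition}"))
+```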
+
 ## Configuring your consumer group.id
 
 You should configure the `akka.kafka.consumer.kafka-clients.group.id` to match that of your application.
@@ -87,27 +197,3 @@ akka.kafka {
   }
 }
 ```
-
-## Source per partition
-
-This is deprecated in favour of a new API for partitioned loading which is coming soon.
-
-Data can also be loaded from specific partitions using `fromPartitions`. By loading from specific partitions the topic
-loader can be used by multiple application instances with separate streams per set of partitions (see [Alpakka kafka](https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition) and below).
-
-```scala
-implicit val system = ActorSystem()
-
-val consumerSettings: ConsumerSettings[String, Long] = ???
-val doBusinessLogic: ConsumerRecord[String, Long] => Future[Unit] = ???
-
-val stream: Source[ConsumerMessage.CommittableMessage[String, Long], Consumer.Control] =
-  Consumer
-    .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic-to-load"))
-    .flatMapConcat {
-      case (topicPartition, source) =>
-        TopicLoader
-          .fromPartitions(LoadAll, NonEmptyList.one(topicPartition), doBusinessLogic, new LongDeserializer())
-          .flatMapConcat(_ => source)
-    }
-```
diff --git a/build.sbt b/build.sbt
index d4febde7..e39de0ee 100644
--- a/build.sbt
+++ b/build.sbt
@@ -50,6 +50,8 @@ Test / fork := true
 
 Global / onChangedBuildSource := ReloadOnSourceChanges
 
+Compile / doc / scalacOptions += "-no-link-warnings"
+
 libraryDependencies ++= Dependencies.all
 
 excludeDependencies ++= {
diff --git a/src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala b/src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
index 84bf6f40..cfd9e61a 100644
--- a/src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
+++ b/src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
@@ -1,7 +1,7 @@
 package uk.sky.kafka.topicloader
 
-import java.lang.{Long => JLong}
-import java.util.{List => JList, Map => JMap, Optional}
+import java.lang.Long as JLong
+import java.util.{List as JList, Map as JMap, Optional}
 
 import akka.Done
 import akka.actor.ActorSystem
@@ -10,22 +10,24 @@ import akka.kafka.{ConsumerSettings, Subscriptions}
 import akka.stream.OverflowStrategy
 import akka.stream.scaladsl.{Flow, Keep, Source}
 import cats.data.NonEmptyList
-import cats.syntax.bifunctor._
-import cats.syntax.option._
-import cats.syntax.show._
+import cats.syntax.bifunctor.*
+import cats.syntax.option.*
+import cats.syntax.show.*
 import cats.{Bifunctor, Show}
 import com.typesafe.scalalogging.LazyLogging
-import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.*
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.serialization.*
 import uk.sky.kafka.topicloader.config.{Config, TopicLoaderConfig}
 
 import scala.concurrent.Future
-import scala.jdk.CollectionConverters._
+import scala.jdk.CollectionConverters.*
 
 object TopicLoader extends TopicLoader {
 
   private[topicloader] case class LogOffsets(lowest: Long, highest: Long)
 
+  private type MaybeConsumerSettings = Option[ConsumerSettings[Array[Byte], Array[Byte]]]
+
   private case class HighestOffsetsWithRecord[K, V](
       partitionOffsets: Map[TopicPartition, Long],
       consumerRecord: Option[ConsumerRecord[K, V]] = none[ConsumerRecord[K, V]]
@@ -65,7 +67,7 @@ object TopicLoader extends TopicLoader {
 
 trait TopicLoader extends LazyLogging {
 
-  import TopicLoader._
+  import TopicLoader.*
 
   /** Source that loads the specified topics from the beginning and completes when the offsets reach the point specified
     * by the requested strategy. Materializes to a Future[Consumer.Control] where the Future represents the retrieval of
@@ -74,7 +76,7 @@ trait TopicLoader extends LazyLogging {
   def load[K : Deserializer, V : Deserializer](
       topics: NonEmptyList[String],
       strategy: LoadTopicStrategy,
-      maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]] = None
+      maybeConsumerSettings: MaybeConsumerSettings = None
   )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Future[Consumer.Control]] = {
     val config =
       Config
@@ -88,10 +90,47 @@ trait TopicLoader extends LazyLogging {
    */
   def loadAndRun[K : Deserializer, V : Deserializer](
       topics: NonEmptyList[String],
-      maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]] = None
+      maybeConsumerSettings: MaybeConsumerSettings = None
+  )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = {
+    val config      = Config.loadOrThrow(system.settings.config).topicLoader
+    val logOffsetsF = logOffsetsForTopics(topics, LoadAll)
+    loadLogOffsetsAndRun[K, V](logOffsetsF, config, maybeConsumerSettings)
+  }
+
+  /** Same as [[TopicLoader.load]], but for the specified partitions. See
+    * [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition Akka's source per partition]]
+    * for how to get a partition assignment from Kafka.
+    */
+  def partitionedLoad[K : Deserializer, V : Deserializer](
+      partitions: NonEmptyList[TopicPartition],
+      strategy: LoadTopicStrategy,
+      maybeConsumerSettings: MaybeConsumerSettings = None
+  )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Future[Consumer.Control]] = {
+    val config =
+      Config
+        .loadOrThrow(system.settings.config)
+        .topicLoader
+    load(logOffsetsForPartitions(partitions, strategy), config, maybeConsumerSettings)
+  }
+
+  /** Same as [[TopicLoader.loadAndRun]], but for the specified partitions. See
+    * [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition Akka's source per partition]]
+    * for how to get a partition assignment from Kafka.
+    */
+  def partitionedLoadAndRun[K : Deserializer, V : Deserializer](
+      partitions: NonEmptyList[TopicPartition],
+      maybeConsumerSettings: MaybeConsumerSettings = None
+  )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = {
+    val config      = Config.loadOrThrow(system.settings.config).topicLoader
+    val logOffsetsF = logOffsetsForPartitions(partitions, LoadAll)
+    loadLogOffsetsAndRun[K, V](logOffsetsF, config, maybeConsumerSettings)
+  }
+
+  protected def loadLogOffsetsAndRun[K : Deserializer, V : Deserializer](
+      logOffsetsF: Future[Map[TopicPartition, LogOffsets]],
+      config: TopicLoaderConfig,
+      maybeConsumerSettings: MaybeConsumerSettings = None
   )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = {
-    val config      = Config.loadOrThrow(system.settings.config).topicLoader
-    val logOffsetsF = logOffsetsForTopics(topics, LoadAll)
     val postLoadingSource = Source.futureSource(logOffsetsF.map { logOffsets =>
       val highestOffsets = logOffsets.map { case (p, o) => p -> o.highest }
       kafkaSource[K, V](highestOffsets, config, maybeConsumerSettings)
@@ -151,7 +190,7 @@
   protected def load[K : Deserializer, V : Deserializer](
       logOffsets: Future[Map[TopicPartition, LogOffsets]],
       config: TopicLoaderConfig,
-      maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]]
+      maybeConsumerSettings: MaybeConsumerSettings
   )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Future[Consumer.Control]] = {
 
     def topicDataSource(offsets: Map[TopicPartition, LogOffsets]): Source[ConsumerRecord[K, V], Consumer.Control] = {
@@ -192,7 +231,7 @@
   private def kafkaSource[K : Deserializer, V : Deserializer](
       startingOffsets: Map[TopicPartition, Long],
       config: TopicLoaderConfig,
-      maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]]
+      maybeConsumerSettings: MaybeConsumerSettings
   )(implicit system: ActorSystem) =
     Consumer
      .plainSource(consumerSettings(maybeConsumerSettings), Subscriptions.assignmentWithOffset(startingOffsets))
@@ -200,7 +239,7 @@
       .map(cr => cr.bimap(_.deserialize[K](cr.topic), _.deserialize[V](cr.topic)))
 
   def consumerSettings(
-      maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]]
+      maybeConsumerSettings: MaybeConsumerSettings
   )(implicit system: ActorSystem): ConsumerSettings[Array[Byte], Array[Byte]] =
     maybeConsumerSettings.getOrElse(
       ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
diff --git a/src/main/scala/uk/sky/kafka/topicloader/config/config.scala b/src/main/scala/uk/sky/kafka/topicloader/config/config.scala
index 13f48d14..ed372c13 100644
--- a/src/main/scala/uk/sky/kafka/topicloader/config/config.scala
+++ b/src/main/scala/uk/sky/kafka/topicloader/config/config.scala
@@ -1,7 +1,7 @@
 package uk.sky.kafka.topicloader
 
 import cats.data.ValidatedNec
-import cats.implicits._
+import cats.implicits.*
 import com.typesafe.config.ConfigException
 
 import scala.util.Try
diff --git a/src/main/scala/uk/sky/kafka/topicloader/config/topicLoaderConfig.scala b/src/main/scala/uk/sky/kafka/topicloader/config/topicLoaderConfig.scala
index 68d179de..fed0e0bc 100644
--- a/src/main/scala/uk/sky/kafka/topicloader/config/topicLoaderConfig.scala
+++ b/src/main/scala/uk/sky/kafka/topicloader/config/topicLoaderConfig.scala
@@ -3,8 +3,8 @@ package uk.sky.kafka.topicloader.config
 
 import java.util.concurrent.TimeUnit
 
 import cats.data.{Validated, ValidatedNec}
-import cats.implicits._
-import com.typesafe.config.{Config => TypesafeConfig, ConfigException}
+import cats.implicits.*
+import com.typesafe.config.{Config as TypesafeConfig, ConfigException}
 
 import scala.concurrent.duration.FiniteDuration
 import scala.util.Try
diff --git a/src/test/scala/base/IntegrationSpecBase.scala b/src/test/scala/base/IntegrationSpecBase.scala
index 5f445388..4cf4443f 100644
--- a/src/test/scala/base/IntegrationSpecBase.scala
+++ b/src/test/scala/base/IntegrationSpecBase.scala
@@ -7,13 +7,13 @@ import akka.actor.ActorSystem
 import akka.kafka.ConsumerSettings
 import akka.util.Timeout
 import cats.data.NonEmptyList
-import cats.syntax.option._
+import cats.syntax.option.*
 import com.typesafe.config.ConfigFactory
 import io.github.embeddedkafka.Codecs.{stringDeserializer, stringSerializer}
 import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, ConsumerRecords}
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.Assertion
 import org.scalatest.concurrent.Eventually
@@ -21,7 +21,7 @@ import utils.RandomPort
 
 import scala.annotation.tailrec
 import scala.concurrent.duration.DurationInt
-import scala.jdk.CollectionConverters._
+import scala.jdk.CollectionConverters.*
 
 abstract class IntegrationSpecBase extends WordSpecBase with Eventually {
 
@@ -96,6 +96,11 @@ abstract class IntegrationSpecBase extends WordSpecBase with Eventually {
     publishToKafka(topic, messages)
     publishToKafka(topic, filler)
   }
+
+  def publishToKafka(topic: String, partition: Int, messages: Seq[(String, String)]): Unit =
+    messages.foreach { case (k, v) =>
+      publishToKafka(new ProducerRecord[String, String](topic, partition, k, v))
+    }
 }
 
 trait KafkaConsumer { this: TestContext =>
diff --git a/src/test/scala/integration/TopicLoaderIntSpec.scala b/src/test/scala/integration/TopicLoaderIntSpec.scala
index c809b00f..27ab7b27 100644
--- a/src/test/scala/integration/TopicLoaderIntSpec.scala
+++ b/src/test/scala/integration/TopicLoaderIntSpec.scala
@@ -1,6 +1,6 @@
 package integration
 
-import java.util.concurrent.{TimeoutException => JavaTimeoutException}
+import java.util.concurrent.TimeoutException as JavaTimeoutException
 
 import akka.actor.ActorSystem
 import akka.kafka.ConsumerSettings
@@ -8,19 +8,20 @@ import akka.stream.scaladsl.{Keep, Sink}
 import akka.stream.testkit.scaladsl.TestSink
 import base.IntegrationSpecBase
 import cats.data.NonEmptyList
-import cats.syntax.option._
+import cats.syntax.option.*
 import com.typesafe.config.{ConfigException, ConfigFactory}
 import io.github.embeddedkafka.Codecs.{stringDeserializer, stringSerializer}
-import org.apache.kafka.common.errors.{TimeoutException => KafkaTimeoutException}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.TimeoutException as KafkaTimeoutException
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.scalatest.prop.TableDrivenPropertyChecks._
+import org.scalatest.prop.TableDrivenPropertyChecks.*
 import org.scalatest.prop.Tables.Table
 import uk.sky.kafka.topicloader.TopicLoader.consumerSettings
-import uk.sky.kafka.topicloader._
+import uk.sky.kafka.topicloader.*
 import uk.sky.kafka.topicloader.config.Config
 
 import scala.concurrent.Future
-import scala.concurrent.duration._
+import scala.concurrent.duration.*
 
 class TopicLoaderIntSpec extends IntegrationSpecBase {
 
@@ -248,6 +249,82 @@ class TopicLoaderIntSpec extends IntegrationSpecBase {
     }
   }
 
+  "partitionedLoad" should {
+
+    val strategy = LoadAll
+
+    "stream records from all partitions" in new TestContext {
+      val (forPart1, forPart2) = records(1 to 15).splitAt(10)
+      val topicPart1           = new TopicPartition(testTopic1, 1)
+      val topicPart2           = new TopicPartition(testTopic1, 2)
+      val partitions           = NonEmptyList.of(topicPart1, topicPart2)
+
+      withRunningKafka {
+        createCustomTopic(testTopic1, partitions = 5)
+        publishToKafka(testTopic1, 1, forPart1)
+        publishToKafka(testTopic1, 2, forPart2)
+
+        val loadedRecords =
+          TopicLoader.partitionedLoad[String, String](partitions, strategy).runWith(Sink.seq).futureValue
+        loadedRecords.map(recordToTuple) should contain theSameElementsAs (forPart1 ++ forPart2)
+      }
+
+    }
+
+    "stream records only from required partitions" in new TestContext {
+      val (forPart1, forPart2) = records(1 to 15).splitAt(10)
+      val topicPart1           = new TopicPartition(testTopic1, 1)
+      val topicPart2           = new TopicPartition(testTopic1, 2)
+
+      withRunningKafka {
+        createCustomTopic(testTopic1, partitions = 5)
+        publishToKafka(testTopic1, 1, forPart1)
+        publishToKafka(testTopic1, 2, forPart2)
+
+        val loadedForPart1 =
+          TopicLoader
+            .partitionedLoad[String, String](NonEmptyList.one(topicPart1), strategy)
+            .runWith(Sink.seq)
+            .futureValue
+        loadedForPart1.map(recordToTuple) should contain theSameElementsAs forPart1
+
+        val loadedForPart2 =
+          TopicLoader
+            .partitionedLoad[String, String](NonEmptyList.one(topicPart2), strategy)
+            .runWith(Sink.seq)
+            .futureValue
+        loadedForPart2.map(recordToTuple) should contain theSameElementsAs forPart2
+      }
+    }
+  }
+
+  "partitionedLoadAndRun" should {
+
+    "execute callback when finished loading a partition and keep streaming" in new TestContext {
+      val (preLoad, postLoad) = records(1 to 15).splitAt(10)
+      val topicPartition      = new TopicPartition(testTopic1, 1)
+
+      withRunningKafka {
+        createCustomTopic(testTopic1, partitions = 5)
+        publishToKafka(testTopic1, 1, preLoad)
+
+        val ((callback, _), recordsProbe) =
+          TopicLoader
+            .partitionedLoadAndRun[String, String](NonEmptyList.one(topicPartition))
+            .toMat(TestSink.probe)(Keep.both)
+            .run()
+
+        recordsProbe.request(preLoad.size.toLong + postLoad.size.toLong)
+        recordsProbe.expectNextN(preLoad.size.toLong).map(recordToTuple) shouldBe preLoad
+
+        whenReady(callback) { _ =>
+          publishToKafka(testTopic1, 1, postLoad)
+          recordsProbe.expectNextN(postLoad.size.toLong).map(recordToTuple) shouldBe postLoad
+        }
+      }
+    }
+  }
+
   "consumerSettings" should {
 
     implicit val system: ActorSystem = ActorSystem("test")