Add Partitioned load #130

Closed
wants to merge 17 commits
115 changes: 71 additions & 44 deletions README.md
@@ -18,32 +18,53 @@ Add the following to your `build.sbt`:
libraryDependencies += "uk.sky" %% "kafka-topic-loader" % "<version>"
```

## Examples

### Simple load

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import cats.data.NonEmptyList
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import uk.sky.kafka.topicloader.{LoadAll, TopicLoader}

implicit val system: ActorSystem = ActorSystem()
implicit val stringDeserializer: Deserializer[String] = new StringDeserializer()

val stream = TopicLoader
  .load[String, String](NonEmptyList.one("topic-to-load"), LoadAll)
  .mapAsync(1)(_ => ??? /* store records in an Akka actor, for example */)
  .runWith(Sink.ignore)
```

### Load and run

`loadAndRun` will load the topics, complete the `Future[Done]` from the materialised value and then carry on
running, emitting any new records that appear on the topics. An example use-case for this is a REST API that holds the
contents of a Kafka topic in memory. This kind of application doesn't need to commit offsets and can use the `Future[Done]` to determine readiness.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import cats.data.NonEmptyList
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer, StringDeserializer}
import uk.sky.kafka.topicloader.TopicLoader

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future

object Main extends App {

  implicit val system: ActorSystem = ActorSystem()

  import system.dispatcher

  implicit val keyDeserializer: Deserializer[String] = new StringDeserializer()
  implicit val valueDeserializer: Deserializer[Array[Byte]] = new ByteArrayDeserializer()

  val state = new SimplifiedState

@@ -54,22 +75,52 @@ object Main extends App {
    .run()

  initialLoadingFuture.foreach(_ => state.isAppReady.set(true))

  class SimplifiedState {

    /** API requests may query this state
      */
    val store = new ConcurrentHashMap[String, Array[Byte]]()

    /** A readiness endpoint could be created that queries this
      */
    val isAppReady = new AtomicBoolean()
  }
}
```
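
The `isAppReady` flag can back the readiness endpoint mentioned above. A minimal sketch, placed inside `Main` where
`state` and the actor system are in scope, assuming `akka-http` is also on the classpath; the route path, port, and
server wiring are illustrative and not part of this library:

```scala
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives.{complete, get, path}

// Answers 200 OK once the initial load has completed, 503 until then.
val readinessRoute = path("ready") {
  get {
    complete(if (state.isAppReady.get()) StatusCodes.OK else StatusCodes.ServiceUnavailable)
  }
}

Http().newServerAt("0.0.0.0", 8080).bind(readinessRoute)
```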

### Partitioned load

Data can also be loaded from specific partitions using `partitionedLoad`. By loading from specific partitions, the topic
loader can be used by multiple application instances, with a separate stream per set of partitions (see [Alpakka kafka](https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition) and below).

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import cats.data.NonEmptyList
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{Deserializer, LongDeserializer, StringDeserializer}
import uk.sky.kafka.topicloader.{LoadAll, TopicLoader}

implicit val system: ActorSystem = ActorSystem()

implicit val keyDeserializer: Deserializer[String] = new StringDeserializer()
implicit val longDeserializer: LongDeserializer = new LongDeserializer()

val consumerSettings: ConsumerSettings[String, Long] = ???

val stream: Source[ConsumerRecord[String, Long], Consumer.Control] =
  Consumer
    .plainPartitionedSource(consumerSettings, Subscriptions.topics("topic-to-load"))
    .flatMapConcat { case (topicPartition, source) =>
      TopicLoader
        .partitionedLoad[String, java.lang.Long](NonEmptyList.one(topicPartition), LoadAll)
        .flatMapConcat(_ => source)
    }
```
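
The PR also adds `partitionedLoadAndRun` (see the `TopicLoader.scala` diff below), the per-partition analogue of
`loadAndRun`. A minimal sketch based on its signature, reusing the implicits from the example above; the partition and
variable names are illustrative:

```scala
import akka.Done
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future

val partition = new TopicPartition("topic-to-load", 0)

// The Future[Done] completes once the partition's existing records have been
// emitted; the stream then carries on emitting any newly arriving records.
val ((initialLoadDone: Future[Done], _), _) =
  TopicLoader
    .partitionedLoadAndRun[String, java.lang.Long](NonEmptyList.one(partition))
    .toMat(Sink.ignore)(Keep.both)
    .run()
```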

## Configuring your consumer group.id

You should configure the `akka.kafka.consumer.kafka-clients.group.id` to match that of your application.
@@ -87,27 +138,3 @@ akka.kafka {
}
}
```
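
Alternatively, every loader method accepts an optional `maybeConsumerSettings` argument, so the group id can be set
programmatically instead. A minimal sketch, assuming defaults for all other settings; the group id shown is
illustrative:

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import cats.data.NonEmptyList
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer, StringDeserializer}
import uk.sky.kafka.topicloader.{LoadAll, TopicLoader}

implicit val system: ActorSystem = ActorSystem()
implicit val stringDeserializer: Deserializer[String] = new StringDeserializer()

// The loader consumes raw bytes and applies the implicit deserializers itself,
// so the settings are typed on Array[Byte].
val settings: ConsumerSettings[Array[Byte], Array[Byte]] =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withGroupId("my-application")

val source = TopicLoader.load[String, String](
  NonEmptyList.one("topic-to-load"),
  LoadAll,
  maybeConsumerSettings = Some(settings)
)
```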

## Source per partition

This is deprecated in favour of the new `partitionedLoad` API described above.

Data can also be loaded from specific partitions using `fromPartitions`. By loading from specific partitions, the topic
loader can be used by multiple application instances, with a separate stream per set of partitions (see [Alpakka kafka](https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition) and below).

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerMessage, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import cats.data.NonEmptyList
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.LongDeserializer
import uk.sky.kafka.topicloader.{LoadAll, TopicLoader}

import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem()

val consumerSettings: ConsumerSettings[String, Long] = ???
val doBusinessLogic: ConsumerRecord[String, Long] => Future[Unit] = ???

val stream: Source[ConsumerMessage.CommittableMessage[String, Long], Consumer.Control] =
  Consumer
    .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic-to-load"))
    .flatMapConcat { case (topicPartition, source) =>
      TopicLoader
        .fromPartitions(LoadAll, NonEmptyList.one(topicPartition), doBusinessLogic, new LongDeserializer())
        .flatMapConcat(_ => source)
    }
```
2 changes: 2 additions & 0 deletions build.sbt
@@ -50,6 +50,8 @@ Test / fork := true

Global / onChangedBuildSource := ReloadOnSourceChanges

Compile / doc / scalacOptions += "-no-link-warnings"

libraryDependencies ++= Dependencies.all

excludeDependencies ++= {
71 changes: 55 additions & 16 deletions src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
@@ -1,7 +1,7 @@
package uk.sky.kafka.topicloader

import java.lang.Long as JLong
import java.util.{List as JList, Map as JMap, Optional}

import akka.Done
import akka.actor.ActorSystem
Expand All @@ -10,22 +10,24 @@ import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, Source}
import cats.data.NonEmptyList
import cats.syntax.bifunctor.*
import cats.syntax.option.*
import cats.syntax.show.*
import cats.{Bifunctor, Show}
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.*
import uk.sky.kafka.topicloader.config.{Config, TopicLoaderConfig}

import scala.concurrent.Future
import scala.jdk.CollectionConverters.*

object TopicLoader extends TopicLoader {
private[topicloader] case class LogOffsets(lowest: Long, highest: Long)

private type MaybeConsumerSettings = Option[ConsumerSettings[Array[Byte], Array[Byte]]]

private case class HighestOffsetsWithRecord[K, V](
partitionOffsets: Map[TopicPartition, Long],
consumerRecord: Option[ConsumerRecord[K, V]] = none[ConsumerRecord[K, V]]
@@ -65,7 +67,7 @@ object TopicLoader extends TopicLoader {

trait TopicLoader extends LazyLogging {

import TopicLoader.*

/** Source that loads the specified topics from the beginning and completes when the offsets reach the point specified
* by the requested strategy. Materializes to a Future[Consumer.Control] where the Future represents the retrieval of
@@ -74,7 +76,7 @@
def load[K : Deserializer, V : Deserializer](
topics: NonEmptyList[String],
strategy: LoadTopicStrategy,
maybeConsumerSettings: MaybeConsumerSettings = None
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Future[Consumer.Control]] = {
val config =
Config
@@ -88,10 +90,47 @@
*/
def loadAndRun[K : Deserializer, V : Deserializer](
topics: NonEmptyList[String],
maybeConsumerSettings: MaybeConsumerSettings = None
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = {
val config = Config.loadOrThrow(system.settings.config).topicLoader
val logOffsetsF = logOffsetsForTopics(topics, LoadAll)
loadLogOffsetsAndRun[K, V](logOffsetsF, config, maybeConsumerSettings)
}

/** Same as [[TopicLoader.load]], but for specified partitions. See
* [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition Akka's source per partition]]
* for how to get a partition assignment from Kafka.
*/
def partitionedLoad[K : Deserializer, V : Deserializer](
partitions: NonEmptyList[TopicPartition],
strategy: LoadTopicStrategy,
maybeConsumerSettings: MaybeConsumerSettings = None
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Future[Consumer.Control]] = {
val config =
Config
.loadOrThrow(system.settings.config)
.topicLoader
load(logOffsetsForPartitions(partitions, strategy), config, maybeConsumerSettings)
}

/** Same as [[TopicLoader.loadAndRun]], but for specified partitions. See
* [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition Akka's source per partition]]
* for how to get a partition assignment from Kafka.
*/
def partitionedLoadAndRun[K : Deserializer, V : Deserializer](
partitions: NonEmptyList[TopicPartition],
maybeConsumerSettings: MaybeConsumerSettings = None
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = {
val config = Config.loadOrThrow(system.settings.config).topicLoader
val logOffsetsF = logOffsetsForPartitions(partitions, LoadAll)
loadLogOffsetsAndRun[K, V](logOffsetsF, config, maybeConsumerSettings)
}

protected def loadLogOffsetsAndRun[K : Deserializer, V : Deserializer](
logOffsetsF: Future[Map[TopicPartition, LogOffsets]],
config: TopicLoaderConfig,
maybeConsumerSettings: MaybeConsumerSettings = None
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = {
val postLoadingSource = Source.futureSource(logOffsetsF.map { logOffsets =>
val highestOffsets = logOffsets.map { case (p, o) => p -> o.highest }
kafkaSource[K, V](highestOffsets, config, maybeConsumerSettings)
@@ -151,7 +190,7 @@
protected def load[K : Deserializer, V : Deserializer](
logOffsets: Future[Map[TopicPartition, LogOffsets]],
config: TopicLoaderConfig,
maybeConsumerSettings: MaybeConsumerSettings
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Future[Consumer.Control]] = {

def topicDataSource(offsets: Map[TopicPartition, LogOffsets]): Source[ConsumerRecord[K, V], Consumer.Control] = {
@@ -192,15 +231,15 @@
private def kafkaSource[K : Deserializer, V : Deserializer](
startingOffsets: Map[TopicPartition, Long],
config: TopicLoaderConfig,
maybeConsumerSettings: MaybeConsumerSettings
)(implicit system: ActorSystem) =
Consumer
.plainSource(consumerSettings(maybeConsumerSettings), Subscriptions.assignmentWithOffset(startingOffsets))
.buffer(config.bufferSize.value, OverflowStrategy.backpressure)
.map(cr => cr.bimap(_.deserialize[K](cr.topic), _.deserialize[V](cr.topic)))

def consumerSettings(
maybeConsumerSettings: MaybeConsumerSettings
)(implicit system: ActorSystem): ConsumerSettings[Array[Byte], Array[Byte]] =
maybeConsumerSettings.getOrElse(
ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
@@ -1,7 +1,7 @@
package uk.sky.kafka.topicloader

import cats.data.ValidatedNec
import cats.implicits.*
import com.typesafe.config.ConfigException

import scala.util.Try
@@ -3,8 +3,8 @@ package uk.sky.kafka.topicloader.config
import java.util.concurrent.TimeUnit

import cats.data.{Validated, ValidatedNec}
import cats.implicits.*
import com.typesafe.config.{Config as TypesafeConfig, ConfigException}

import scala.concurrent.duration.FiniteDuration
import scala.util.Try
11 changes: 8 additions & 3 deletions src/test/scala/base/IntegrationSpecBase.scala
@@ -7,21 +7,21 @@ import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import akka.util.Timeout
import cats.data.NonEmptyList
import cats.syntax.option.*
import com.typesafe.config.ConfigFactory
import io.github.embeddedkafka.Codecs.{stringDeserializer, stringSerializer}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, ConsumerRecords}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.scalatest.Assertion
import org.scalatest.concurrent.Eventually
import utils.RandomPort

import scala.annotation.tailrec
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters.*

abstract class IntegrationSpecBase extends WordSpecBase with Eventually {

Expand Down Expand Up @@ -96,6 +96,11 @@ abstract class IntegrationSpecBase extends WordSpecBase with Eventually {
publishToKafka(topic, messages)
publishToKafka(topic, filler)
}

def publishToKafka(topic: String, partition: Int, messages: Seq[(String, String)]): Unit =
messages.foreach { case (k, v) =>
publishToKafka(new ProducerRecord[String, String](topic, partition, k, v))
}
}

trait KafkaConsumer { this: TestContext =>