Implement loadAndRun #7

Merged · 8 commits · Aug 30, 2023
2 changes: 1 addition & 1 deletion it/src/test/scala/integration/LoadExampleIntSpec.scala
@@ -83,6 +83,6 @@ class LoadExampleIntSpec extends KafkaSpecBase[IO] {
private def withKafkaContext(test: TestContext[IO] => IO[Assertion]): IO[Assertion] = {
object testContext extends TestContext[IO]
import testContext.*
embeddedKafka.use(_ => test(testContext))
embeddedKafka.surround(test(testContext))
}
}
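
Note: the use(_ => ...) to surround change is behaviour-preserving. In cats-effect, Resource#surround(gb) is defined as use(_ => gb): the resource is acquired, the action runs, then the resource is released. A minimal sketch (resource and actions are hypothetical):

import cats.effect.{IO, Resource}

val broker: Resource[IO, Unit] =
  Resource.make(IO.println("start"))(_ => IO.println("stop"))

// Identical semantics; surround reads better when the acquired value is unused.
val viaUse      = broker.use(_ => IO.pure(42))
val viaSurround = broker.surround(IO.pure(42))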
65 changes: 45 additions & 20 deletions src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
@@ -2,6 +2,7 @@ package uk.sky.fs2.kafka.topicloader

import cats.data.{NonEmptyList, NonEmptyMap, OptionT}
import cats.effect.Async
import cats.effect.kernel.Resource
import cats.syntax.all.*
import cats.{Monad, Order}
import fs2.kafka.{ConsumerRecord, ConsumerSettings, KafkaConsumer}
@@ -43,20 +44,40 @@ trait TopicLoader {
.evalMap { consumer =>
{
for {
allLogOffsets <- OptionT.liftF(logOffsetsForTopics(topics, strategy, consumer))
logOffsets <- OptionT.fromOption(NonEmptyMap.fromMap(SortedMap.from(allLogOffsets)))
logOffsets <- OptionT(logOffsetsForTopics(topics, strategy, consumer))
_ <- OptionT.liftF(
consumer.assign(logOffsets.keys) *>
logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
)
} yield load(consumer, logOffsets)
}.getOrElse(Stream.empty)
}
.flatten

def loadAndRun[F[_] : Async : LoggerFactory, K, V](
topics: NonEmptyList[String],
consumerSettings: ConsumerSettings[F, K, V]
)(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] =
KafkaConsumer
.stream(consumerSettings)
.evalMap { consumer =>
{
for {
logOffsets <- OptionT(logOffsetsForTopics(topics, LoadAll, consumer))
_ <- OptionT.liftF(
consumer.assign(logOffsets.keys) *>
logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
)
} yield consumer.records
.map(_.record)
.through(filterBelowHighestOffset(logOffsets))
preLoadStream <- OptionT.pure(load(consumer, logOffsets))
} yield preLoadStream.onFinalizeCase(onLoad) ++ consumer.records.map(_.record)
}.getOrElse(Stream.empty)
}
.flatten

def loadAndRun(): Unit = ()
private def load[F[_] : Async : LoggerFactory, K, V](
consumer: KafkaConsumer[F, K, V],
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
): Stream[F, ConsumerRecord[K, V]] = consumer.records.map(_.record).through(filterBelowHighestOffset(logOffsets))

private def filterBelowHighestOffset[F[_] : Monad : LoggerFactory, K, V](
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
@@ -73,20 +94,24 @@
topics: NonEmptyList[String],
strategy: LoadTopicStrategy,
consumer: KafkaConsumer[F, K, V]
): F[Map[TopicPartition, LogOffsets]] = for {
_ <- consumer.subscribe(topics)
partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
endOffsets <- strategy match {
case LoadAll => consumer.endOffsets(topicPartitions)
case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
}
logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
partition -> LogOffsets(offset, endOffsets(partition))
}
_ <- consumer.unsubscribe
} yield logOffsets.filter { case (_, o) => o.highest > o.lowest }
): F[Option[NonEmptyMap[TopicPartition, LogOffsets]]] =
for {
_ <- consumer.subscribe(topics)
partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
endOffsets <- strategy match {
case LoadAll => consumer.endOffsets(topicPartitions)
case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
}
logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
partition -> LogOffsets(offset, endOffsets(partition))
}
_ <- consumer.unsubscribe
} yield {
val offsets = logOffsets.filter { case (_, o) => o.highest > o.lowest }
NonEmptyMap.fromMap(SortedMap.from(offsets))
}

private def earliestOffsets[F[_] : Monad, K, V](
consumer: KafkaConsumer[F, K, V],
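
For reference, a sketch of calling the new loadAndRun from application code; the bootstrap servers, group id, and topic name below are placeholder assumptions:

import cats.data.NonEmptyList
import cats.effect.{IO, Resource}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import uk.sky.fs2.kafka.topicloader.TopicLoader

implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

val settings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092") // placeholder
    .withGroupId("example-group")           // placeholder
    .withAutoOffsetReset(AutoOffsetReset.Earliest)

// Replays each topic up to its current high watermark, invokes the callback
// when the pre-load stream finalizes, then continues with live records.
val stream = TopicLoader.loadAndRun(NonEmptyList.one("example-topic"), settings) {
  case Resource.ExitCase.Succeeded => IO.println("initial load complete")
  case other                       => IO.println(s"initial load ended: $other")
}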
24 changes: 23 additions & 1 deletion src/test/scala/base/AsyncIntSpec.scala
@@ -1,8 +1,30 @@
package base

import cats.effect.Async
import cats.effect.std.Dispatcher
import cats.effect.testing.scalatest.AsyncIOSpec
import cats.syntax.all.*
import org.scalactic.source.Position
import org.scalatest.OptionValues
import org.scalatest.concurrent.Eventually
import org.scalatest.enablers.Retrying
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Span
import org.scalatest.wordspec.AsyncWordSpec

trait AsyncIntSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues
import scala.concurrent.duration.*

trait AsyncIntSpec[F[_]] extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues with Eventually {
override implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 500.millis)

implicit def fRetrying[T](implicit F: Async[F]): Retrying[F[T]] = new Retrying[F[T]] {
override def retry(timeout: Span, interval: Span, pos: Position)(fun: => F[T]): F[T] =
Dispatcher.sequential[F].use { dispatcher =>
F.fromFuture(
F.executionContext.map(
Retrying.retryingNatureOfFutureT[T](_).retry(timeout, interval, pos)(dispatcher.unsafeToFuture(fun))
)
)
}
}
}
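
The Retrying[F[T]] instance above is what lets ScalaTest's eventually retry an effectful assertion: the effect is dispatched to a Future and re-run under the spec's PatienceConfig until it passes or times out. An illustrative (hypothetical) spec:

import cats.effect.{IO, Ref}

class ExampleSpec extends AsyncIntSpec[IO] {
  "eventually" should {
    "retry an IO assertion until it holds" in {
      for {
        counter   <- Ref.of[IO, Int](0)
        _         <- counter.update(_ + 1).start // updated concurrently
        assertion <- eventually(counter.get.asserting(_ shouldBe 1))
      } yield assertion
    }
  }
}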
2 changes: 1 addition & 1 deletion src/test/scala/base/KafkaSpecBase.scala
@@ -2,4 +2,4 @@ package base

import utils.{EmbeddedKafka, KafkaHelpers}

abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec with KafkaHelpers[F] with EmbeddedKafka[F]
abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec[F] with KafkaHelpers[F] with EmbeddedKafka[F]
5 changes: 0 additions & 5 deletions src/test/scala/base/UnitSpecBase.scala

This file was deleted.

8 changes: 0 additions & 8 deletions src/test/scala/base/WordSpecBase.scala

This file was deleted.

31 changes: 27 additions & 4 deletions src/test/scala/integration/TopicLoaderIntSpec.scala
@@ -2,7 +2,7 @@ package integration

import base.KafkaSpecBase
import cats.data.NonEmptyList
import cats.effect.IO
import cats.effect.{IO, Ref}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import io.github.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.common.errors.TimeoutException as KafkaTimeoutException
@@ -184,8 +184,31 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {

"loadAndRun" should {

"execute callback when finished loading and keep streaming" in {
pending
"execute callback when finished loading and keep streaming" in withKafkaContext { ctx =>
import ctx.*

val (preLoad, postLoad) = records(1 to 15).splitAt(10)

for {
loadState <- Ref.of[IO, Boolean](false)
topicState <- Ref.empty[IO, Seq[(String, String)]]
_ <- createCustomTopics(NonEmptyList.one(testTopic1))
_ <- publishStringMessages(testTopic1, preLoad)
assertion <- loadAndRunR(NonEmptyList.one(testTopic1))(
_ => loadState.set(true),
r => topicState.getAndUpdate(_ :+ r).void
).surround {
for {
_ <- eventually(topicState.get.asserting(_ should contain theSameElementsAs preLoad))
_ <- loadState.get.asserting(_ shouldBe true)
_ <- publishStringMessages(testTopic1, postLoad)
assertion <-
eventually(
topicState.get.asserting(_ should contain theSameElementsAs (preLoad ++ postLoad))
)
} yield assertion
}
} yield assertion
}

}
@@ -198,6 +221,6 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {
private def withKafkaContext(test: TestContext[IO] => IO[Assertion]): IO[Assertion] = {
object testContext extends TestContext[IO]
import testContext.*
embeddedKafka.use(_ => test(testContext))
embeddedKafka.surround(test(testContext))
}
}
81 changes: 45 additions & 36 deletions src/test/scala/utils/KafkaHelpers.scala
@@ -2,22 +2,25 @@ package utils

import java.util.UUID

import base.AsyncIntSpec
import cats.data.{NonEmptyList, NonEmptySet}
import cats.effect.implicits.*
import cats.effect.kernel.Fiber
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import fs2.Stream
import fs2.kafka.{AutoOffsetReset, ConsumerRecord, ConsumerSettings, KafkaConsumer}
import io.github.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.common.TopicPartition
import org.scalatest.exceptions.TestFailedException
import org.scalatest.Assertion
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import uk.sky.fs2.kafka.topicloader.{LoadTopicStrategy, TopicLoader}

import scala.concurrent.duration.*

trait KafkaHelpers[F[_]] {
self: EmbeddedKafka[F] =>
self: AsyncIntSpec[F] & EmbeddedKafka[F] =>

val groupId = "test-consumer-group"
val testTopic1 = "load-state-topic-1"
@@ -65,6 +68,29 @@
TopicLoader.load(topics, strategy, consumerSettings).compile.toList.map(_.map(recordToTuple))
}

def loadAndRunR(topics: NonEmptyList[String])(
onLoad: Resource.ExitCase => F[Unit],
onRecord: ((String, String)) => F[Unit]
)(implicit
consumerSettings: ConsumerSettings[F, String, String],
F: Async[F]
): Resource[F, Fiber[F, Throwable, Unit]] = Resource.make {
loadAndRunLoader(topics)(onLoad)
.map(recordToTuple)
.evalTap(onRecord)
.compile
.drain
.start
}(_.cancel.void)

def loadAndRunLoader(topics: NonEmptyList[String])(onLoad: Resource.ExitCase => F[Unit])(implicit
consumerSettings: ConsumerSettings[F, String, String],
F: Async[F]
): Stream[F, ConsumerRecord[String, String]] = {
implicit val loggerFactory: LoggerFactory[F] = Slf4jFactory.create[F]
TopicLoader.loadAndRun(topics, consumerSettings)(onLoad)
}

def moveOffsetToEnd(
partitions: NonEmptySet[TopicPartition]
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): Stream[F, KafkaConsumer[F, String, String]] =
@@ -92,43 +118,33 @@

def waitForCompaction(
partitions: NonEmptySet[TopicPartition]
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
consumeEventually(partitions) { r =>
for {
records <- r
messageKeys = records.map { case (k, _) => k }
result <-
if (messageKeys.sorted == messageKeys.toSet.toList.sorted) F.unit
else F.raiseError(new TestFailedException("Topic has not compacted within timeout", 1))
} yield result
} yield {
messageKeys should not be empty
messageKeys should contain theSameElementsAs messageKeys.toSet
}
}

def consumeEventually(
partitions: NonEmptySet[TopicPartition],
groupId: String = UUID.randomUUID().toString
)(
f: F[List[(String, String)]] => F[Unit]
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
retry(
fa = {
val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
autoCommit = false,
offsetReset = AutoOffsetReset.Earliest,
partitions,
groupId.some
)(
_.records
.map(_.record)
.interruptAfter(5.second)
.compile
.toList
)

f(records.map(_.map(r => r.key -> r.value)))
},
delay = 1.second,
max = 5
)
f: F[List[(String, String)]] => F[Assertion]
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
eventually {
val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
autoCommit = false,
offsetReset = AutoOffsetReset.Earliest,
partitions,
groupId.some
)(_.records.map(_.record).interruptAfter(5.second).compile.toList)

f(records.map(_.map(r => r.key -> r.value)))
}

def withAssignedConsumer[T](
autoCommit: Boolean,
@@ -164,11 +180,4 @@
val settings = groupId.fold(baseSettings)(baseSettings.withGroupId)
KafkaConsumer[F].resource(settings)
}

def retry[A](fa: F[A], delay: FiniteDuration, max: Int)(implicit F: Async[F]): F[A] =
if (max <= 1) fa
else
fa handleErrorWith { _ =>
F.sleep(delay) *> retry(fa, delay, max - 1)
}
}
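
Aside: the Resource.make(....start)(_.cancel.void) pattern in loadAndRunR could equally be written with cats-effect's background combinator (already in scope via cats.effect.implicits.*). A sketch, assuming callers never need the Fiber handle:

def loadAndRunBackground(topics: NonEmptyList[String])(
    onLoad: Resource.ExitCase => F[Unit],
    onRecord: ((String, String)) => F[Unit]
)(implicit consumerSettings: ConsumerSettings[F, String, String], F: Async[F]): Resource[F, Unit] =
  loadAndRunLoader(topics)(onLoad)
    .map(recordToTuple)
    .evalTap(onRecord)
    .compile
    .drain
    .background // forks on acquire, cancels the fiber on release
    .void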