Skip to content

Commit

Permalink
Merge pull request #540 from gmethvin/pekko
Browse files Browse the repository at this point in the history
Add pekko-stream support
  • Loading branch information
judu authored Dec 7, 2023
2 parents ed03023 + daf24c5 commit ee13db1
Show file tree
Hide file tree
Showing 14 changed files with 1,398 additions and 0 deletions.
16 changes: 16 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ val ExtsVersion = "1.61.1"
val JacksonVersion = "2.14.1"
val Log4jVersion = "2.22.0"
val MonixVersion = "3.4.1"
val PekkoStreamVersion = "1.0.0"
val PlayJsonVersion = "2.10.0-RC7"
val PulsarVersion = "3.1.1"
val ReactiveStreamsVersion = "1.0.2"
Expand Down Expand Up @@ -167,6 +168,7 @@ lazy val root = Project("pulsar4s", file("."))
jackson,
json4s,
monix,
pekko_streams,
playjson,
scalaz,
sprayjson,
Expand Down Expand Up @@ -300,3 +302,17 @@ lazy val akka_streams = Project("pulsar4s-akka-streams", file("pulsar4s-akka-str
.settings(libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % AkkaStreamVersion
))

lazy val pekko_streams = Project("pulsar4s-pekko-streams", file("pulsar4s-pekko-streams"))
.dependsOn(core)
.settings(name := "pulsar4s-pekko-streams")
.settings(allSettings)
.settings(
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-stream" % PekkoStreamVersion
),
// ignore scala-java8-compat issues with scala 2.12
libraryDependencySchemes ++= (CrossVersion.partialVersion(scalaVersion.value).collect {
case (2, 12) => "org.scala-lang.modules" %% "scala-java8-compat" % VersionScheme.Always
})
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package com.sksamuel.pulsar4s.pekko.streams

import org.apache.pekko.Done
import org.apache.pekko.stream.Attributes
import org.apache.pekko.stream.Outlet
import org.apache.pekko.stream.SourceShape
import org.apache.pekko.stream.stage.AsyncCallback
import org.apache.pekko.stream.stage.GraphStageLogic
import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue
import org.apache.pekko.stream.stage.OutHandler
import com.sksamuel.exts.Logging
import com.sksamuel.pulsar4s.Consumer
import com.sksamuel.pulsar4s.ConsumerMessage
import com.sksamuel.pulsar4s.MessageId
import com.sksamuel.pulsar4s.TransactionContext
import org.apache.pulsar.client.api.ConsumerStats

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal

trait CommittableMessage[T] extends TransactionalCommittableMessageOps {
def nack(): Future[Done]
def message: ConsumerMessage[T]
def tx(implicit txn: TransactionContext): TransactionalCommittableMessageOps
}

trait TransactionalCommittableMessageOps {
def ack(cumulative: Boolean = false): Future[Done]
}

class PulsarCommittableSourceGraphStage[T](
create: () => Consumer[T],
seek: Option[MessageId],
closeDelay: FiniteDuration,
) extends GraphStageWithMaterializedValue[SourceShape[CommittableMessage[T]], Control]
with Logging {

@deprecated("Use main constructor", "2.7.1")
def this(create: () => Consumer[T], seek: Option[MessageId]) = this(create, seek, closeDelay = DefaultCloseDelay)

private val out = Outlet[CommittableMessage[T]]("pulsar.out")
override def shape: SourceShape[CommittableMessage[T]] = SourceShape(out)

private class CommittableMessageImpl[T](
val consumer: Consumer[T],
val message: ConsumerMessage[T],
val ctx: Option[TransactionContext] = None
)(implicit ec: ExecutionContext) extends CommittableMessage[T] {
def messageId: MessageId = message.messageId
override def ack(cumulative: Boolean): Future[Done] = {
logger.debug(s"Acknowledging message: $message")
val txnOps = ctx.map(consumer.tx(_)).getOrElse(consumer)
val ackFuture = if (cumulative) {
txnOps.acknowledgeCumulativeAsync(message.messageId)
} else {
txnOps.acknowledgeAsync(message.messageId)
}
ackFuture.map(_ => Done)
}
override def tx(implicit ctx: TransactionContext): TransactionalCommittableMessageOps = {
new CommittableMessageImpl(consumer, message, Some(ctx))
}
override def nack(): Future[Done] = {
logger.debug(s"Negatively acknowledging message: $message")
consumer.negativeAcknowledgeAsync(message.messageId).map(_ => Done)
}
}

private class PulsarCommittableSourceLogic(shape: Shape) extends GraphStageLogic(shape) with OutHandler with Control {
setHandler(out, this)

implicit def ec: ExecutionContext = materializer.executionContext

@inline private def consumer: Consumer[T] =
consumerOpt.getOrElse(throw new IllegalStateException("Consumer not initialized!"))
private var consumerOpt: Option[Consumer[T]] = None
private var receiveCallback: AsyncCallback[Try[ConsumerMessage[T]]] = getAsyncCallback {
case Success(msg) =>
logger.debug(s"Message received: $msg")
push(out, new CommittableMessageImpl(consumer, msg))
case Failure(e) =>
logger.warn("Error when receiving message", e)
failStage(e)
}
private val stopped: Promise[Done] = Promise()
private val stopCallback: AsyncCallback[Unit] = getAsyncCallback(_ => completeStage())

override def preStart(): Unit = {
try {
val consumer = create()
consumerOpt = Some(consumer)
stopped.future.onComplete { _ =>
// Schedule to stop after a delay to give unacked messages time to finish
materializer.scheduleOnce(closeDelay, () => close())
}
seek foreach consumer.seek
} catch {
case NonFatal(e) =>
logger.error("Error creating consumer!", e)
failStage(e)
}
}

override def onPull(): Unit = {
logger.debug("Pull received; asking consumer for message")
consumer.receiveAsync.onComplete(receiveCallback.invoke)
}

private def close()(implicit ec: ExecutionContext): Future[Done] = {
consumerOpt.fold(Future.successful(Done))(_.closeAsync.map(_ => Done))
}

override def complete()(implicit ec: ExecutionContext): Future[Done] = {
stopCallback.invoke(())
stopped.future
}

override def postStop(): Unit = stopped.success(Done)

override def shutdown()(implicit ec: ExecutionContext): Future[Done] = {
for {
_ <- complete()
_ <- close()
} yield Done
}

override def stats: ConsumerStats = consumer.stats
}

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = {
val logic = new PulsarCommittableSourceLogic(shape)
(logic, logic)
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.sksamuel.pulsar4s.pekko.streams

import org.apache.pekko.Done
import org.apache.pekko.stream.stage.{AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, InHandler}
import org.apache.pekko.stream.{Attributes, Inlet, SinkShape}
import com.sksamuel.exts.Logging
import com.sksamuel.pulsar4s.{Producer, ProducerMessage, Topic}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future, Promise}
import scala.util.{Failure, Success, Try}

class PulsarMultiSinkGraphStage[T](createFn: Topic => Producer[T], initTopics: Set[Topic] = Set.empty)
extends GraphStageWithMaterializedValue[SinkShape[(Topic, ProducerMessage[T])], Future[Done]]
with Logging {

private val in = Inlet.create[(Topic, ProducerMessage[T])]("pulsar.in")

override def shape: SinkShape[(Topic, ProducerMessage[T])] = SinkShape.of(in)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {

val promise = Promise[Done]()

val logic: GraphStageLogic = new GraphStageLogic(shape) with InHandler {
setHandler(in, this)

implicit def context: ExecutionContextExecutor = super.materializer.executionContext

var producers: Map[Topic, Producer[T]] = _
var produceCallback: AsyncCallback[Try[_]] = _
var error: Throwable = _

override def preStart(): Unit = {
producers = initTopics.map(t => t -> createFn(t)).toMap
produceCallback = getAsyncCallback {
case Success(_) => pull(in)
case Failure(e) =>
logger.error("Failing pulsar sink stage", e)
failStage(e)
}
pull(in)
}

private def getProducer(topic: Topic): Producer[T] =
producers.get(topic) match {
case Some(p) => p
case None =>
logger.debug(s"creating new producer for topic $topic")
val producer = createFn(topic)
producers += topic -> producer
producer
}

override def onPush(): Unit = {
try {
val (topic, message) = grab(in)
logger.debug(s"Sending message $message to $topic")
val producer = getProducer(topic)
producer.sendAsync(message).onComplete(produceCallback.invoke)
} catch {
case e: Throwable =>
logger.error("Failing pulsar sink stage", e)
failStage(e)
}
}

override def postStop(): Unit = {
logger.debug("Graph stage stopping; closing producers")
val fs = producers.flatMap { case (_, p) =>
Seq(
p.flushAsync,
p.closeAsync
)
}
Await.ready(Future.sequence(fs), 15.seconds)
}

override def onUpstreamFailure(ex: Throwable): Unit = {
promise.tryFailure(ex)
}

override def onUpstreamFinish(): Unit = {
promise.trySuccess(Done)
}
}

(logic, promise.future)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.sksamuel.pulsar4s.pekko.streams

import org.apache.pekko.Done
import org.apache.pekko.stream.{Attributes, Outlet, SourceShape}
import org.apache.pekko.stream.stage.{AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}
import com.sksamuel.exts.Logging
import com.sksamuel.pulsar4s.{ConsumerMessage, MessageId, Reader}
import org.apache.pulsar.client.api.ConsumerStats

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

class PulsarReaderSourceGraphStage[T](create: () => Reader[T], seek: Option[MessageId]) extends GraphStageWithMaterializedValue[SourceShape[ConsumerMessage[T]], Control] with Logging {

private val out = Outlet[ConsumerMessage[T]]("pulsar.out")
override def shape: SourceShape[ConsumerMessage[T]] = SourceShape(out)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = {

val logic: GraphStageLogic with Control = new GraphStageLogic(shape) with OutHandler with Control {
setHandler(out, this)

implicit def ec: ExecutionContext = materializer.executionContext

@inline private def reader: Reader[T] = consumerOpt.getOrElse(throw new IllegalStateException("Reader not initialized!"))
private var consumerOpt: Option[Reader[T]] = None
private val receiveCallback: AsyncCallback[Try[ConsumerMessage[T]]] = getAsyncCallback {
case Success(msg) =>
push(out, msg)
case Failure(e) =>
failStage(e)
}
private val stopped: Promise[Done] = Promise()
private val stopCallback: AsyncCallback[Unit] = getAsyncCallback { _ => completeStage() }

override def preStart(): Unit = {
try {
val reader = create()
consumerOpt = Some(reader)
stopped.future.onComplete { _ =>close()}
seek foreach reader.seek
} catch {
case NonFatal(e) =>
logger.error("Error creating reader!", e)
failStage(e)
}
}

override def onPull(): Unit = {
logger.debug("Pull received; asking reader for message")
reader.nextAsync.onComplete(receiveCallback.invoke(_))
}

override def postStop(): Unit = stopped.success(Done)

override def complete()(implicit ec: ExecutionContext): Future[Done] = {
stopCallback.invoke(())
stopped.future
}

private def close()(implicit ec: ExecutionContext): Future[Done] =
consumerOpt.fold(Future.successful(Done))(_.closeAsync.map(_ => Done))

override def shutdown()(implicit ec: ExecutionContext): Future[Done] =
for {
_ <- complete()
_ <- close()
} yield Done

override def stats: ConsumerStats = ???
}

(logic, logic)
}
}
Loading

0 comments on commit ee13db1

Please sign in to comment.