From ff71a92f188a0764f143fb17c6e69af6e48c8cde Mon Sep 17 00:00:00 2001 From: Greg Methvin Date: Tue, 11 May 2021 01:19:49 -0700 Subject: [PATCH] transaction support --- .github/workflows/build.yml | 4 +- .github/workflows/pr.yml | 4 +- .github/workflows/release.yml | 4 +- .sbtopts | 2 - docker-compose.yml | 18 +++++ .../PulsarCommittableSourceGraphStage.scala | 9 +-- .../pulsar4s/cats/CatsAsyncHandler.scala | 23 ++++-- .../com/sksamuel/pulsar4s/AsyncHandler.scala | 13 +++- .../com/sksamuel/pulsar4s/Consumer.scala | 39 ++++++++--- .../pulsar4s/FutureAsyncHandler.scala | 28 ++++++-- .../com/sksamuel/pulsar4s/Producer.scala | 70 +++++++++++-------- .../com/sksamuel/pulsar4s/PulsarClient.scala | 40 +++++++++++ .../pulsar4s/PulsarClientConfig.scala | 1 + .../pulsar4s/FutureAsyncHandlerTest.scala | 23 +++++- .../pulsar4s/monixs/MonixAsyncHandler.scala | 22 ++++-- .../pulsar4s/scalaz/ScalazAsyncHandler.scala | 20 ++++-- .../pulsar4s/zio/ZioAsyncHandler.scala | 27 +++++-- 17 files changed, 262 insertions(+), 85 deletions(-) create mode 100644 docker-compose.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 1597b6bc..e58f747d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -26,7 +26,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: Import GPG key id: import_gpg @@ -64,7 +64,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: Import GPG key id: import_gpg diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index ed38104c..3f2de4c2 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -22,7 +22,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: run tests run: sbt ++2.12.10 test @@ -41,7 +41,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: run tests run: sbt ++2.13.3 test diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index cff6025a..16d4d44d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -26,7 +26,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: Import GPG key id: import_gpg @@ -66,7 +66,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: Import GPG key id: import_gpg diff --git a/.sbtopts b/.sbtopts index 6a552caa..8209f6e9 100644 --- a/.sbtopts +++ b/.sbtopts @@ -1,4 +1,2 @@ -J-Xmx4G -J-XX:MaxMetaspaceSize=1G --J-XX:MaxPermSize=1G --J-XX:+CMSClassUnloadingEnabled \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..d4f9a403 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,18 @@ +version: "3" + +services: + pulsar_standalone: + image: apachepulsar/pulsar:2.7.1 + ports: + - "8080:8080" + - "6650:6650" + volumes: + - ./data:/pulsar/data + environment: + - PULSAR_PREFIX_transactionCoordinatorEnabled=true + command: > + /bin/bash -c " + bin/apply-config-from-env.py conf/standalone.conf + && (bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone &) + && bin/pulsar standalone + " diff --git a/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala b/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala index 8ccc5749..39a5b13e 100644 --- a/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala +++ b/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala @@ -12,6 +12,7 @@ import com.sksamuel.exts.Logging import com.sksamuel.pulsar4s.Consumer import com.sksamuel.pulsar4s.ConsumerMessage import com.sksamuel.pulsar4s.MessageId +import com.sksamuel.pulsar4s.TransactionContext import org.apache.pulsar.client.api.ConsumerStats import scala.concurrent.ExecutionContext @@ -24,7 +25,7 @@ import scala.util.Try import scala.util.control.NonFatal trait CommittableMessage[T] { - def ack(cumulative: Boolean = false): Future[Done] + def ack(cumulative: Boolean = false)(implicit txn: TransactionContext = TransactionContext.empty): Future[Done] def nack(): Future[Done] def message: ConsumerMessage[T] } @@ -47,12 +48,12 @@ class PulsarCommittableSourceGraphStage[T]( val message: ConsumerMessage[T] )(implicit ec: ExecutionContext) extends CommittableMessage[T] { def messageId: MessageId = message.messageId - override def ack(cumulative: Boolean): Future[Done] = { + override def ack(cumulative: Boolean)(implicit txn: TransactionContext): Future[Done] = { logger.debug(s"Acknowledging message: $message") val ackFuture = if (cumulative) { - consumer.acknowledgeCumulativeAsync(message.messageId) + txn(consumer).acknowledgeCumulativeAsync(message.messageId) } else { - consumer.acknowledgeAsync(message.messageId) + txn(consumer).acknowledgeAsync(message.messageId) } ackFuture.map(_ => Done) } diff --git a/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala b/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala index 1dd11306..2f49a293 100644 --- a/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala +++ b/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala @@ -9,6 +9,7 @@ import com.sksamuel.pulsar4s import com.sksamuel.pulsar4s._ import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{Consumer => _, MessageId => _, Producer => _, PulsarClient => _, Reader => _, _} +import org.apache.pulsar.client.api.transaction.Transaction import scala.collection.JavaConverters.iterableAsScalaIterableConverter import scala.concurrent.ExecutionException @@ -131,14 +132,16 @@ trait CatsAsyncHandlerLowPriority { } override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] = - Async[F].delay { - consumer.acknowledgeAsync(messageId) - }.liftF.void + Async[F].delay(consumer.acknowledgeAsync(messageId)).liftF.void + + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] = + Async[F].delay(consumer.acknowledgeAsync(messageId, txn)).liftF.void override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] = - Async[F].delay { - consumer.acknowledgeCumulativeAsync(messageId) - }.liftF.void + Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId)).liftF.void + + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] = + Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId, txn)).liftF.void override def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] = Async[F].delay { @@ -169,6 +172,14 @@ trait CatsAsyncHandlerLowPriority { override def send[T](builder: TypedMessageBuilder[T]): F[MessageId] = Async[F].delay(builder.sendAsync()).liftF.map(MessageId.fromJava) + + override def withTransaction[T](create: () => CompletableFuture[Transaction], action: Transaction => F[T]): F[T] = { + Resource.makeCase(Async[F].delay(create()).liftF) { + case (txn, ExitCase.Completed) => Async[F].delay(txn.commit()) + case (txn, _) => Async[F].delay(txn.abort()) + } + .use(action) + } } } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala index f2cd6f25..a64bb445 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala @@ -1,7 +1,10 @@ package com.sksamuel.pulsar4s +import java.util.concurrent.CompletableFuture + import org.apache.pulsar.client.api import org.apache.pulsar.client.api.TypedMessageBuilder +import org.apache.pulsar.client.api.transaction.Transaction import scala.concurrent.{ExecutionContext, Future} import scala.util.Try @@ -15,7 +18,9 @@ trait AsyncHandler[F[_]] { def createConsumer[T](builder: api.ConsumerBuilder[T]): F[Consumer[T]] def createReader[T](builder: api.ReaderBuilder[T]): F[Reader[T]] - def send[T](t: T, producer: api.Producer[T]): F[MessageId] + @deprecated("Use send(builder) instead", "2.8.0") + def send[T](t: T, producer: api.Producer[T]): F[MessageId] = send(producer.newMessage().value(t)) + def send[T](builder: TypedMessageBuilder[T]): F[MessageId] def receive[T](consumer: api.Consumer[T]): F[ConsumerMessage[T]] def receiveBatch[T](consumer: api.Consumer[T]): F[Vector[ConsumerMessage[T]]] @@ -39,8 +44,12 @@ trait AsyncHandler[F[_]] { def getLastMessageId[T](consumer: api.Consumer[T]): F[MessageId] def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] - def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] + def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] + def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] + def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] + + def withTransaction[T](create: () => CompletableFuture[Transaction], action: Transaction => F[T]): F[T] } object AsyncHandler { diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala index b64ceb5e..f1a9ca34 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala @@ -5,11 +5,24 @@ import java.util.concurrent.TimeUnit import com.sksamuel.exts.Logging import org.apache.pulsar.client.api.ConsumerStats +import org.apache.pulsar.client.api.transaction.Transaction import scala.concurrent.duration.FiniteDuration import scala.util.Try -trait Consumer[T] extends Closeable { +trait TransactionalConsumerOps[T] { + final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = + acknowledgeAsync(message.messageId) + + def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] + + final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = + acknowledgeCumulativeAsync(message.messageId) + + def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] +} + +trait Consumer[T] extends Closeable with TransactionalConsumerOps[T] { /** * Receives a single message. @@ -65,23 +78,15 @@ trait Consumer[T] extends Closeable { def acknowledgeCumulative(message: ConsumerMessage[T]): Unit def acknowledgeCumulative(messageId: MessageId): Unit - final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = - acknowledgeAsync(message.messageId) - - def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] - final def negativeAcknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = negativeAcknowledgeAsync(message.messageId) def negativeAcknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] - final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = - acknowledgeCumulativeAsync(message.messageId) - - def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] - def unsubscribe(): Unit def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit] + + def txn(implicit ctx: TransactionContext): TransactionalConsumerOps[T] } class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Logging { @@ -109,6 +114,7 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] = implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId) + override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] = implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId) @@ -142,4 +148,15 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin override def unsubscribe(): Unit = consumer.unsubscribe() override def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit] = implicitly[AsyncHandler[F]].unsubscribeAsync(consumer) + + override def txn(implicit ctx: TransactionContext): TransactionalConsumerOps[T] = + ctx.transaction.map(new DefaultTransactionalConsumer[T](consumer, _)).getOrElse(this) +} + +class DefaultTransactionalConsumer[T](consumer: JConsumer[T], transaction: Transaction) extends TransactionalConsumerOps[T] { + override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] = + implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId, transaction) + + override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] = + implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId, transaction) } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala index 9508eb61..100483e3 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala @@ -4,6 +4,7 @@ import java.util.concurrent.CompletableFuture import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{ConsumerBuilder, ReaderBuilder, TypedMessageBuilder} +import org.apache.pulsar.client.api.transaction.Transaction import scala.collection.JavaConverters.iterableAsScalaIterableConverter import scala.compat.java8.FutureConverters @@ -32,11 +33,6 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut override def createReader[T](builder: ReaderBuilder[T]): Future[Reader[T]] = builder.createAsync().thenApply[Reader[T]](new DefaultReader(_)).toScala - override def send[T](t: T, producer: api.Producer[T]): Future[MessageId] = { - val future = producer.sendAsync(t) - FutureConverters.toScala(future).map(MessageId.fromJava) - } - override def receive[T](consumer: api.Consumer[T]): Future[ConsumerMessage[T]] = { val future = consumer.receiveAsync() FutureConverters.toScala(future).map(ConsumerMessage.fromJava) @@ -75,12 +71,18 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] = consumer.acknowledgeAsync(messageId).toScala - override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] = - Future.successful(consumer.negativeAcknowledge(messageId)) + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] = + consumer.acknowledgeAsync(messageId, txn).toScala override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] = consumer.acknowledgeCumulativeAsync(messageId).toScala + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] = + consumer.acknowledgeCumulativeAsync(messageId, txn).toScala + + override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] = + Future.successful(consumer.negativeAcknowledge(messageId)) + override def close(reader: api.Reader[_]): Future[Unit] = reader.closeAsync().toScala override def flush(producer: api.Producer[_]): Future[Unit] = producer.flushAsync().toScala @@ -92,4 +94,16 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut override def send[T](builder: TypedMessageBuilder[T]): Future[MessageId] = builder.sendAsync().toScala.map(MessageId.fromJava) + + override def withTransaction[T](create: () => CompletableFuture[Transaction], action: Transaction => Future[T]): Future[T] = { + create().toScala.flatMap { txn => + action(txn).transformWith { + case Success(value) => + txn.commit().toScala.transform(_ => Success(value)) + case Failure(exception) => + txn.abort().toScala.transform(_ => Failure(exception)) + } + } + } + } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Producer.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Producer.scala index 86e55def..53900429 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Producer.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Producer.scala @@ -4,10 +4,36 @@ import java.io.Closeable import com.sksamuel.exts.Logging import org.apache.pulsar.client.api.{ProducerStats, TypedMessageBuilder} +import org.apache.pulsar.client.api.transaction.Transaction import scala.util.Try -trait Producer[T] extends Closeable with Logging { +trait TransactionalProducerOps[T] { + + /** + * Asynchronously sends a message of type T, returning an effect + * which will be completed with the [[MessageId]] once the message + * is acknowledged by the Pulsar broker. + * + * This method can be used when you have no need to set the + * other properties of a message, such as the event time, key, + * headers and so on. The producer will generate an appropriate + * Pulsar [[ProducerMessage]] with this t set as the value. + */ + def sendAsync[F[_] : AsyncHandler](t: T): F[MessageId] + + /** + * Asynchronously sends a [[ProducerMessage]] of type T, returning an effect + * which will be completed with the [[MessageId]] once the message + * is acknowledged by the Pulsar broker. + * + * This method can be used when you want to specify properties + * on a message such as the event time, key and so on. + */ + def sendAsync[F[_] : AsyncHandler](msg: ProducerMessage[T]): F[MessageId] +} + +trait Producer[T] extends Closeable with Logging with TransactionalProducerOps[T] { /** * Returns the [[ProducerName]] which could have been specified @@ -39,28 +65,6 @@ trait Producer[T] extends Closeable with Logging { */ def send(msg: ProducerMessage[T]): Try[MessageId] - /** - * Asynchronously sends a message of type T, returning an effect - * which will be completed with the [[MessageId]] once the message - * is acknowledged by the Pulsar broker. - * - * This method can be used when you have no need to set the - * other properties of a message, such as the event time, key, - * headers and so on. The producer will generate an appropriate - * Pulsar [[ProducerMessage]] with this t set as the value. - */ - def sendAsync[F[_] : AsyncHandler](t: T): F[MessageId] - - /** - * Asynchronously sends a [[ProducerMessage]] of type T, returning an effect - * which will be completed with the [[MessageId]] once the message - * is acknowledged by the Pulsar broker. - * - * This method can be used when you want to specify properties - * on a message such as the event time, key and so on. - */ - def sendAsync[F[_] : AsyncHandler](msg: ProducerMessage[T]): F[MessageId] - /** * Get the last sequence id that was published by this producer. * @@ -106,14 +110,16 @@ trait Producer[T] extends Closeable with Logging { def flush(): Unit def flushAsync[F[_] : AsyncHandler]: F[Unit] + + def txn(implicit ctx: TransactionContext): TransactionalProducerOps[T] } class DefaultProducer[T](producer: JProducer[T]) extends Producer[T] { override def name: ProducerName = ProducerName(producer.getProducerName) - override def send(t: T): Try[MessageId] = Try(MessageId.fromJava(producer.send(t))) - override def sendAsync[F[_] : AsyncHandler](t: T): F[MessageId] = AsyncHandler[F].send(t, producer) + override final def send(t: T): Try[MessageId] = send(ProducerMessage(t)) + override final def sendAsync[F[_] : AsyncHandler](t: T): F[MessageId] = sendAsync(ProducerMessage(t)) override def send(msg: ProducerMessage[T]): Try[MessageId] = Try(buildMessage(msg).send()) override def sendAsync[F[_] : AsyncHandler](msg: ProducerMessage[T]): F[MessageId] = AsyncHandler[F].send(buildMessage(msg)) @@ -134,13 +140,21 @@ class DefaultProducer[T](producer: JProducer[T]) extends Producer[T] { override def closeAsync[F[_] : AsyncHandler]: F[Unit] = AsyncHandler[F].close(producer) - private def buildMessage(msg: ProducerMessage[T]) = new ProducerMessageBuilder(producer).build(msg) + override def txn(implicit ctx: TransactionContext): TransactionalProducerOps[T] = + ctx.transaction.map(new DefaultTransactionalProducer[T](producer, _)).getOrElse(this) + + protected def buildMessage(msg: ProducerMessage[T]) = new ProducerMessageBuilder(producer, None).build(msg) +} + +class DefaultTransactionalProducer[T](producer: JProducer[T], transaction: Transaction) extends DefaultProducer[T](producer) { + override protected def buildMessage(msg: ProducerMessage[T]) = new ProducerMessageBuilder[T](producer, Some(transaction)).build(msg) } -class ProducerMessageBuilder[T](producer: JProducer[T]) { +class ProducerMessageBuilder[T](producer: JProducer[T], transaction: Option[Transaction]) { def build(msg: ProducerMessage[T]): TypedMessageBuilder[T] = { import scala.collection.JavaConverters._ - val builder = producer.newMessage().value(msg.value) + + val builder = transaction.fold(producer.newMessage())(producer.newMessage).value(msg.value) msg.deliverAt.foreach { da => builder.deliverAt(da) } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClient.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClient.scala index 34f34a39..4f9c7ee9 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClient.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClient.scala @@ -6,8 +6,10 @@ import java.util.{UUID, Set => JSet} import com.sksamuel.exts.Logging import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{ConsumerBuilder, ProducerBuilder, ReaderBuilder, Schema} +import org.apache.pulsar.client.api.transaction.Transaction import scala.collection.JavaConverters._ +import scala.concurrent.duration._ case class Topic(name: String) @@ -22,6 +24,32 @@ object Subscription { def generate: Subscription = Subscription(UUID.randomUUID.toString) } +sealed trait TransactionContext { + /** + * The underlying transaction associated with this context + */ + def transaction: Option[Transaction] + + /** + * Get an instance of `TransactionalConsumerOps` that provides transactional operations on the consumer. + */ + final def apply[T](consumer: Consumer[T]): TransactionalConsumerOps[T] = consumer.txn(this) + + /** + * Get an instance of `TransactionalProducerOps` that provides transactional operations on the consumer. + */ + final def apply[T](producer: Producer[T]): TransactionalProducerOps[T] = producer.txn(this) +} +object TransactionContext { + def apply(txn: Transaction): TransactionContext = new TransactionContext { + lazy val transaction: Option[Transaction] = Some(txn) + } + + final val empty: TransactionContext = new TransactionContext { + def transaction: Option[Transaction] = None + } +} + trait PulsarClient { def close(): Unit def producer[T: Schema](config: ProducerConfig, interceptors: List[ProducerInterceptor[T]] = Nil): Producer[T] @@ -37,6 +65,10 @@ trait PulsarAsyncClient extends PulsarClient { def consumerAsync[T: Schema, F[_] : AsyncHandler](config: ConsumerConfig, interceptors: List[ConsumerInterceptor[T]] = Nil): F[Consumer[T]] def readerAsync[T: Schema, F[_] : AsyncHandler](config: ReaderConfig): F[Reader[T]] + + final def transaction[T, F[_] : AsyncHandler](action: TransactionContext => F[T]): F[T] = withTransaction(1.minute)(action) + + def withTransaction[T, F[_]: AsyncHandler](timeout: FiniteDuration)(action: TransactionContext => F[T]): F[T] } trait ProducerInterceptor[T] extends AutoCloseable { @@ -125,6 +157,7 @@ object PulsarClient { config.keepAliveInterval.map(_.toSeconds.toInt).foreach(builder.keepAliveInterval(_, TimeUnit.SECONDS)) config.statsInterval.map(_.toMillis).foreach(builder.statsInterval(_, TimeUnit.MILLISECONDS)) config.tlsTrustCertsFilePath.foreach(builder.tlsTrustCertsFilePath) + config.enableTransaction.foreach(builder.enableTransaction) if (config.additionalProperties.nonEmpty) builder.loadConf(config.additionalProperties.asJava) @@ -245,4 +278,11 @@ class DefaultPulsarClient(client: org.apache.pulsar.client.api.PulsarClient) ext override def readerAsync[T: Schema, F[_] : AsyncHandler](config: ReaderConfig): F[Reader[T]] = { implicitly[AsyncHandler[F]].createReader(readerBuilder(config)) } + + override def withTransaction[T, F[_]: AsyncHandler](timeout: FiniteDuration)(action: TransactionContext => F[T]): F[T] = { + implicitly[AsyncHandler[F]].withTransaction( + () => client.newTransaction().withTransactionTimeout(timeout.length, timeout.unit).build(), + txn => action(TransactionContext(txn)) + ) + } } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClientConfig.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClientConfig.scala index a9fb1ca0..df3cc4c9 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClientConfig.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClientConfig.scala @@ -19,4 +19,5 @@ case class PulsarClientConfig(serviceUrl: String, maxLookupRequests: Option[Int] = None, tlsTrustCertsFilePath: Option[String] = None, ioThreads: Option[Int] = None, + enableTransaction: Option[Boolean] = None, additionalProperties: Map[String, AnyRef] = Map.empty) diff --git a/pulsar4s-core/src/test/scala/com/sksamuel/pulsar4s/FutureAsyncHandlerTest.scala b/pulsar4s-core/src/test/scala/com/sksamuel/pulsar4s/FutureAsyncHandlerTest.scala index e46b5628..4109c065 100644 --- a/pulsar4s-core/src/test/scala/com/sksamuel/pulsar4s/FutureAsyncHandlerTest.scala +++ b/pulsar4s-core/src/test/scala/com/sksamuel/pulsar4s/FutureAsyncHandlerTest.scala @@ -6,17 +6,21 @@ import org.apache.pulsar.client.api.Schema import org.scalatest.BeforeAndAfterAll import scala.concurrent.Await -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers + class FutureAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterAll { import scala.concurrent.ExecutionContext.Implicits.global implicit val schema: Schema[String] = Schema.STRING - private val client = PulsarClient("pulsar://localhost:6650") + private val client = PulsarClient(PulsarClientConfig( + serviceUrl = "pulsar://localhost:6650", + enableTransaction = Some(true) + )) private val topic = Topic("persistent://sample/standalone/ns1/futureasync_" + UUID.randomUUID()) override def afterAll(): Unit = { @@ -49,4 +53,19 @@ class FutureAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAft zipped.foreach(t => t._1 shouldBe t._2) consumer.close() } + + test("async producer and consumer can participate in transaction") { + val producer = client.producer(ProducerConfig(topic)) + val consumer = client.consumer(ConsumerConfig(topics = Seq(topic), subscriptionName = Subscription("mysub_" + UUID.randomUUID))) + consumer.seekEarliest() + val msgIdFt = client.transaction { txn => + for { + msg <- consumer.receiveAsync + msgId <- txn(producer).sendAsync(msg.value + "_test") + _ <- txn(consumer).acknowledgeAsync(msg.messageId) + } yield msgId + } + Await.result(msgIdFt, Duration.Inf) + consumer.close() + } } diff --git a/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala b/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala index c54dd5f8..49b6305b 100644 --- a/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala +++ b/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala @@ -7,12 +7,14 @@ import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, De import monix.eval.Task import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, TypedMessageBuilder} +import org.apache.pulsar.client.api.transaction.Transaction import scala.collection.JavaConverters.iterableAsScalaIterableConverter import scala.compat.java8.FutureConverters import scala.concurrent.Future import scala.language.implicitConversions import scala.util.{Failure, Success, Try} +import cats.effect.ExitCase class MonixAsyncHandler extends AsyncHandler[Task] { @@ -33,13 +35,6 @@ class MonixAsyncHandler extends AsyncHandler[Task] { override def createReader[T](builder: ReaderBuilder[T]): Task[pulsar4s.Reader[T]] = Task.deferFuture(FutureConverters.toScala(builder.createAsync())).map(new DefaultReader(_)) - override def send[T](t: T, producer: api.Producer[T]): Task[MessageId] = { - Task.deferFuture { - val future = producer.sendAsync(t) - FutureConverters.toScala(future) - }.map { id => MessageId.fromJava(id) } - } - override def receive[T](consumer: api.Consumer[T]): Task[ConsumerMessage[T]] = { Task.deferFuture { val future = consumer.receiveAsync() @@ -85,9 +80,15 @@ class MonixAsyncHandler extends AsyncHandler[Task] { override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeAsync(messageId) + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + consumer.acknowledgeAsync(messageId, txn) + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeCumulativeAsync(messageId) + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + consumer.acknowledgeCumulativeAsync(messageId, txn) + override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = Task { consumer.negativeAcknowledge(messageId) } @@ -104,6 +105,13 @@ class MonixAsyncHandler extends AsyncHandler[Task] { override def send[T](builder: TypedMessageBuilder[T]): Task[MessageId] = Task.deferFuture(builder.sendAsync()).map(MessageId.fromJava) + + override def withTransaction[T](create: () => CompletableFuture[Transaction], action: Transaction => Task[T]): Task[T] = { + Task.deferFuture(create()).bracketCase(action) { + case (txn, ExitCase.Completed) => Task.deferFuture(txn.commit()).map(_ => ()) + case (txn, _) => Task.deferFuture(txn.abort()).map(_ => ()) + } + } } object MonixAsyncHandler { diff --git a/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala b/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala index a0d65ee4..4b57d4ad 100644 --- a/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala +++ b/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala @@ -1,12 +1,12 @@ package com.sksamuel.pulsar4s.scalaz import java.util.concurrent.CompletableFuture -import java.util.function.BiConsumer import com.sksamuel.pulsar4s import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer} import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, TypedMessageBuilder} +import org.apache.pulsar.client.api.transaction.Transaction import scalaz.concurrent.Task import scala.collection.JavaConverters.iterableAsScalaIterableConverter @@ -40,9 +40,6 @@ class ScalazAsyncHandler extends AsyncHandler[Task] { override def createReader[T](builder: ReaderBuilder[T]): Task[pulsar4s.Reader[T]] = completableToTask(builder.createAsync()).map(new DefaultReader(_)) - override def send[T](t: T, producer: api.Producer[T]): Task[MessageId] = - completableToTask(producer.sendAsync(t)).map(MessageId.fromJava) - override def receive[T](consumer: api.Consumer[T]): Task[ConsumerMessage[T]] = completableToTask(consumer.receiveAsync).map(ConsumerMessage.fromJava) @@ -74,9 +71,15 @@ class ScalazAsyncHandler extends AsyncHandler[Task] { override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeAsync(messageId) + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + consumer.acknowledgeAsync(messageId, txn) + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeCumulativeAsync(messageId) + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + consumer.acknowledgeCumulativeAsync(messageId, txn) + override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = Task { consumer.negativeAcknowledge(messageId) } @@ -95,6 +98,15 @@ class ScalazAsyncHandler extends AsyncHandler[Task] { override def send[T](builder: TypedMessageBuilder[T]): Task[MessageId] = builder.sendAsync().map(MessageId.fromJava) + + override def withTransaction[T](create: () => CompletableFuture[Transaction], action: Transaction => Task[T]): Task[T] = { + create().flatMap { txn => + action(txn).onFinish { + case None => txn.commit() + case Some(e) => txn.abort() + } + } + } } object ScalazAsyncHandler { diff --git a/pulsar4s-zio/src/main/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandler.scala b/pulsar4s-zio/src/main/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandler.scala index eb4d58e3..8b05beb5 100644 --- a/pulsar4s-zio/src/main/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandler.scala +++ b/pulsar4s-zio/src/main/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandler.scala @@ -1,12 +1,14 @@ package com.sksamuel.pulsar4s.zio import java.util.concurrent.CompletionStage +import java.util.concurrent.CompletableFuture import com.sksamuel.pulsar4s import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer} import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, TypedMessageBuilder} -import zio.{Task, ZIO} +import org.apache.pulsar.client.api.transaction.Transaction +import zio.{Exit, Task, ZIO} import scala.collection.JavaConverters._ import scala.util.Try @@ -31,9 +33,6 @@ class ZioAsyncHandler extends AsyncHandler[Task] { override def createReader[T](builder: ReaderBuilder[T]): Task[pulsar4s.Reader[T]] = fromFuture(Task(builder.createAsync())) >>= (p => Task(new DefaultReader(p))) - override def send[T](t: T, producer: api.Producer[T]): Task[MessageId] = - fromFuture(Task(producer.sendAsync(t))).map(MessageId.fromJava) - override def send[T](builder: TypedMessageBuilder[T]): Task[MessageId] = fromFuture(Task(builder.sendAsync())).map(MessageId.fromJava) @@ -82,12 +81,28 @@ class ZioAsyncHandler extends AsyncHandler[Task] { override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = fromFuture(Task(consumer.acknowledgeAsync(messageId))).unit - override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = - Task(consumer.negativeAcknowledge(messageId)) + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + fromFuture(Task(consumer.acknowledgeAsync(messageId, txn))).unit override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = fromFuture(Task(consumer.acknowledgeCumulativeAsync(messageId))).unit + + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + fromFuture(Task(consumer.acknowledgeCumulativeAsync(messageId, txn))).unit + + override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = + Task(consumer.negativeAcknowledge(messageId)) + override def withTransaction[T](create: () => CompletableFuture[Transaction], action: Transaction => Task[T]): Task[T] = { + Task.bracketExit[Transaction, T]( + acquire = fromFuture(Task(create())), + release = { + case (txn, Exit.Success(_)) => fromFuture(Task(txn.abort())).ignore + case (txn, _) => fromFuture(Task(txn.commit())).ignore + }, + action + ) + } } object ZioAsyncHandler {