From b8010c3120de9312690136bfff2627b411e8c984 Mon Sep 17 00:00:00 2001 From: Greg Methvin Date: Tue, 11 May 2021 01:19:49 -0700 Subject: [PATCH] transaction support --- .github/workflows/build.yml | 4 +- .github/workflows/pr.yml | 4 +- .github/workflows/release.yml | 4 +- .sbtopts | 2 - docker-compose.yml | 13 ++- .../PulsarCommittableSourceGraphStage.scala | 20 +++- .../pulsar4s/cats/CatsAsyncHandler.scala | 34 +++++-- .../pulsar4s/cats/CatsAsyncHandlerTest.scala | 1 + .../com/sksamuel/pulsar4s/AsyncHandler.scala | 19 +++- .../com/sksamuel/pulsar4s/Consumer.scala | 45 ++++++--- .../pulsar4s/FutureAsyncHandler.scala | 36 ++++++-- .../com/sksamuel/pulsar4s/Producer.scala | 76 ++++++++++------ .../com/sksamuel/pulsar4s/PulsarClient.scala | 91 +++++++++++++++++++ .../pulsar4s/PulsarClientConfig.scala | 1 + .../pulsar4s/FutureAsyncHandlerTest.scala | 63 ++++++++++++- .../pulsar4s/monixs/MonixAsyncHandler.scala | 35 +++++-- .../monixs/MonixAsyncHandlerTest.scala | 22 ++++- .../pulsar4s/scalaz/ScalazAsyncHandler.scala | 30 +++++- .../pulsar4s/zio/ZioAsyncHandler.scala | 36 ++++++-- .../pulsar4s/zio/ZioAsyncHandlerTest.scala | 27 +++++- 20 files changed, 465 insertions(+), 98 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c5b1b151..099146f5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -26,7 +26,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: Import GPG key id: import_gpg @@ -64,7 +64,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: Import GPG key id: import_gpg diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index ed38104c..3f2de4c2 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -22,7 +22,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: run tests run: sbt ++2.12.10 test @@ -41,7 +41,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: run tests run: sbt ++2.13.3 test diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index cff6025a..16d4d44d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -26,7 +26,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: Import GPG key id: import_gpg @@ -66,7 +66,7 @@ jobs: java-version: 11 - name: Launch pulsar docker - run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone + run: docker-compose up -d - name: Import GPG key id: import_gpg diff --git a/.sbtopts b/.sbtopts index 6a552caa..8209f6e9 100644 --- a/.sbtopts +++ b/.sbtopts @@ -1,4 +1,2 @@ -J-Xmx4G -J-XX:MaxMetaspaceSize=1G --J-XX:MaxPermSize=1G --J-XX:+CMSClassUnloadingEnabled \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index af2091a1..d8e51fe3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,6 @@ -version: '2' +version: "3" services: - standalone: image: apachepulsar/pulsar:2.8.0 ports: @@ -9,12 +8,16 @@ services: - "8080:8080" environment: - BOOKIE_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g" + - transactionCoordinatorEnabled=true command: > - /bin/bash -c - "bin/apply-config-from-env.py conf/standalone.conf - && bin/pulsar standalone --advertised-address standalone" + /bin/bash -c " + bin/apply-config-from-env.py conf/standalone.conf + && (bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone &) + && bin/pulsar standalone --advertised-address standalone + " dashboard: + profiles: ["dashboard"] image: apachepulsar/pulsar-dashboard depends_on: - standalone diff --git a/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala b/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala index 8ccc5749..9b486be5 100644 --- a/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala +++ b/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala @@ -12,6 +12,7 @@ import com.sksamuel.exts.Logging import com.sksamuel.pulsar4s.Consumer import com.sksamuel.pulsar4s.ConsumerMessage import com.sksamuel.pulsar4s.MessageId +import com.sksamuel.pulsar4s.TransactionContext import org.apache.pulsar.client.api.ConsumerStats import scala.concurrent.ExecutionContext @@ -23,10 +24,14 @@ import scala.util.Success import scala.util.Try import scala.util.control.NonFatal -trait CommittableMessage[T] { - def ack(cumulative: Boolean = false): Future[Done] +trait CommittableMessage[T] extends TransactionalCommittableMessageOps { def nack(): Future[Done] def message: ConsumerMessage[T] + def tx(implicit txn: TransactionContext): TransactionalCommittableMessageOps +} + +trait TransactionalCommittableMessageOps { + def ack(cumulative: Boolean = false): Future[Done] } class PulsarCommittableSourceGraphStage[T]( @@ -44,18 +49,23 @@ class PulsarCommittableSourceGraphStage[T]( private class CommittableMessageImpl[T]( val consumer: Consumer[T], - val message: ConsumerMessage[T] + val message: ConsumerMessage[T], + val ctx: Option[TransactionContext] = None )(implicit ec: ExecutionContext) extends CommittableMessage[T] { def messageId: MessageId = message.messageId override def ack(cumulative: Boolean): Future[Done] = { logger.debug(s"Acknowledging message: $message") + val txnOps = ctx.map(consumer.tx(_)).getOrElse(consumer) val ackFuture = if (cumulative) { - consumer.acknowledgeCumulativeAsync(message.messageId) + txnOps.acknowledgeCumulativeAsync(message.messageId) } else { - consumer.acknowledgeAsync(message.messageId) + txnOps.acknowledgeAsync(message.messageId) } ackFuture.map(_ => Done) } + override def tx(implicit ctx: TransactionContext): TransactionalCommittableMessageOps = { + new CommittableMessageImpl(consumer, message, Some(ctx)) + } override def nack(): Future[Done] = { logger.debug(s"Negatively acknowledging message: $message") consumer.negativeAcknowledgeAsync(message.messageId).map(_ => Done) diff --git a/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala b/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala index 1dd11306..2f75b9bb 100644 --- a/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala +++ b/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala @@ -9,6 +9,7 @@ import com.sksamuel.pulsar4s import com.sksamuel.pulsar4s._ import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{Consumer => _, MessageId => _, Producer => _, PulsarClient => _, Reader => _, _} +import org.apache.pulsar.client.api.transaction.Transaction import scala.collection.JavaConverters.iterableAsScalaIterableConverter import scala.concurrent.ExecutionException @@ -131,14 +132,16 @@ trait CatsAsyncHandlerLowPriority { } override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] = - Async[F].delay { - consumer.acknowledgeAsync(messageId) - }.liftF.void + Async[F].delay(consumer.acknowledgeAsync(messageId)).liftF.void + + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] = + Async[F].delay(consumer.acknowledgeAsync(messageId, txn)).liftF.void override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] = - Async[F].delay { - consumer.acknowledgeCumulativeAsync(messageId) - }.liftF.void + Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId)).liftF.void + + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] = + Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId, txn)).liftF.void override def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] = Async[F].delay { @@ -169,6 +172,25 @@ trait CatsAsyncHandlerLowPriority { override def send[T](builder: TypedMessageBuilder[T]): F[MessageId] = Async[F].delay(builder.sendAsync()).liftF.map(MessageId.fromJava) + + override def withTransaction[E, A]( + builder: api.transaction.TransactionBuilder, + action: TransactionContext => F[Either[E, A]] + ): F[Either[E, A]] = { + Resource.makeCase(startTransaction(builder)) { (txn, exitCase) => + if (exitCase == ExitCase.Completed) Async[F].unit else txn.abort + }.use { txn => + action(txn).flatMap { result => + (if (result.isRight) txn.commit else txn.abort).map(_ => result) + } + } + } + + def startTransaction(builder: api.transaction.TransactionBuilder): F[TransactionContext] = + Async[F].delay(builder.build()).liftF.map(TransactionContext(_)) + def commitTransaction(txn: Transaction): F[Unit] = Async[F].delay(txn.commit()).liftF.map(_ => ()) + def abortTransaction(txn: Transaction): F[Unit] = Async[F].delay(txn.abort()).liftF.map(_ => ()) + } } diff --git a/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala b/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala index 7e50ae91..25c8ecb5 100644 --- a/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala +++ b/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala @@ -12,6 +12,7 @@ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import scala.language.higherKinds +import scala.concurrent.duration._ import scala.util.Random class CatsAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterAll with Eventually { diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala index f2cd6f25..62cd82da 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala @@ -1,7 +1,10 @@ package com.sksamuel.pulsar4s +import java.util.concurrent.CompletableFuture + import org.apache.pulsar.client.api import org.apache.pulsar.client.api.TypedMessageBuilder +import org.apache.pulsar.client.api.transaction.Transaction import scala.concurrent.{ExecutionContext, Future} import scala.util.Try @@ -15,7 +18,9 @@ trait AsyncHandler[F[_]] { def createConsumer[T](builder: api.ConsumerBuilder[T]): F[Consumer[T]] def createReader[T](builder: api.ReaderBuilder[T]): F[Reader[T]] - def send[T](t: T, producer: api.Producer[T]): F[MessageId] + @deprecated("Use send(builder) instead", "2.8.0") + def send[T](t: T, producer: api.Producer[T]): F[MessageId] = send(producer.newMessage().value(t)) + def send[T](builder: TypedMessageBuilder[T]): F[MessageId] def receive[T](consumer: api.Consumer[T]): F[ConsumerMessage[T]] def receiveBatch[T](consumer: api.Consumer[T]): F[Vector[ConsumerMessage[T]]] @@ -39,8 +44,18 @@ trait AsyncHandler[F[_]] { def getLastMessageId[T](consumer: api.Consumer[T]): F[MessageId] def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] - def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] + def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] + def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] + def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] + + def withTransaction[E, A]( + builder: api.transaction.TransactionBuilder, + action: TransactionContext => F[Either[E, A]] + ): F[Either[E, A]] + def startTransaction(builder: api.transaction.TransactionBuilder): F[TransactionContext] + def commitTransaction(txn: Transaction): F[Unit] + def abortTransaction(txn: Transaction): F[Unit] } object AsyncHandler { diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala index b4ab25b9..aea7d99f 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala @@ -4,12 +4,28 @@ import java.io.Closeable import java.util.concurrent.TimeUnit import com.sksamuel.exts.Logging import org.apache.pulsar.client.api.ConsumerStats +import org.apache.pulsar.client.api.transaction.Transaction import scala.concurrent.duration.FiniteDuration import scala.language.higherKinds import scala.util.Try -trait Consumer[T] extends Closeable { +/** + * Operations on the consumer that may be used in a transactional context. + */ +trait TransactionalConsumerOps[T] { + final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = + acknowledgeAsync(message.messageId) + + def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] + + final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = + acknowledgeCumulativeAsync(message.messageId) + + def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] +} + +trait Consumer[T] extends Closeable with TransactionalConsumerOps[T] { /** * Receives a single message. @@ -65,23 +81,18 @@ trait Consumer[T] extends Closeable { def acknowledgeCumulative(message: ConsumerMessage[T]): Unit def acknowledgeCumulative(messageId: MessageId): Unit - final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = - acknowledgeAsync(message.messageId) - - def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] - final def negativeAcknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = negativeAcknowledgeAsync(message.messageId) def negativeAcknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] - final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = - acknowledgeCumulativeAsync(message.messageId) - - def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] - def unsubscribe(): Unit def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit] + + /** + * Get an instance of `TransactionalConsumerOps` that provides transactional operations on the consumer. + */ + def tx(implicit ctx: TransactionContext): TransactionalConsumerOps[T] } class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Logging { @@ -109,6 +120,7 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] = implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId) + override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] = implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId) @@ -142,4 +154,15 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin override def unsubscribe(): Unit = consumer.unsubscribe() override def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit] = implicitly[AsyncHandler[F]].unsubscribeAsync(consumer) + + override def tx(implicit ctx: TransactionContext): TransactionalConsumerOps[T] = + new DefaultTransactionalConsumer[T](consumer, ctx.transaction) +} + +class DefaultTransactionalConsumer[T](consumer: JConsumer[T], transaction: Transaction) extends TransactionalConsumerOps[T] { + override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] = + implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId, transaction) + + override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] = + implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId, transaction) } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala index 9508eb61..ee23d7c2 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala @@ -4,6 +4,7 @@ import java.util.concurrent.CompletableFuture import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{ConsumerBuilder, ReaderBuilder, TypedMessageBuilder} +import org.apache.pulsar.client.api.transaction.Transaction import scala.collection.JavaConverters.iterableAsScalaIterableConverter import scala.compat.java8.FutureConverters @@ -32,11 +33,6 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut override def createReader[T](builder: ReaderBuilder[T]): Future[Reader[T]] = builder.createAsync().thenApply[Reader[T]](new DefaultReader(_)).toScala - override def send[T](t: T, producer: api.Producer[T]): Future[MessageId] = { - val future = producer.sendAsync(t) - FutureConverters.toScala(future).map(MessageId.fromJava) - } - override def receive[T](consumer: api.Consumer[T]): Future[ConsumerMessage[T]] = { val future = consumer.receiveAsync() FutureConverters.toScala(future).map(ConsumerMessage.fromJava) @@ -75,12 +71,18 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] = consumer.acknowledgeAsync(messageId).toScala - override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] = - Future.successful(consumer.negativeAcknowledge(messageId)) + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] = + consumer.acknowledgeAsync(messageId, txn).toScala override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] = consumer.acknowledgeCumulativeAsync(messageId).toScala + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] = + consumer.acknowledgeCumulativeAsync(messageId, txn).toScala + + override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] = + Future.successful(consumer.negativeAcknowledge(messageId)) + override def close(reader: api.Reader[_]): Future[Unit] = reader.closeAsync().toScala override def flush(producer: api.Producer[_]): Future[Unit] = producer.flushAsync().toScala @@ -92,4 +94,24 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut override def send[T](builder: TypedMessageBuilder[T]): Future[MessageId] = builder.sendAsync().toScala.map(MessageId.fromJava) + + override def withTransaction[E, A]( + builder: api.transaction.TransactionBuilder, + action: TransactionContext => Future[Either[E, A]] + ): Future[Either[E, A]] = { + startTransaction(builder).flatMap { txn => + Future.unit.flatMap(_ => action(txn)).transformWith { + case Success(Right(value)) => + txn.commit.transform(_ => Success(Right(value))) + case result => + txn.abort.transform(_ => result) + } + } + } + + override def startTransaction(builder: api.transaction.TransactionBuilder): Future[TransactionContext] = + builder.build().toScala.map(TransactionContext(_)) + override def commitTransaction(txn: Transaction): Future[Unit] = txn.commit().toScala.map(_ => ()) + override def abortTransaction(txn: Transaction): Future[Unit] = txn.abort().toScala.map(_ => ()) + } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Producer.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Producer.scala index 7afd8539..8d49753d 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Producer.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Producer.scala @@ -3,11 +3,40 @@ package com.sksamuel.pulsar4s import java.io.Closeable import com.sksamuel.exts.Logging import org.apache.pulsar.client.api.{ProducerStats, TypedMessageBuilder} +import org.apache.pulsar.client.api.transaction.Transaction import scala.language.higherKinds import scala.util.Try -trait Producer[T] extends Closeable with Logging { +/** + * Operations on the producer that may be used in a transactional context. + */ +trait TransactionalProducerOps[T] { + + /** + * Asynchronously sends a message of type T, returning an effect + * which will be completed with the [[MessageId]] once the message + * is acknowledged by the Pulsar broker. + * + * This method can be used when you have no need to set the + * other properties of a message, such as the event time, key, + * headers and so on. The producer will generate an appropriate + * Pulsar [[ProducerMessage]] with this t set as the value. + */ + def sendAsync[F[_] : AsyncHandler](t: T): F[MessageId] + + /** + * Asynchronously sends a [[ProducerMessage]] of type T, returning an effect + * which will be completed with the [[MessageId]] once the message + * is acknowledged by the Pulsar broker. + * + * This method can be used when you want to specify properties + * on a message such as the event time, key and so on. + */ + def sendAsync[F[_] : AsyncHandler](msg: ProducerMessage[T]): F[MessageId] +} + +trait Producer[T] extends Closeable with Logging with TransactionalProducerOps[T] { /** * Returns the [[ProducerName]] which could have been specified @@ -39,28 +68,6 @@ trait Producer[T] extends Closeable with Logging { */ def send(msg: ProducerMessage[T]): Try[MessageId] - /** - * Asynchronously sends a message of type T, returning an effect - * which will be completed with the [[MessageId]] once the message - * is acknowledged by the Pulsar broker. - * - * This method can be used when you have no need to set the - * other properties of a message, such as the event time, key, - * headers and so on. The producer will generate an appropriate - * Pulsar [[ProducerMessage]] with this t set as the value. - */ - def sendAsync[F[_] : AsyncHandler](t: T): F[MessageId] - - /** - * Asynchronously sends a [[ProducerMessage]] of type T, returning an effect - * which will be completed with the [[MessageId]] once the message - * is acknowledged by the Pulsar broker. - * - * This method can be used when you want to specify properties - * on a message such as the event time, key and so on. - */ - def sendAsync[F[_] : AsyncHandler](msg: ProducerMessage[T]): F[MessageId] - /** * Get the last sequence id that was published by this producer. * @@ -106,14 +113,19 @@ trait Producer[T] extends Closeable with Logging { def flush(): Unit def flushAsync[F[_] : AsyncHandler]: F[Unit] + + /** + * Get an instance of `TransactionalProducerOps` that provides transactional operations on the producer. + */ + def tx(implicit ctx: TransactionContext): TransactionalProducerOps[T] } class DefaultProducer[T](producer: JProducer[T]) extends Producer[T] { override def name: ProducerName = ProducerName(producer.getProducerName) - override def send(t: T): Try[MessageId] = Try(MessageId.fromJava(producer.send(t))) - override def sendAsync[F[_] : AsyncHandler](t: T): F[MessageId] = AsyncHandler[F].send(t, producer) + override final def send(t: T): Try[MessageId] = send(ProducerMessage(t)) + override final def sendAsync[F[_] : AsyncHandler](t: T): F[MessageId] = sendAsync(ProducerMessage(t)) override def send(msg: ProducerMessage[T]): Try[MessageId] = Try(buildMessage(msg).send()) override def sendAsync[F[_] : AsyncHandler](msg: ProducerMessage[T]): F[MessageId] = AsyncHandler[F].send(buildMessage(msg)) @@ -134,13 +146,21 @@ class DefaultProducer[T](producer: JProducer[T]) extends Producer[T] { override def closeAsync[F[_] : AsyncHandler]: F[Unit] = AsyncHandler[F].close(producer) - private def buildMessage(msg: ProducerMessage[T]) = new ProducerMessageBuilder(producer).build(msg) + override def tx(implicit ctx: TransactionContext): TransactionalProducerOps[T] = + new DefaultTransactionalProducer[T](producer, ctx.transaction) + + protected def buildMessage(msg: ProducerMessage[T]) = new ProducerMessageBuilder(producer, None).build(msg) +} + +class DefaultTransactionalProducer[T](producer: JProducer[T], transaction: Transaction) extends DefaultProducer[T](producer) { + override protected def buildMessage(msg: ProducerMessage[T]) = new ProducerMessageBuilder[T](producer, Some(transaction)).build(msg) } -class ProducerMessageBuilder[T](producer: JProducer[T]) { +class ProducerMessageBuilder[T](producer: JProducer[T], transaction: Option[Transaction]) { def build(msg: ProducerMessage[T]): TypedMessageBuilder[T] = { import scala.collection.JavaConverters._ - val builder = producer.newMessage().value(msg.value) + + val builder = transaction.fold(producer.newMessage())(producer.newMessage).value(msg.value) msg.deliverAt.foreach { da => builder.deliverAt(da) } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClient.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClient.scala index 9edd519d..cc042aef 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClient.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClient.scala @@ -5,9 +5,13 @@ import java.util.{UUID, Set => JSet} import com.sksamuel.exts.Logging import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{ConsumerBuilder, ProducerBuilder, ReaderBuilder, Schema} +import org.apache.pulsar.client.api.transaction.Transaction import scala.collection.JavaConverters._ +import scala.concurrent.duration._ import scala.language.higherKinds +import scala.util.Success +import java.util.concurrent.CompletableFuture case class Topic(name: String) @@ -22,6 +26,88 @@ object Subscription { def generate: Subscription = Subscription(UUID.randomUUID.toString) } +sealed trait TransactionContext { + /** + * The underlying transaction associated with this context. + */ + def transaction: Transaction + + /** + * Explicitly commit the transaction. Note that this action must occur _after_ all other actions on the transaction. + */ + def commit[F[_]: AsyncHandler]: F[Unit] + + /** + * Explicitly abort the transaction. Note that this action must occur _after_ all other actions on the transaction. + */ + def abort[F[_]: AsyncHandler]: F[Unit] + + + /** + * Get an instance of `TransactionalConsumerOps` that provides transactional operations on the consumer. + */ + final def apply[T](consumer: Consumer[T]): TransactionalConsumerOps[T] = consumer.tx(this) + + /** + * Get an instance of `TransactionalProducerOps` that provides transactional operations on the producer. + */ + final def apply[T](producer: Producer[T]): TransactionalProducerOps[T] = producer.tx(this) +} +object TransactionContext { + def apply(txn: Transaction): TransactionContext = new TransactionContext { + lazy val transaction: Transaction = txn + def commit[F[_]: AsyncHandler]: F[Unit] = implicitly[AsyncHandler[F]].commitTransaction(txn) + def abort[F[_]: AsyncHandler]: F[Unit] = implicitly[AsyncHandler[F]].abortTransaction(txn) + } +} + +sealed trait TransactionBuilder { + /** + * Start a transaction. + */ + def start[F[_]: AsyncHandler]: F[TransactionContext] + + /** + * Return a new builder with the given timeout. + */ + def withTimeout(timeout: FiniteDuration): TransactionBuilder + + /** + * Given a `TransactionContext => F[A]`, produce an `F[A]` that runs in a new transaction. + * + * If `F` fails, abort the transaction. Otherwise commit the transaction. + */ + def runWith[A, F[_]: AsyncHandler](action: TransactionContext => F[A]): F[A] + + /** + * Given a `TransactionContext => F[Either[E, A]]`, produce an `F[Either[E, A]]` that runs in a new transaction. + * + * If `F` fails or the result is a `Left`, abort the transaction. Otherwise commit the transaction. + */ + def runWithEither[E, A, F[_]: AsyncHandler](action: TransactionContext => F[Either[E, A]]): F[Either[E, A]] +} + +private class TransactionBuilderImpl( + client: org.apache.pulsar.client.api.PulsarClient, + timeout: FiniteDuration = 60.seconds +) extends TransactionBuilder { + private def javaBuilder: api.transaction.TransactionBuilder = + client.newTransaction().withTransactionTimeout(timeout.length, timeout.unit) + + override def start[F[_]: AsyncHandler]: F[TransactionContext] = + implicitly[AsyncHandler[F]].startTransaction(javaBuilder) + override def withTimeout(timeout: FiniteDuration): TransactionBuilder = + new TransactionBuilderImpl(client, timeout) + override def runWith[T, F[_]: AsyncHandler](action: TransactionContext => F[T]): F[T] = { + val async = implicitly[AsyncHandler[F]] + async.transform(runWithEither { ctx => + async.transform[T, Either[T, T]](action(ctx))(r => Success(Right(r))) + })(r => Success(r.merge)) + } + override def runWithEither[E, A, F[_]: AsyncHandler](action: TransactionContext => F[Either[E,A]]): F[Either[E,A]] = + implicitly[AsyncHandler[F]].withTransaction(javaBuilder, action) +} + trait PulsarClient { def close(): Unit def producer[T: Schema](config: ProducerConfig, interceptors: List[ProducerInterceptor[T]] = Nil): Producer[T] @@ -37,6 +123,8 @@ trait PulsarAsyncClient extends PulsarClient { def consumerAsync[T: Schema, F[_] : AsyncHandler](config: ConsumerConfig, interceptors: List[ConsumerInterceptor[T]] = Nil): F[Consumer[T]] def readerAsync[T: Schema, F[_] : AsyncHandler](config: ReaderConfig): F[Reader[T]] + + def transaction: TransactionBuilder } trait ProducerInterceptor[T] extends AutoCloseable { @@ -125,6 +213,7 @@ object PulsarClient { config.keepAliveInterval.map(_.toSeconds.toInt).foreach(builder.keepAliveInterval(_, TimeUnit.SECONDS)) config.statsInterval.map(_.toMillis).foreach(builder.statsInterval(_, TimeUnit.MILLISECONDS)) config.tlsTrustCertsFilePath.foreach(builder.tlsTrustCertsFilePath) + config.enableTransaction.foreach(builder.enableTransaction) if (config.additionalProperties.nonEmpty) builder.loadConf(config.additionalProperties.asJava) @@ -245,4 +334,6 @@ class DefaultPulsarClient(client: org.apache.pulsar.client.api.PulsarClient) ext override def readerAsync[T: Schema, F[_] : AsyncHandler](config: ReaderConfig): F[Reader[T]] = { implicitly[AsyncHandler[F]].createReader(readerBuilder(config)) } + + override def transaction: TransactionBuilder = new TransactionBuilderImpl(client) } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClientConfig.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClientConfig.scala index a9fb1ca0..df3cc4c9 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClientConfig.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/PulsarClientConfig.scala @@ -19,4 +19,5 @@ case class PulsarClientConfig(serviceUrl: String, maxLookupRequests: Option[Int] = None, tlsTrustCertsFilePath: Option[String] = None, ioThreads: Option[Int] = None, + enableTransaction: Option[Boolean] = None, additionalProperties: Map[String, AnyRef] = Map.empty) diff --git a/pulsar4s-core/src/test/scala/com/sksamuel/pulsar4s/FutureAsyncHandlerTest.scala b/pulsar4s-core/src/test/scala/com/sksamuel/pulsar4s/FutureAsyncHandlerTest.scala index e46b5628..f4697955 100644 --- a/pulsar4s-core/src/test/scala/com/sksamuel/pulsar4s/FutureAsyncHandlerTest.scala +++ b/pulsar4s-core/src/test/scala/com/sksamuel/pulsar4s/FutureAsyncHandlerTest.scala @@ -6,9 +6,11 @@ import org.apache.pulsar.client.api.Schema import org.scalatest.BeforeAndAfterAll import scala.concurrent.Await -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import scala.util.Random + class FutureAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterAll { @@ -16,7 +18,10 @@ class FutureAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAft implicit val schema: Schema[String] = Schema.STRING - private val client = PulsarClient("pulsar://localhost:6650") + private val client = PulsarClient(PulsarClientConfig( + serviceUrl = "pulsar://localhost:6650", + enableTransaction = Some(true) + )) private val topic = Topic("persistent://sample/standalone/ns1/futureasync_" + UUID.randomUUID()) override def afterAll(): Unit = { @@ -49,4 +54,58 @@ class FutureAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAft zipped.foreach(t => t._1 shouldBe t._2) consumer.close() } + + test("async producer and consumer can participate in transaction") { + val producer = client.producer(ProducerConfig(topic, sendTimeout = Some(Duration.Zero))) + val consumer = client.consumer(ConsumerConfig(topics = Seq(topic), subscriptionName = Subscription("mysub_" + UUID.randomUUID))) + consumer.seekEarliest() + val msgIdFt = client.transaction.runWith { txn => + for { + msg <- consumer.receiveAsync + msgId <- txn(producer).sendAsync(msg.value + "_test") + _ <- txn(consumer).acknowledgeAsync(msg.messageId) + } yield { + msgId + } + } + Await.result(msgIdFt, Duration.Inf) + consumer.close() + producer.close() + } + + test("async producer and consumer can participate in manually managed transaction") { + val producer = client.producer(ProducerConfig(topic, sendTimeout = Some(Duration.Zero))) + val consumer = client.consumer(ConsumerConfig(topics = Seq(topic), subscriptionName = Subscription("mysub_" + UUID.randomUUID))) + consumer.seekEarliest() + val msgIdFt = for { + txn <- client.transaction.start + msg <- consumer.receiveAsync + msgId <- txn(producer).sendAsync(msg.value + "_test") + _ <- txn(consumer).acknowledgeAsync(msg.messageId) + _ <- txn.commit + } yield { + msgId + } + Await.result(msgIdFt, Duration.Inf) + consumer.close() + producer.close() + } + + test("async producer and consumer can participate in transaction returning either") { + val producer = client.producer(ProducerConfig(topic, sendTimeout = Some(Duration.Zero))) + val consumer = client.consumer(ConsumerConfig(topics = Seq(topic), subscriptionName = Subscription("mysub_" + UUID.randomUUID))) + consumer.seekEarliest() + val msgIdEitherFt = client.transaction.withTimeout(timeout = 1.second).runWithEither { implicit txn => + for { + msg <- consumer.receiveAsync + msgId <- producer.tx.sendAsync(msg.value + "_test") + _ <- consumer.tx.acknowledgeAsync(msg.messageId) + } yield { + if (Random.nextBoolean()) Right(msgId) else Left(s"failed: $msgId") + } + } + Await.result(msgIdEitherFt, Duration.Inf) + consumer.close() + producer.close() + } } diff --git a/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala b/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala index c54dd5f8..4d0d13fe 100644 --- a/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala +++ b/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala @@ -3,16 +3,18 @@ package com.sksamuel.pulsar4s.monixs import java.util.concurrent.CompletableFuture import com.sksamuel.pulsar4s -import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer} +import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer, TransactionContext} import monix.eval.Task import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, TypedMessageBuilder} +import org.apache.pulsar.client.api.transaction.Transaction import scala.collection.JavaConverters.iterableAsScalaIterableConverter import scala.compat.java8.FutureConverters import scala.concurrent.Future import scala.language.implicitConversions import scala.util.{Failure, Success, Try} +import cats.effect.ExitCase class MonixAsyncHandler extends AsyncHandler[Task] { @@ -33,13 +35,6 @@ class MonixAsyncHandler extends AsyncHandler[Task] { override def createReader[T](builder: ReaderBuilder[T]): Task[pulsar4s.Reader[T]] = Task.deferFuture(FutureConverters.toScala(builder.createAsync())).map(new DefaultReader(_)) - override def send[T](t: T, producer: api.Producer[T]): Task[MessageId] = { - Task.deferFuture { - val future = producer.sendAsync(t) - FutureConverters.toScala(future) - }.map { id => MessageId.fromJava(id) } - } - override def receive[T](consumer: api.Consumer[T]): Task[ConsumerMessage[T]] = { Task.deferFuture { val future = consumer.receiveAsync() @@ -85,9 +80,15 @@ class MonixAsyncHandler extends AsyncHandler[Task] { override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeAsync(messageId) + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + consumer.acknowledgeAsync(messageId, txn) + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeCumulativeAsync(messageId) + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + consumer.acknowledgeCumulativeAsync(messageId, txn) + override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = Task { consumer.negativeAcknowledge(messageId) } @@ -104,6 +105,24 @@ class MonixAsyncHandler extends AsyncHandler[Task] { override def send[T](builder: TypedMessageBuilder[T]): Task[MessageId] = Task.deferFuture(builder.sendAsync()).map(MessageId.fromJava) + + override def withTransaction[E, A]( + builder: api.transaction.TransactionBuilder, + action: TransactionContext => Task[Either[E, A]] + ): Task[Either[E, A]] = { + startTransaction(builder).bracketCase { txn => + action(txn).flatMap { result => + (if (result.isRight) txn.commit(this) else txn.abort(this)).map(_ => result) + } + }((txn, exitCase) => if (exitCase == ExitCase.Completed) Task.unit else txn.abort(this)) + } + + override def startTransaction(builder: api.transaction.TransactionBuilder): Task[TransactionContext] = + Task.deferFuture(builder.build()).map(TransactionContext(_)) + override def commitTransaction(txn: Transaction): Task[Unit] = + Task.deferFuture(txn.commit()).map(_ => ()) + override def abortTransaction(txn: Transaction): Task[Unit] = + Task.deferFuture(txn.abort()).map(_ => ()) } object MonixAsyncHandler { diff --git a/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala b/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala index f24e68d2..583472e5 100644 --- a/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala +++ b/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala @@ -7,7 +7,7 @@ import org.apache.pulsar.client.api.Schema import org.scalatest.BeforeAndAfterAll import scala.concurrent.Await -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers @@ -18,7 +18,10 @@ class MonixAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfte implicit val schema: Schema[String] = Schema.STRING - val client: PulsarAsyncClient = PulsarClient("pulsar://localhost:6650") + private val client: PulsarAsyncClient = PulsarClient(PulsarClientConfig( + serviceUrl = "pulsar://localhost:6650", + enableTransaction = Some(true) + )) val topic: Topic = Topic("persistent://sample/standalone/ns1/monix_" + UUID.randomUUID()) override def afterAll(): Unit = { @@ -55,4 +58,19 @@ class MonixAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfte r.partitionIndex shouldBe value.messageId.partitionIndex consumer.close() } + + test("producer and consumer can execute a transaction using cats") { + val producer = client.producer(ProducerConfig(topic, sendTimeout = Some(Duration.Zero))) + val consumer = client.consumer(ConsumerConfig(topics = Seq(topic), subscriptionName = Subscription("mysub_" + UUID.randomUUID))) + consumer.seekEarliest() + val msgIdIO = client.transaction.withTimeout(1.second).runWith { implicit txn => + for { + msg <- consumer.receiveAsync + msgId <- producer.tx.sendAsync(msg.value + "_test") + _ <- consumer.tx.acknowledgeAsync(msg.messageId) + } yield msgId + } + Await.result(msgIdIO.runToFuture, Duration.Inf) + consumer.close() + } } diff --git a/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala b/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala index 2539bf9d..8a7c9a5e 100644 --- a/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala +++ b/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala @@ -3,9 +3,10 @@ package com.sksamuel.pulsar4s.scalaz import java.util.concurrent.CompletableFuture import com.sksamuel.pulsar4s -import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer} +import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer, TransactionContext} import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, TypedMessageBuilder} +import org.apache.pulsar.client.api.transaction.Transaction import scalaz.concurrent.Task import scala.collection.JavaConverters.iterableAsScalaIterableConverter @@ -39,9 +40,6 @@ class ScalazAsyncHandler extends AsyncHandler[Task] { override def createReader[T](builder: ReaderBuilder[T]): Task[pulsar4s.Reader[T]] = completableToTask(builder.createAsync()).map(new DefaultReader(_)) - override def send[T](t: T, producer: api.Producer[T]): Task[MessageId] = - completableToTask(producer.sendAsync(t)).map(MessageId.fromJava) - override def receive[T](consumer: api.Consumer[T]): Task[ConsumerMessage[T]] = completableToTask(consumer.receiveAsync).map(ConsumerMessage.fromJava) @@ -73,9 +71,15 @@ class ScalazAsyncHandler extends AsyncHandler[Task] { override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeAsync(messageId) + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + consumer.acknowledgeAsync(messageId, txn) + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeCumulativeAsync(messageId) + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + consumer.acknowledgeCumulativeAsync(messageId, txn) + override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = Task { consumer.negativeAcknowledge(messageId) } @@ -94,6 +98,24 @@ class ScalazAsyncHandler extends AsyncHandler[Task] { override def send[T](builder: TypedMessageBuilder[T]): Task[MessageId] = builder.sendAsync().map(MessageId.fromJava) + + override def withTransaction[E, A]( + builder: api.transaction.TransactionBuilder, + action: TransactionContext => Task[Either[E, A]] + ): Task[Either[E, A]] = { + def close[T](txn: TransactionContext, commit: Boolean, result: T): Task[T] = + (if (commit) txn.commit(this) else txn.abort(this)).map(_ => result) + startTransaction(builder).flatMap { txn => + action(txn) + .flatMap(result => (if (result.isRight) txn.commit(this) else txn.abort(this)).map(_ => result)) + .onFinish(errorOpt => if (errorOpt.isDefined) txn.abort(this) else Task.now(())) + } + } + + override def startTransaction(builder: api.transaction.TransactionBuilder): Task[TransactionContext] = + builder.build().map(TransactionContext(_)) + override def commitTransaction(txn: Transaction): Task[Unit] = txn.commit() + override def abortTransaction(txn: Transaction): Task[Unit] = txn.abort() } object ScalazAsyncHandler { diff --git a/pulsar4s-zio/src/main/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandler.scala b/pulsar4s-zio/src/main/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandler.scala index eb4d58e3..e3adc21b 100644 --- a/pulsar4s-zio/src/main/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandler.scala +++ b/pulsar4s-zio/src/main/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandler.scala @@ -3,10 +3,11 @@ package com.sksamuel.pulsar4s.zio import java.util.concurrent.CompletionStage import com.sksamuel.pulsar4s -import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer} +import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer, TransactionContext} import org.apache.pulsar.client.api import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, TypedMessageBuilder} -import zio.{Task, ZIO} +import org.apache.pulsar.client.api.transaction.Transaction +import zio.{Exit, Task, ZIO} import scala.collection.JavaConverters._ import scala.util.Try @@ -31,9 +32,6 @@ class ZioAsyncHandler extends AsyncHandler[Task] { override def createReader[T](builder: ReaderBuilder[T]): Task[pulsar4s.Reader[T]] = fromFuture(Task(builder.createAsync())) >>= (p => Task(new DefaultReader(p))) - override def send[T](t: T, producer: api.Producer[T]): Task[MessageId] = - fromFuture(Task(producer.sendAsync(t))).map(MessageId.fromJava) - override def send[T](builder: TypedMessageBuilder[T]): Task[MessageId] = fromFuture(Task(builder.sendAsync())).map(MessageId.fromJava) @@ -82,12 +80,36 @@ class ZioAsyncHandler extends AsyncHandler[Task] { override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = fromFuture(Task(consumer.acknowledgeAsync(messageId))).unit - override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = - Task(consumer.negativeAcknowledge(messageId)) + override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + fromFuture(Task(consumer.acknowledgeAsync(messageId, txn))).unit override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = fromFuture(Task(consumer.acknowledgeCumulativeAsync(messageId))).unit + + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] = + fromFuture(Task(consumer.acknowledgeCumulativeAsync(messageId, txn))).unit + + override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = + Task(consumer.negativeAcknowledge(messageId)) + override def withTransaction[E, A]( + builder: api.transaction.TransactionBuilder, + action: TransactionContext => Task[Either[E, A]] + ): Task[Either[E, A]] = { + Task.bracketExit[TransactionContext, Either[E, A]]( + acquire = startTransaction(builder), + release = { + case (txn, Exit.Success(Right(_))) => txn.commit(this).ignore + case (txn, _) => txn.abort(this).ignore + }, + use = action + ) + } + + override def startTransaction(builder: api.transaction.TransactionBuilder): Task[TransactionContext] = + fromFuture(Task(builder.build())).map(TransactionContext(_)) + override def commitTransaction(txn: Transaction): Task[Unit] = fromFuture(Task(txn.commit())).unit + override def abortTransaction(txn: Transaction): Task[Unit] = fromFuture(Task(txn.abort())).unit } object ZioAsyncHandler { diff --git a/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala b/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala index 2aecd2c1..c33ded68 100644 --- a/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala +++ b/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala @@ -1,13 +1,15 @@ package com.sksamuel.pulsar4s.zio -import com.sksamuel.pulsar4s.{ConsumerConfig, ProducerConfig, PulsarClient, Subscription, Topic} +import java.util.UUID +import com.sksamuel.pulsar4s.{ConsumerConfig, MessageId, ProducerConfig, PulsarClient, PulsarClientConfig, Subscription, Topic} import org.apache.pulsar.client.api.Schema import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import zio.Task -import java.util.UUID +import scala.concurrent.duration._ class ZioAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterAll with Eventually { @@ -15,7 +17,10 @@ class ZioAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterA implicit val schema: Schema[String] = Schema.STRING - private val client = PulsarClient("pulsar://localhost:6650") + private val client = PulsarClient(PulsarClientConfig( + serviceUrl = "pulsar://localhost:6650", + enableTransaction = Some(true) + )) private val topic = Topic("persistent://sample/standalone/ns1/zio_" + UUID.randomUUID()) override def afterAll(): Unit = { @@ -51,4 +56,20 @@ class ZioAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterA r.partitionIndex shouldBe value.messageId.partitionIndex consumer.close() } + + test("producer and consumer can execute a transaction using zio") { + val producer = client.producer(ProducerConfig(topic, sendTimeout = Some(Duration.Zero))) + val consumer = client.consumer(ConsumerConfig(topics = Seq(topic), subscriptionName = Subscription("mysub_" + UUID.randomUUID))) + consumer.seekEarliest() + val msgIdIO = client.transaction.withTimeout(1.second).runWith[MessageId, Task] { implicit txn => + for { + msg <- consumer.receiveAsync + msgId <- producer.tx.sendAsync(msg.value + "_test") + _ <- consumer.tx.acknowledgeAsync(msg.messageId) + } yield msgId + } + zio.Runtime.default.unsafeRun(msgIdIO) + consumer.close() + producer.close() + } }