Skip to content

Commit

Permalink
transaction support
Browse files Browse the repository at this point in the history
  • Loading branch information
gmethvin committed Jun 13, 2021
1 parent 17ee059 commit c71fbf1
Show file tree
Hide file tree
Showing 20 changed files with 475 additions and 94 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down Expand Up @@ -64,7 +64,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: run tests
run: sbt ++2.12.10 test
Expand All @@ -41,7 +41,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: run tests
run: sbt ++2.13.3 test
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down Expand Up @@ -66,7 +66,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down
2 changes: 0 additions & 2 deletions .sbtopts
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
-J-Xmx4G
-J-XX:MaxMetaspaceSize=1G
-J-XX:MaxPermSize=1G
-J-XX:+CMSClassUnloadingEnabled
18 changes: 18 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: "3"

services:
pulsar_standalone:
image: apachepulsar/pulsar:2.7.1
ports:
- "8080:8080"
- "6650:6650"
volumes:
- ./data:/pulsar/data
environment:
- PULSAR_PREFIX_transactionCoordinatorEnabled=true
command: >
/bin/bash -c "
bin/apply-config-from-env.py conf/standalone.conf
&& (bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone &)
&& bin/pulsar standalone
"
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.sksamuel.exts.Logging
import com.sksamuel.pulsar4s.Consumer
import com.sksamuel.pulsar4s.ConsumerMessage
import com.sksamuel.pulsar4s.MessageId
import com.sksamuel.pulsar4s.TransactionContext
import org.apache.pulsar.client.api.ConsumerStats

import scala.concurrent.ExecutionContext
Expand All @@ -23,10 +24,14 @@ import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal

trait CommittableMessage[T] {
def ack(cumulative: Boolean = false): Future[Done]
trait CommittableMessage[T] extends TransactionalCommittableMessageOps {
def nack(): Future[Done]
def message: ConsumerMessage[T]
def tx(implicit txn: TransactionContext): TransactionalCommittableMessageOps
}

trait TransactionalCommittableMessageOps {
def ack(cumulative: Boolean = false): Future[Done]
}

class PulsarCommittableSourceGraphStage[T](
Expand All @@ -44,18 +49,23 @@ class PulsarCommittableSourceGraphStage[T](

private class CommittableMessageImpl[T](
val consumer: Consumer[T],
val message: ConsumerMessage[T]
val message: ConsumerMessage[T],
val ctx: Option[TransactionContext] = None
)(implicit ec: ExecutionContext) extends CommittableMessage[T] {
def messageId: MessageId = message.messageId
override def ack(cumulative: Boolean): Future[Done] = {
logger.debug(s"Acknowledging message: $message")
val txnOps = ctx.map(consumer.tx(_)).getOrElse(consumer)
val ackFuture = if (cumulative) {
consumer.acknowledgeCumulativeAsync(message.messageId)
txnOps.acknowledgeCumulativeAsync(message.messageId)
} else {
consumer.acknowledgeAsync(message.messageId)
txnOps.acknowledgeAsync(message.messageId)
}
ackFuture.map(_ => Done)
}
override def tx(implicit ctx: TransactionContext): TransactionalCommittableMessageOps = {
new CommittableMessageImpl(consumer, message, Some(ctx))
}
override def nack(): Future[Done] = {
logger.debug(s"Negatively acknowledging message: $message")
consumer.negativeAcknowledgeAsync(message.messageId).map(_ => Done)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.sksamuel.pulsar4s
import com.sksamuel.pulsar4s._
import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.{Consumer => _, MessageId => _, Producer => _, PulsarClient => _, Reader => _, _}
import org.apache.pulsar.client.api.transaction.Transaction

import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.concurrent.ExecutionException
Expand Down Expand Up @@ -131,14 +132,16 @@ trait CatsAsyncHandlerLowPriority {
}

override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
Async[F].delay {
consumer.acknowledgeAsync(messageId)
}.liftF.void
Async[F].delay(consumer.acknowledgeAsync(messageId)).liftF.void

override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] =
Async[F].delay(consumer.acknowledgeAsync(messageId, txn)).liftF.void

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
Async[F].delay {
consumer.acknowledgeCumulativeAsync(messageId)
}.liftF.void
Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId)).liftF.void

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] =
Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId, txn)).liftF.void

override def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
Async[F].delay {
Expand Down Expand Up @@ -169,6 +172,25 @@ trait CatsAsyncHandlerLowPriority {

override def send[T](builder: TypedMessageBuilder[T]): F[MessageId] =
Async[F].delay(builder.sendAsync()).liftF.map(MessageId.fromJava)

override def withTransaction[E, A](
builder: api.transaction.TransactionBuilder,
action: TransactionContext => F[Either[E, A]]
): F[Either[E, A]] = {
Resource.makeCase(startTransaction(builder)) { (txn, exitCase) =>
if (exitCase == ExitCase.Completed) Async[F].unit else txn.abort
}.use { txn =>
action(txn).flatMap { result =>
(if (result.isRight) txn.commit else txn.abort).map(_ => result)
}
}
}

def startTransaction(builder: api.transaction.TransactionBuilder): F[TransactionContext] =
Async[F].delay(builder.build()).liftF.map(TransactionContext(_))
def commitTransaction(txn: Transaction): F[Unit] = Async[F].delay(txn.commit()).liftF.map(_ => ())
def abortTransaction(txn: Transaction): F[Unit] = Async[F].delay(txn.abort()).liftF.map(_ => ())

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._
import scala.util.Random

class CatsAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterAll {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.sksamuel.pulsar4s

import java.util.concurrent.CompletableFuture

import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.TypedMessageBuilder
import org.apache.pulsar.client.api.transaction.Transaction

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
Expand All @@ -15,7 +18,9 @@ trait AsyncHandler[F[_]] {
def createConsumer[T](builder: api.ConsumerBuilder[T]): F[Consumer[T]]
def createReader[T](builder: api.ReaderBuilder[T]): F[Reader[T]]

def send[T](t: T, producer: api.Producer[T]): F[MessageId]
@deprecated("Use send(builder) instead", "2.8.0")
def send[T](t: T, producer: api.Producer[T]): F[MessageId] = send(producer.newMessage().value(t))

def send[T](builder: TypedMessageBuilder[T]): F[MessageId]
def receive[T](consumer: api.Consumer[T]): F[ConsumerMessage[T]]
def receiveBatch[T](consumer: api.Consumer[T]): F[Vector[ConsumerMessage[T]]]
Expand All @@ -39,8 +44,18 @@ trait AsyncHandler[F[_]] {
def getLastMessageId[T](consumer: api.Consumer[T]): F[MessageId]

def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit]
def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit]
def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]

def withTransaction[E, A](
builder: api.transaction.TransactionBuilder,
action: TransactionContext => F[Either[E, A]]
): F[Either[E, A]]
def startTransaction(builder: api.transaction.TransactionBuilder): F[TransactionContext]
def commitTransaction(txn: Transaction): F[Unit]
def abortTransaction(txn: Transaction): F[Unit]
}

object AsyncHandler {
Expand Down
45 changes: 34 additions & 11 deletions pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,27 @@ import java.util.concurrent.TimeUnit

import com.sksamuel.exts.Logging
import org.apache.pulsar.client.api.ConsumerStats
import org.apache.pulsar.client.api.transaction.Transaction

import scala.concurrent.duration.FiniteDuration
import scala.util.Try

trait Consumer[T] extends Closeable {
/**
* Operations on the consumer that may be used in a transactional context.
*/
trait TransactionalConsumerOps[T] {
final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeAsync(message.messageId)

def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeCumulativeAsync(message.messageId)

def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]
}

trait Consumer[T] extends Closeable with TransactionalConsumerOps[T] {

/**
* Receives a single message.
Expand Down Expand Up @@ -65,23 +81,18 @@ trait Consumer[T] extends Closeable {
def acknowledgeCumulative(message: ConsumerMessage[T]): Unit
def acknowledgeCumulative(messageId: MessageId): Unit

final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeAsync(message.messageId)

def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def negativeAcknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
negativeAcknowledgeAsync(message.messageId)

def negativeAcknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeCumulativeAsync(message.messageId)

def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

def unsubscribe(): Unit
def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit]

/**
* Get an instance of `TransactionalConsumerOps` that provides transactional operations on the consumer.
*/
def tx(implicit ctx: TransactionContext): TransactionalConsumerOps[T]
}

class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Logging {
Expand Down Expand Up @@ -109,6 +120,7 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin

override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId)

override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId)

Expand Down Expand Up @@ -142,4 +154,15 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin

override def unsubscribe(): Unit = consumer.unsubscribe()
override def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit] = implicitly[AsyncHandler[F]].unsubscribeAsync(consumer)

override def tx(implicit ctx: TransactionContext): TransactionalConsumerOps[T] =
new DefaultTransactionalConsumer[T](consumer, ctx.transaction)
}

class DefaultTransactionalConsumer[T](consumer: JConsumer[T], transaction: Transaction) extends TransactionalConsumerOps[T] {
override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId, transaction)

override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId, transaction)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.concurrent.CompletableFuture

import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.{ConsumerBuilder, ReaderBuilder, TypedMessageBuilder}
import org.apache.pulsar.client.api.transaction.Transaction

import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.compat.java8.FutureConverters
Expand Down Expand Up @@ -32,11 +33,6 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
override def createReader[T](builder: ReaderBuilder[T]): Future[Reader[T]] =
builder.createAsync().thenApply[Reader[T]](new DefaultReader(_)).toScala

override def send[T](t: T, producer: api.Producer[T]): Future[MessageId] = {
val future = producer.sendAsync(t)
FutureConverters.toScala(future).map(MessageId.fromJava)
}

override def receive[T](consumer: api.Consumer[T]): Future[ConsumerMessage[T]] = {
val future = consumer.receiveAsync()
FutureConverters.toScala(future).map(ConsumerMessage.fromJava)
Expand Down Expand Up @@ -75,12 +71,18 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] =
consumer.acknowledgeAsync(messageId).toScala

override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] =
Future.successful(consumer.negativeAcknowledge(messageId))
override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] =
consumer.acknowledgeAsync(messageId, txn).toScala

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] =
consumer.acknowledgeCumulativeAsync(messageId).toScala

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] =
consumer.acknowledgeCumulativeAsync(messageId, txn).toScala

override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] =
Future.successful(consumer.negativeAcknowledge(messageId))

override def close(reader: api.Reader[_]): Future[Unit] = reader.closeAsync().toScala
override def flush(producer: api.Producer[_]): Future[Unit] = producer.flushAsync().toScala

Expand All @@ -92,4 +94,24 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut

override def send[T](builder: TypedMessageBuilder[T]): Future[MessageId] =
builder.sendAsync().toScala.map(MessageId.fromJava)

override def withTransaction[E, A](
builder: api.transaction.TransactionBuilder,
action: TransactionContext => Future[Either[E, A]]
): Future[Either[E, A]] = {
startTransaction(builder).flatMap { txn =>
Future.unit.flatMap(_ => action(txn)).transformWith {
case Success(Right(value)) =>
txn.commit.transform(_ => Success(Right(value)))
case result =>
txn.abort.transform(_ => result)
}
}
}

override def startTransaction(builder: api.transaction.TransactionBuilder): Future[TransactionContext] =
builder.build().toScala.map(TransactionContext(_))
override def commitTransaction(txn: Transaction): Future[Unit] = txn.commit().toScala.map(_ => ())
override def abortTransaction(txn: Transaction): Future[Unit] = txn.abort().toScala.map(_ => ())

}
Loading

0 comments on commit c71fbf1

Please sign in to comment.