Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transaction support #307

Merged
merged 1 commit into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down Expand Up @@ -64,7 +64,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: run tests
run: sbt ++2.12.10 test
Expand All @@ -41,7 +41,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: run tests
run: sbt ++2.13.3 test
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down Expand Up @@ -66,7 +66,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down
2 changes: 0 additions & 2 deletions .sbtopts
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
-J-Xmx4G
-J-XX:MaxMetaspaceSize=1G
-J-XX:MaxPermSize=1G
-J-XX:+CMSClassUnloadingEnabled
5 changes: 3 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
version: '2'
version: "3"

services:

standalone:
image: apachepulsar/pulsar:2.8.0
ports:
- "6650:6650"
- "8080:8080"
environment:
- BOOKIE_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"
- transactionCoordinatorEnabled=true
command: >
/bin/bash -c
"bin/apply-config-from-env.py conf/standalone.conf
&& bin/pulsar standalone --advertised-address standalone"
dashboard:
profiles: ["dashboard"]
image: apachepulsar/pulsar-dashboard
depends_on:
- standalone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.sksamuel.exts.Logging
import com.sksamuel.pulsar4s.Consumer
import com.sksamuel.pulsar4s.ConsumerMessage
import com.sksamuel.pulsar4s.MessageId
import com.sksamuel.pulsar4s.TransactionContext
import org.apache.pulsar.client.api.ConsumerStats

import scala.concurrent.ExecutionContext
Expand All @@ -23,10 +24,14 @@ import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal

trait CommittableMessage[T] {
def ack(cumulative: Boolean = false): Future[Done]
trait CommittableMessage[T] extends TransactionalCommittableMessageOps {
def nack(): Future[Done]
def message: ConsumerMessage[T]
def tx(implicit txn: TransactionContext): TransactionalCommittableMessageOps
}

trait TransactionalCommittableMessageOps {
def ack(cumulative: Boolean = false): Future[Done]
}

class PulsarCommittableSourceGraphStage[T](
Expand All @@ -44,18 +49,23 @@ class PulsarCommittableSourceGraphStage[T](

private class CommittableMessageImpl[T](
val consumer: Consumer[T],
val message: ConsumerMessage[T]
val message: ConsumerMessage[T],
val ctx: Option[TransactionContext] = None
)(implicit ec: ExecutionContext) extends CommittableMessage[T] {
def messageId: MessageId = message.messageId
override def ack(cumulative: Boolean): Future[Done] = {
logger.debug(s"Acknowledging message: $message")
val txnOps = ctx.map(consumer.tx(_)).getOrElse(consumer)
val ackFuture = if (cumulative) {
consumer.acknowledgeCumulativeAsync(message.messageId)
txnOps.acknowledgeCumulativeAsync(message.messageId)
} else {
consumer.acknowledgeAsync(message.messageId)
txnOps.acknowledgeAsync(message.messageId)
}
ackFuture.map(_ => Done)
}
override def tx(implicit ctx: TransactionContext): TransactionalCommittableMessageOps = {
new CommittableMessageImpl(consumer, message, Some(ctx))
}
override def nack(): Future[Done] = {
logger.debug(s"Negatively acknowledging message: $message")
consumer.negativeAcknowledgeAsync(message.messageId).map(_ => Done)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.sksamuel.pulsar4s
import com.sksamuel.pulsar4s._
import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.{Consumer => _, MessageId => _, Producer => _, PulsarClient => _, Reader => _, _}
import org.apache.pulsar.client.api.transaction.Transaction

import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.concurrent.ExecutionException
Expand Down Expand Up @@ -131,14 +132,16 @@ trait CatsAsyncHandlerLowPriority {
}

override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
Async[F].delay {
consumer.acknowledgeAsync(messageId)
}.liftF.void
Async[F].delay(consumer.acknowledgeAsync(messageId)).liftF.void

override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] =
Async[F].delay(consumer.acknowledgeAsync(messageId, txn)).liftF.void

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
Async[F].delay {
consumer.acknowledgeCumulativeAsync(messageId)
}.liftF.void
Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId)).liftF.void

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] =
Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId, txn)).liftF.void

override def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
Async[F].delay {
Expand Down Expand Up @@ -169,6 +172,25 @@ trait CatsAsyncHandlerLowPriority {

override def send[T](builder: TypedMessageBuilder[T]): F[MessageId] =
Async[F].delay(builder.sendAsync()).liftF.map(MessageId.fromJava)

override def withTransaction[E, A](
builder: api.transaction.TransactionBuilder,
action: TransactionContext => F[Either[E, A]]
): F[Either[E, A]] = {
Resource.makeCase(startTransaction(builder)) { (txn, exitCase) =>
if (exitCase == ExitCase.Completed) Async[F].unit else txn.abort
}.use { txn =>
action(txn).flatMap { result =>
(if (result.isRight) txn.commit else txn.abort).map(_ => result)
}
}
}

def startTransaction(builder: api.transaction.TransactionBuilder): F[TransactionContext] =
Async[F].delay(builder.build()).liftF.map(TransactionContext(_))
def commitTransaction(txn: Transaction): F[Unit] = Async[F].delay(txn.commit()).liftF.map(_ => ())
def abortTransaction(txn: Transaction): F[Unit] = Async[F].delay(txn.abort()).liftF.map(_ => ())

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import scala.language.higherKinds
import scala.concurrent.duration._
import scala.util.Random

class CatsAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterAll with Eventually {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.sksamuel.pulsar4s

import java.util.concurrent.CompletableFuture

import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.TypedMessageBuilder
import org.apache.pulsar.client.api.transaction.Transaction

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
Expand All @@ -15,7 +18,9 @@ trait AsyncHandler[F[_]] {
def createConsumer[T](builder: api.ConsumerBuilder[T]): F[Consumer[T]]
def createReader[T](builder: api.ReaderBuilder[T]): F[Reader[T]]

def send[T](t: T, producer: api.Producer[T]): F[MessageId]
@deprecated("Use send(builder) instead", "2.8.0")
def send[T](t: T, producer: api.Producer[T]): F[MessageId] = send(producer.newMessage().value(t))

def send[T](builder: TypedMessageBuilder[T]): F[MessageId]
def receive[T](consumer: api.Consumer[T]): F[ConsumerMessage[T]]
def receiveBatch[T](consumer: api.Consumer[T]): F[Vector[ConsumerMessage[T]]]
Expand All @@ -39,8 +44,18 @@ trait AsyncHandler[F[_]] {
def getLastMessageId[T](consumer: api.Consumer[T]): F[MessageId]

def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit]
def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit]
def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]

def withTransaction[E, A](
builder: api.transaction.TransactionBuilder,
action: TransactionContext => F[Either[E, A]]
): F[Either[E, A]]
def startTransaction(builder: api.transaction.TransactionBuilder): F[TransactionContext]
def commitTransaction(txn: Transaction): F[Unit]
def abortTransaction(txn: Transaction): F[Unit]
}

object AsyncHandler {
Expand Down
45 changes: 34 additions & 11 deletions pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,28 @@ import java.io.Closeable
import java.util.concurrent.TimeUnit
import com.sksamuel.exts.Logging
import org.apache.pulsar.client.api.ConsumerStats
import org.apache.pulsar.client.api.transaction.Transaction

import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds
import scala.util.Try

trait Consumer[T] extends Closeable {
/**
* Operations on the consumer that may be used in a transactional context.
*/
trait TransactionalConsumerOps[T] {
final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeAsync(message.messageId)

def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeCumulativeAsync(message.messageId)

def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]
}

trait Consumer[T] extends Closeable with TransactionalConsumerOps[T] {

/**
* Receives a single message.
Expand Down Expand Up @@ -65,23 +81,18 @@ trait Consumer[T] extends Closeable {
def acknowledgeCumulative(message: ConsumerMessage[T]): Unit
def acknowledgeCumulative(messageId: MessageId): Unit

final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeAsync(message.messageId)

def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def negativeAcknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
negativeAcknowledgeAsync(message.messageId)

def negativeAcknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeCumulativeAsync(message.messageId)

def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

def unsubscribe(): Unit
def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit]

/**
* Get an instance of `TransactionalConsumerOps` that provides transactional operations on the consumer.
*/
def tx(implicit ctx: TransactionContext): TransactionalConsumerOps[T]
}

class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Logging {
Expand Down Expand Up @@ -109,6 +120,7 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin

override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId)

override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId)

Expand Down Expand Up @@ -142,4 +154,15 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin

override def unsubscribe(): Unit = consumer.unsubscribe()
override def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit] = implicitly[AsyncHandler[F]].unsubscribeAsync(consumer)

override def tx(implicit ctx: TransactionContext): TransactionalConsumerOps[T] =
new DefaultTransactionalConsumer[T](consumer, ctx.transaction)
}

class DefaultTransactionalConsumer[T](consumer: JConsumer[T], transaction: Transaction) extends TransactionalConsumerOps[T] {
override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId, transaction)

override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId, transaction)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.concurrent.CompletableFuture

import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.{ConsumerBuilder, ReaderBuilder, TypedMessageBuilder}
import org.apache.pulsar.client.api.transaction.Transaction

import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.compat.java8.FutureConverters
Expand Down Expand Up @@ -32,11 +33,6 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
override def createReader[T](builder: ReaderBuilder[T]): Future[Reader[T]] =
builder.createAsync().thenApply[Reader[T]](new DefaultReader(_)).toScala

override def send[T](t: T, producer: api.Producer[T]): Future[MessageId] = {
val future = producer.sendAsync(t)
FutureConverters.toScala(future).map(MessageId.fromJava)
}

override def receive[T](consumer: api.Consumer[T]): Future[ConsumerMessage[T]] = {
val future = consumer.receiveAsync()
FutureConverters.toScala(future).map(ConsumerMessage.fromJava)
Expand Down Expand Up @@ -75,12 +71,18 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] =
consumer.acknowledgeAsync(messageId).toScala

override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] =
Future.successful(consumer.negativeAcknowledge(messageId))
override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] =
consumer.acknowledgeAsync(messageId, txn).toScala

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] =
consumer.acknowledgeCumulativeAsync(messageId).toScala

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] =
consumer.acknowledgeCumulativeAsync(messageId, txn).toScala

override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] =
Future.successful(consumer.negativeAcknowledge(messageId))

override def close(reader: api.Reader[_]): Future[Unit] = reader.closeAsync().toScala
override def flush(producer: api.Producer[_]): Future[Unit] = producer.flushAsync().toScala

Expand All @@ -92,4 +94,24 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut

override def send[T](builder: TypedMessageBuilder[T]): Future[MessageId] =
builder.sendAsync().toScala.map(MessageId.fromJava)

override def withTransaction[E, A](
builder: api.transaction.TransactionBuilder,
action: TransactionContext => Future[Either[E, A]]
): Future[Either[E, A]] = {
startTransaction(builder).flatMap { txn =>
Future.unit.flatMap(_ => action(txn)).transformWith {
case Success(Right(value)) =>
txn.commit.transform(_ => Success(Right(value)))
case result =>
txn.abort.transform(_ => result)
}
}
}

override def startTransaction(builder: api.transaction.TransactionBuilder): Future[TransactionContext] =
builder.build().toScala.map(TransactionContext(_))
override def commitTransaction(txn: Transaction): Future[Unit] = txn.commit().toScala.map(_ => ())
override def abortTransaction(txn: Transaction): Future[Unit] = txn.abort().toScala.map(_ => ())

}
Loading