Skip to content

Commit

Permalink
transaction support
Browse files Browse the repository at this point in the history
  • Loading branch information
gmethvin committed May 12, 2021
1 parent 17ee059 commit ff71a92
Show file tree
Hide file tree
Showing 17 changed files with 262 additions and 85 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down Expand Up @@ -64,7 +64,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: run tests
run: sbt ++2.12.10 test
Expand All @@ -41,7 +41,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: run tests
run: sbt ++2.13.3 test
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down Expand Up @@ -66,7 +66,7 @@ jobs:
java-version: 11

- name: Launch pulsar docker
run: docker run -d -it -p 6650:6650 -p 8080:8080 -v /tmp/data:/pulsar/data apachepulsar/pulsar:2.7.1 bin/pulsar standalone
run: docker-compose up -d

- name: Import GPG key
id: import_gpg
Expand Down
2 changes: 0 additions & 2 deletions .sbtopts
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
-J-Xmx4G
-J-XX:MaxMetaspaceSize=1G
-J-XX:MaxPermSize=1G
-J-XX:+CMSClassUnloadingEnabled
18 changes: 18 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: "3"

services:
pulsar_standalone:
image: apachepulsar/pulsar:2.7.1
ports:
- "8080:8080"
- "6650:6650"
volumes:
- ./data:/pulsar/data
environment:
- PULSAR_PREFIX_transactionCoordinatorEnabled=true
command: >
/bin/bash -c "
bin/apply-config-from-env.py conf/standalone.conf
&& (bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone &)
&& bin/pulsar standalone
"
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.sksamuel.exts.Logging
import com.sksamuel.pulsar4s.Consumer
import com.sksamuel.pulsar4s.ConsumerMessage
import com.sksamuel.pulsar4s.MessageId
import com.sksamuel.pulsar4s.TransactionContext
import org.apache.pulsar.client.api.ConsumerStats

import scala.concurrent.ExecutionContext
Expand All @@ -24,7 +25,7 @@ import scala.util.Try
import scala.util.control.NonFatal

trait CommittableMessage[T] {
def ack(cumulative: Boolean = false): Future[Done]
def ack(cumulative: Boolean = false)(implicit txn: TransactionContext = TransactionContext.empty): Future[Done]
def nack(): Future[Done]
def message: ConsumerMessage[T]
}
Expand All @@ -47,12 +48,12 @@ class PulsarCommittableSourceGraphStage[T](
val message: ConsumerMessage[T]
)(implicit ec: ExecutionContext) extends CommittableMessage[T] {
def messageId: MessageId = message.messageId
override def ack(cumulative: Boolean): Future[Done] = {
override def ack(cumulative: Boolean)(implicit txn: TransactionContext): Future[Done] = {
logger.debug(s"Acknowledging message: $message")
val ackFuture = if (cumulative) {
consumer.acknowledgeCumulativeAsync(message.messageId)
txn(consumer).acknowledgeCumulativeAsync(message.messageId)
} else {
consumer.acknowledgeAsync(message.messageId)
txn(consumer).acknowledgeAsync(message.messageId)
}
ackFuture.map(_ => Done)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.sksamuel.pulsar4s
import com.sksamuel.pulsar4s._
import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.{Consumer => _, MessageId => _, Producer => _, PulsarClient => _, Reader => _, _}
import org.apache.pulsar.client.api.transaction.Transaction

import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.concurrent.ExecutionException
Expand Down Expand Up @@ -131,14 +132,16 @@ trait CatsAsyncHandlerLowPriority {
}

override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
Async[F].delay {
consumer.acknowledgeAsync(messageId)
}.liftF.void
Async[F].delay(consumer.acknowledgeAsync(messageId)).liftF.void

override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] =
Async[F].delay(consumer.acknowledgeAsync(messageId, txn)).liftF.void

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
Async[F].delay {
consumer.acknowledgeCumulativeAsync(messageId)
}.liftF.void
Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId)).liftF.void

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] =
Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId, txn)).liftF.void

override def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
Async[F].delay {
Expand Down Expand Up @@ -169,6 +172,14 @@ trait CatsAsyncHandlerLowPriority {

override def send[T](builder: TypedMessageBuilder[T]): F[MessageId] =
Async[F].delay(builder.sendAsync()).liftF.map(MessageId.fromJava)

override def withTransaction[T](create: () => CompletableFuture[Transaction], action: Transaction => F[T]): F[T] = {
Resource.makeCase(Async[F].delay(create()).liftF) {
case (txn, ExitCase.Completed) => Async[F].delay(txn.commit())
case (txn, _) => Async[F].delay(txn.abort())
}
.use(action)
}
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.sksamuel.pulsar4s

import java.util.concurrent.CompletableFuture

import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.TypedMessageBuilder
import org.apache.pulsar.client.api.transaction.Transaction

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
Expand All @@ -15,7 +18,9 @@ trait AsyncHandler[F[_]] {
def createConsumer[T](builder: api.ConsumerBuilder[T]): F[Consumer[T]]
def createReader[T](builder: api.ReaderBuilder[T]): F[Reader[T]]

def send[T](t: T, producer: api.Producer[T]): F[MessageId]
@deprecated("Use send(builder) instead", "2.8.0")
def send[T](t: T, producer: api.Producer[T]): F[MessageId] = send(producer.newMessage().value(t))

def send[T](builder: TypedMessageBuilder[T]): F[MessageId]
def receive[T](consumer: api.Consumer[T]): F[ConsumerMessage[T]]
def receiveBatch[T](consumer: api.Consumer[T]): F[Vector[ConsumerMessage[T]]]
Expand All @@ -39,8 +44,12 @@ trait AsyncHandler[F[_]] {
def getLastMessageId[T](consumer: api.Consumer[T]): F[MessageId]

def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit]
def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit]
def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]

def withTransaction[T](create: () => CompletableFuture[Transaction], action: Transaction => F[T]): F[T]
}

object AsyncHandler {
Expand Down
39 changes: 28 additions & 11 deletions pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,24 @@ import java.util.concurrent.TimeUnit

import com.sksamuel.exts.Logging
import org.apache.pulsar.client.api.ConsumerStats
import org.apache.pulsar.client.api.transaction.Transaction

import scala.concurrent.duration.FiniteDuration
import scala.util.Try

trait Consumer[T] extends Closeable {
trait TransactionalConsumerOps[T] {
final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeAsync(message.messageId)

def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeCumulativeAsync(message.messageId)

def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]
}

trait Consumer[T] extends Closeable with TransactionalConsumerOps[T] {

/**
* Receives a single message.
Expand Down Expand Up @@ -65,23 +78,15 @@ trait Consumer[T] extends Closeable {
def acknowledgeCumulative(message: ConsumerMessage[T]): Unit
def acknowledgeCumulative(messageId: MessageId): Unit

final def acknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeAsync(message.messageId)

def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def negativeAcknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
negativeAcknowledgeAsync(message.messageId)

def negativeAcknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeCumulativeAsync(message.messageId)

def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

def unsubscribe(): Unit
def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit]

def txn(implicit ctx: TransactionContext): TransactionalConsumerOps[T]
}

class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Logging {
Expand Down Expand Up @@ -109,6 +114,7 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin

override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId)

override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId)

Expand Down Expand Up @@ -142,4 +148,15 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin

override def unsubscribe(): Unit = consumer.unsubscribe()
override def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit] = implicitly[AsyncHandler[F]].unsubscribeAsync(consumer)

override def txn(implicit ctx: TransactionContext): TransactionalConsumerOps[T] =
ctx.transaction.map(new DefaultTransactionalConsumer[T](consumer, _)).getOrElse(this)
}

class DefaultTransactionalConsumer[T](consumer: JConsumer[T], transaction: Transaction) extends TransactionalConsumerOps[T] {
override def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeAsync(consumer, messageId, transaction)

override def acknowledgeCumulativeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId, transaction)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.concurrent.CompletableFuture

import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.{ConsumerBuilder, ReaderBuilder, TypedMessageBuilder}
import org.apache.pulsar.client.api.transaction.Transaction

import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.compat.java8.FutureConverters
Expand Down Expand Up @@ -32,11 +33,6 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
override def createReader[T](builder: ReaderBuilder[T]): Future[Reader[T]] =
builder.createAsync().thenApply[Reader[T]](new DefaultReader(_)).toScala

override def send[T](t: T, producer: api.Producer[T]): Future[MessageId] = {
val future = producer.sendAsync(t)
FutureConverters.toScala(future).map(MessageId.fromJava)
}

override def receive[T](consumer: api.Consumer[T]): Future[ConsumerMessage[T]] = {
val future = consumer.receiveAsync()
FutureConverters.toScala(future).map(ConsumerMessage.fromJava)
Expand Down Expand Up @@ -75,12 +71,18 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] =
consumer.acknowledgeAsync(messageId).toScala

override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] =
Future.successful(consumer.negativeAcknowledge(messageId))
override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] =
consumer.acknowledgeAsync(messageId, txn).toScala

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] =
consumer.acknowledgeCumulativeAsync(messageId).toScala

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] =
consumer.acknowledgeCumulativeAsync(messageId, txn).toScala

override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] =
Future.successful(consumer.negativeAcknowledge(messageId))

override def close(reader: api.Reader[_]): Future[Unit] = reader.closeAsync().toScala
override def flush(producer: api.Producer[_]): Future[Unit] = producer.flushAsync().toScala

Expand All @@ -92,4 +94,16 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut

override def send[T](builder: TypedMessageBuilder[T]): Future[MessageId] =
builder.sendAsync().toScala.map(MessageId.fromJava)

override def withTransaction[T](create: () => CompletableFuture[Transaction], action: Transaction => Future[T]): Future[T] = {
create().toScala.flatMap { txn =>
action(txn).transformWith {
case Success(value) =>
txn.commit().toScala.transform(_ => Success(value))
case Failure(exception) =>
txn.abort().toScala.transform(_ => Failure(exception))
}
}
}

}
Loading

0 comments on commit ff71a92

Please sign in to comment.