-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
transaction support #307
transaction support #307
Conversation
17b7c8f
to
ff71a92
Compare
pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala
Outdated
Show resolved
Hide resolved
* headers and so on. The producer will generate an appropriate | ||
* Pulsar [[ProducerMessage]] with this t set as the value. | ||
*/ | ||
def sendAsync[F[_] : AsyncHandler](t: T): F[MessageId] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that Pulsar supports both send
and sendAsync
with transactions, while it only supports acknowledgeAsync
, but it seems odd that you'd want to use an async method one place but not in another, so I think I'll omit that from the transactional API.
700832d
to
395aede
Compare
for { | ||
msg <- consumer.receiveAsync | ||
msgId <- txn(producer).sendAsync(msg.value + "_test") | ||
_ <- txn(consumer).acknowledgeAsync(msg.messageId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this syntax but this instantiates the wrapper every single call. performance-wise it's negligible, but it seems slightly weird to re-instantiate the wrapper every single time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or maybe we're meant to do txnProducer = txn(producer)
for multiple calls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think whether you use an intermediate variable or not is down to personal preference. I was leaning toward just using the extra parameter like producer.sendAsync(value, txn)
, which has the advantage of being simpler to implement, but I sort of like the txn
coming first to make it clear the work is done in a transaction.
@sksamuel let me know if you have any thoughts on this. I'm open to making changes in the API. |
4b62fa7
to
c71fbf1
Compare
273d64c
to
01ccd32
Compare
b8010c3
to
d39edcd
Compare
@sksamuel I rebased this on top of the current master. Do you have any thoughts on the API? |
Hi! I just took over the maintenance of this project. Right now, I have no strong opinion about the API. It seems quite straightforward to me. (For reasons I'm stuck on pulsar 2.7.1. So no txn for me right now… 😢 ) |
ok @judu , so you don't mind merging this? |
Changelog: - [Org] Change com.sksamuel to com.clever-cloud for nexus credentials reasons - [Org] Update all refs to github.com/CleverCloud - [Deps] Upgrade deps: - scala 2.13.7, 2.12.15 - play-json 2.8.2 - pulsar-client 2.9.0 - json4s 4.0.3 - scalaz 7.2.33 - monix 3.4.0 - cats-effect 2.5.4 - jackson 2.13.0 - java8compat 1.0.1 (0.9.1 for scala 2.12) - akka-stream 2.6.17 - log4j 2.14.1 - [Fix] Add fs2 to project aggregate in build.sbt #364 (thanks @valdo404) - [Fix] Metadata is not copied to Java messages #334 (thanks @sksamuel) - [Feature] Add transaction support #307 (thanks @gmethvin) - [Feature] Add replicateSubscriptionState in consumerConfig #347 (thanks @deveshgithub) - [Feature] Add enableBatchIndexAcknowledgement option #359 (thanks @gmethvin)
No description provided.