Upgrading
Some noteworthy items for upgrading:
- An implicit `RecoveryStrategy` is no longer optional (a default is no longer provided). It used to default to `RecoveryStrategy.limitedDeliver()`.
- `RecoveryStrategy.LimitedDeliver.*` was merged into `RecoveryStrategy`. `RecoveryStrategy.abandonedQueue` is a `RecoveryStrategy` now. The `AbandonStrategy` type was removed in favor of unification.
- `abandonQueue` / `limitedRedeliver` now preserve the original routing key and exchange in message headers. See ``RecoveryStrategy.`x-original-routing-key` `` and ``RecoveryStrategy.`x-original-exchange` ``, along with the directives to get the original routing key / exchange if it exists.
- Arguments required for a `RecoveryStrategy` have been reduced. If you defined your own `RecoveryStrategy`, then you can get the entire delivery payload by using `extract(identity)` in your returned handler.
- `|` erroneously made the apply function run asynchronously. This is no longer the case.
- The `Handler` type has been simplified. The result is communicated by a `Promise[AckResult]`, rather than a `Promise[Either[Rejection, AckOrNack]]`. Valid `AckResult`s live in the companion object. `AckResult` is a sealed trait. `Rejection`s are exceptions that are passed via an `AckResult.Fail`.
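
To make the shape of the simplified handler result concrete, here is a minimal, self-contained sketch of a sealed-trait result delivered through a `Promise`. It is a model for illustration only and not op-rabbit's own definitions: the notes above name only `AckResult` and `AckResult.Fail`, so `Ack`, `Nack`, `describe`, and the wrapper object `AckResultSketch` are hypothetical.

```scala
import scala.concurrent.Promise

object AckResultSketch {
  // Illustrative model only; these are NOT op-rabbit's actual definitions.
  sealed trait AckResult
  object AckResult {
    case object Ack extends AckResult                              // assumed name
    final case class Nack(requeue: Boolean) extends AckResult      // assumed name
    final case class Fail(exception: Throwable) extends AckResult  // named in the notes above
  }

  // Because the trait is sealed, the compiler checks that this match is exhaustive.
  def describe(result: AckResult): String = result match {
    case AckResult.Ack           => "acknowledged"
    case AckResult.Nack(requeue) => s"rejected (requeue = $requeue)"
    case AckResult.Fail(e)       => s"failed: ${e.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    val promise = Promise[AckResult]()
    // A failure travels as a value inside the result, not as a Left(...).
    promise.success(AckResult.Fail(new RuntimeException("boom")))
    // The promise is already completed, so its value can be read directly.
    promise.future.value.flatMap(_.toOption).foreach(r => println(describe(r)))
  }
}
```

A single sealed result type means a handler has one value to complete the promise with, and callers get one exhaustive `match` instead of unpacking an `Either`.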
Substitute as follows:
- `TopicBinding(...)` -> `Binding.topic(...)`
- `FanoutBinding(...)` -> `Binding.fanout(...)`
- `HeadersBinding(...)` -> `Binding.headers(...)`
`VerifiedQueuePublisher` has been axed in favor of a more general solution. Instead of:
val factory = Message.factory(VerifiedQueuePublisher("no-existe"))
You now specify a publisher to a passive queue:
val factory = Message.factory(Publisher.queue(Queue.passive("no-existe")))
This change enables other use cases, such as actually defining a queue on first publish:
val factory = Message.factory(
Publisher.queue(
Queue("ahora-existe")))
val factory = Message.factory(
  Publisher.exchange(
    Binding.topic(
      Queue("ahora-existe"), topics = List("los.nachos.*"))))
When you publish a message, RabbitControl responds with `Message.ConfirmResponse`, rather than `true` or `false`.
All messages are assigned a sequential `id`.
The sink no longer generates messages for you.
Before:
AckedFlow[T].
to(
MessagePublisherSink[Int](
rabbitControl,
Message.factory(Publisher.queue(queueName))))
After:
AckedFlow[T].
map(Message.queue(_, queueName)).
to(MessagePublisherSink(rabbitControl))
This change allows the stream to dynamically determine message destination and properties from the stream element.
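
As a rough illustration of what "destination determined from the stream element" means, the sketch below uses a plain Scala `List` in place of a stream and a hypothetical `Outgoing` case class in place of an op-rabbit message. None of these names (`Outgoing`, `route`, `DynamicRoutingSketch`) come from the library; the point is only that each element is mapped to a message that already carries its own destination before it reaches the sink.

```scala
object DynamicRoutingSketch {
  // Hypothetical stand-in for a message that carries its own destination.
  final case class Outgoing(queue: String, body: String)

  // Choose the destination queue by inspecting the element itself.
  def route(event: String): Outgoing =
    if (event.startsWith("error")) Outgoing("errors", event)
    else Outgoing("events", event)

  // A List stands in for the stream; map plays the role of the `map` stage above.
  val routed: List[Outgoing] =
    List("error: disk full", "user signed in").map(route)

  def main(args: Array[String]): Unit =
    routed.foreach(m => println(s"${m.queue} <- ${m.body}"))
}
```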
`op-rabbit.topic-exchange-name` defaults to `amq.topic`.
The `rabbitmq` config block is no longer recognized.
`Message.apply(publisher, data)` is now `Message.apply(data, publisher)`, to be consistent with the rest of the `Message` factory methods.
I completely blew my commitment to keep API changes to a minimum from RC1 onward. As I was working on the tutorial, several warts kept nagging at me, and I decided it would be best to make radical API changes while the RC label is still intact.
This represents one of the largest API changes of all. I'm seriously excited about it.
- `TopicMessage`, `QueueMessage`, `ExchangeMessage` objects have become `Message.topic`, `Message.queue`, `Message.exchange`, etc.
- `ConfirmedMessage` has become simply `Message`, as it is the default.
- `UnconfirmedMessage` gained a symmetrical factory function API (`UnconfirmedMessage.topic`, `UnconfirmedMessage.factory`, etc.)
- `QueuePublisher`, `ExchangePublisher`, and `TopicPublisher` have become `Publisher.queue`, `Publisher.exchange`, `Publisher.topic`, etc. There is only one `Publisher` instance. The factory methods are just for convenience and offer defaults that make sense for each of the scenarios.
`TypedHeader` has been introduced to describe message headers constrained to a specific type (RabbitMQ headers are otherwise untyped and limited to a subset of primitives, Maps, Seqs and some others).
`property(Header("my-custom-header").as[Int])` has become `property(TypedHeader[Int]("my-custom-header"))`.
- `QueueBinding` has become `Queue`, `ExchangeBinding` has become `Exchange`.
- `TopicBinding`, `HeadersBinding`, `FanoutBinding` receive a `Queue` definition and an `Exchange` definition, rather than having the parameters for the creation of each flattened.
- `PassiveQueueBinding` has become `Queue.passive`. `Queue.passive` can receive a `Queue` definition which will be used to declare the queue in the event that the queue in question doesn't already exist.
- `Exchange.passive` has been created; similarly, it can receive a non-passive `Exchange` definition.
- `Exchange` definitions are generically typed; if you pass an `Exchange[Exchange.Topic.Value]` to a `HeadersBinding`, the compiler will yell at you.
- Modeled Queue / Exchange arguments have been introduced, providing compiler-level safety for both argument names and types. Where a duration is concerned, the modeled argument receives a `FiniteDuration`, which it maps to an integer of milliseconds.
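
The compile-time check on exchange types described in the list above can be pictured with a type-parameter sketch. This is a simplified model under assumed names (`Kind`, `Topic`, `Headers`, `headersBinding`, `TypedExchangeSketch`), not the library's real type hierarchy; it only shows why passing the wrong kind of exchange fails to compile.

```scala
object TypedExchangeSketch {
  // Hypothetical marker types standing in for exchange kinds.
  sealed trait Kind
  sealed trait Topic extends Kind
  sealed trait Headers extends Kind

  // The type parameter records the exchange kind; it is invariant, so an
  // Exchange[Topic] is not accepted where an Exchange[Headers] is required.
  final case class Exchange[K <: Kind](name: String)

  // Hypothetical binding function that only accepts headers exchanges.
  def headersBinding(queue: String, exchange: Exchange[Headers]): String =
    s"$queue <- ${exchange.name}"

  val ok: String = headersBinding("my-queue", Exchange[Headers]("amq.headers"))

  // Uncommenting the next line is a compile error (type mismatch):
  // headersBinding("my-queue", Exchange[Topic]("amq.topic"))
}
```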
Here's a complex example using the new Queue definition syntax, showing how easy it is to passively declare a queue, and create it if it doesn't exist.
import Queue.ModeledArgs._
Queue.passive(
Queue(
s"op-rabbit.retry.${queueName}",
durable = true,
arguments = Seq(
`x-expires`(30 seconds),
`x-message-ttl`(5 minutes),
`x-dead-letter-exchange`(""), // default exchange
`x-dead-letter-routing-key`(queueName))))
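
The duration arguments in the example above take a `FiniteDuration` and end up as a millisecond count. A minimal sketch of that mapping, using only the Scala standard library, might look like the following. `DurationArg` and `ModeledArgsSketch` are hypothetical names, and the value is kept as a `Long` here for simplicity; how the library itself represents the number is not shown in these notes.

```scala
import scala.concurrent.duration._

object ModeledArgsSketch {
  // Hypothetical stand-in for a modeled argument: the header name is fixed
  // once, and the value must be a FiniteDuration.
  final case class DurationArg(name: String) {
    // Convert the duration to milliseconds, paired with the argument name.
    def apply(value: FiniteDuration): (String, Long) = name -> value.toMillis
  }

  // Backticks let the Scala identifier match the RabbitMQ argument name.
  val `x-message-ttl` = DurationArg("x-message-ttl")
  val `x-expires`     = DurationArg("x-expires")

  def main(args: Array[String]): Unit = {
    val arguments = Seq(`x-expires`(30.seconds), `x-message-ttl`(5.minutes))
    arguments.foreach { case (k, v) => println(s"$k = $v ms") }
  }
}
```

Fixing the name in one place and typing the value means a misspelled argument or a bare number with ambiguous units is caught by the compiler rather than by the broker.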
`ConfirmedPublisherSink` and Sources lost their `name` argument.
- `nack` behavior has been simplified. It's no longer communicated as an exception, and providing a reason for message rejection has been axed. It has gained the parameter `requeue = false` or `requeue = true`.
- `RecoveryStrategy.nack` has been simplified, also. It defaults to `requeue = false`.
- `RecoveryStrategy.limitedRedeliver` uses a RabbitMQ queue with dead-letter forwarding options and a `TTL` to handle message retry. The messages still go to the back of the line, but the consumer is no longer slowed down in failure mode.
- The `Directives` queue binding DSL has been updated to incorporate the above changes.
- `Subscription.register` became `Subscription.run`; the same change applies to the `register` method on a Subscription definition instance.
- `com.spingo.op_rabbit.consumer._` was moved into `com.spingo.op_rabbit._`; update your imports accordingly.
- The method of instantiating `Subscription` has changed substantially. `Subscription` is now completely stateless.
- Casting Header values in the Handler DSL has changed, and supports `optionalProperty`.
Before, subscriptions were declared and registered like this:
val subscription = new Subscription {
  // A qos of 3 will cause up to 3 concurrent messages to be processed at any given time.
  def config = channel(qos = 3) {
    consume(topic("such-message-queue", List("some-topic.#"))) {
      body(as[Person]) { person =>
        // do work; this body is executed in a separate thread, as provided by the implicit execution context
        ack()
      }
    }
  }
}
rabbitControl ! subscription
subscription.initialized.foreach { _ =>
  println("Initialized!")
  subscription.close()
}
`Subscription` had a `close()` method and a `closed` property. This is no more.
The above is rewritten as follows for `v1.0.0-RC1`:
val subscription = Subscription {
  // A qos of 3 will cause up to 3 concurrent messages to be processed at any given time.
  def config = channel(qos = 3) {
    consume(topic("such-message-queue", List("some-topic.#"))) {
      body(as[Person]) { person =>
        // do work; this body is executed in a separate thread, as provided by the implicit execution context
        ack()
      }
    }
  }
}
val subscriptionRef = subscription.register(rabbitControl)
subscriptionRef.initialized.foreach { _ =>
  println("Initialized!")
  subscriptionRef.close()
}
Before, `HeaderValue`s were cast as follows:
property(Header("x-retries")).as(typeOf[Int])
This did not work for `optionalProperty`; the old way no longer works and the new way is shorter:
property(Header("x-retries").as[Int])
optionalProperty(Header("x-retries").as[Int])