RabbitEasy is a library for easily integrating RabbitMQ into your Java infrastructure. It is built on top of the official RabbitMQ Java client and simplifies many common scenarios in Java SE and EE applications.
- connection factory for long living single connections
- managed simple, confirmed and transactional publishers that recover from connection loss
- managed consumers that recover from connection loss and re-attach to the broker
Get it from Maven Central and read the documentation.
- convenient integration for JEE6/CDI applications
- publishing of AMQP messages to exchanges for CDI events
- consuming of AMQP messages from queues as CDI events
Get it from Maven Central and read the documentation.
Also have a look at RabbitOrdering for an extensive example.
- convenient broker definition setup and tear down
- convenient asserts on current broker state
Get it from Maven Central.
A single connection factory always provides the same connection on calling newConnection() as long as the connection persists. A new connection is established as soon as the current connection is lost.
SingleConnectionFactory extends ConnectionFactory from the RabbitMQ standard library and is used in exactly the same way as the factory from the standard library. The only difference is that you no longer have to worry about too many connections being established to a broker.
Creating a single connection factory:
ConnectionFactory connectionFactory = new SingleConnectionFactory();
connectionFactory.setHost("example.com");
connectionFactory.setPort(4224);
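The reuse behaviour described above can be pictured with a small sketch. It does not use the RabbitMQ client: FakeConnection and SingleConnectionSketch are hypothetical stand-ins, and the real SingleConnectionFactory may well be implemented differently. As a simplification, the sketch only replaces a lost connection lazily, on the next call to newConnection().

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a broker connection; not a RabbitMQ class. */
class FakeConnection {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /** Sequential id, only there to tell connections apart. */
    final int id = COUNTER.incrementAndGet();
    private volatile boolean open = true;

    boolean isOpen() {
        return open;
    }

    /** Simulates a connection loss. */
    void drop() {
        open = false;
    }
}

/** Sketch of "one shared connection, replaced only when it is lost". */
class SingleConnectionSketch {
    private FakeConnection current;

    synchronized FakeConnection newConnection() {
        // Reuse the existing connection for as long as it is still open.
        if (current == null || !current.isOpen()) {
            current = new FakeConnection();
        }
        return current;
    }
}
```

Calling newConnection() twice on the same SingleConnectionSketch returns the same object; after drop() is called on that object, the next call returns a fresh one.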
A message object was introduced to provide convenient and safe configuration of a message and to improve the way message content is written and read. A message is created using a builder pattern.
Creating a message without properties:
Message message = new Message()
.exchange("my.exchange")
.routingKey("my.routing.key")
.body("My message content");
Creating a message with properties:
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.messageId("1")
.build();
Message message = new Message(basicProperties)
.exchange("my.exchange")
.routingKey("my.routing.key")
.body("My message content");
Note: Setting the body will also automatically adjust the content type and encoding in the message properties.
Also, a convenient method is provided to read a message's content and to transform it into the desired Java type directly.
Reading content from a message:
Message message = new Message()
.body("123");
String contentAsString = message.getBodyAs(String.class);
Integer contentAsInteger = message.getBodyAs(Integer.class);
Long contentAsLong = message.getBodyAs(Long.class);
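To illustrate the idea behind typed body access, here is a minimal sketch of how a textual body could be converted into a requested Java type. BodyConversionSketch and bodyAs are hypothetical names that are not part of RabbitEasy, and the sketch assumes a UTF-8 encoded body and only handles the three types used above.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper illustrating typed body conversion; not RabbitEasy code. */
final class BodyConversionSketch {
    private BodyConversionSketch() {
    }

    /** Converts a UTF-8 encoded text body into the requested type. */
    static <T> T bodyAs(byte[] body, Class<T> type) {
        String text = new String(body, StandardCharsets.UTF_8);
        if (type == String.class) {
            return type.cast(text);
        }
        if (type == Integer.class) {
            return type.cast(Integer.valueOf(text));
        }
        if (type == Long.class) {
            return type.cast(Long.valueOf(text));
        }
        // This sketch supports no further types.
        throw new IllegalArgumentException("Unsupported type: " + type.getName());
    }
}
```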
Publishing a message:
ConnectionFactory connectionFactory = new SingleConnectionFactory();
Channel channel = connectionFactory.newConnection().createChannel();
Message message = new Message()
.exchange("my.exchange")
.routingKey("my.routing.key")
.body("My message content")
.publish(channel);
Managing connections and channels yourself easily becomes complex and repetitive. Message publishers manage connections and channels themselves and provide a convenient way of publishing messages without having to take care of scenarios like connection aborts.
A simple publisher publishes messages in a fire-and-forget manner. When publishing this way, there is no guarantee that messages reach their destination queues. Choose this publisher for sending messages that are of low importance.
Publishing a message with a simple publisher:
ConnectionFactory connectionFactory = new SingleConnectionFactory();
Message message = new Message()
.exchange("my.exchange")
.body("My message content");
MessagePublisher publisher = new SimplePublisher(connectionFactory);
publisher.publish(message);
publisher.close();
Publishing multiple messages with a simple publisher:
ConnectionFactory connectionFactory = new SingleConnectionFactory();
Message messageOne = new Message()
.exchange("my.exchange")
.body("My message one");
Message messageTwo = new Message()
.exchange("my.exchange")
.body("My message two");
List<Message> messageList = new ArrayList<Message>();
messageList.add(messageOne);
messageList.add(messageTwo);
MessagePublisher publisher = new SimplePublisher(connectionFactory);
publisher.publish(messageList);
publisher.close();
Confirmed publishers are used to publish messages so that every single message is confirmed by the broker to have reached all its destination queues. Choose this publisher for publishing messages that are of importance but where delivery can fail independently of other messages.
Initializing a confirmed publisher:
MessagePublisher publisher = new ConfirmedPublisher(connectionFactory);
Transactional publishers are used to publish a set of messages atomically: delivery succeeds either for all messages or for none. Choose this publisher when a group of messages must not be delivered partially.
Initializing a transactional publisher:
MessagePublisher publisher = new TransactionalPublisher(connectionFactory);
Generic publishers can be used to publish messages with different reliability constraints, depending on the initialization parameter. This is useful in situations where your implementation is very generic and where reliability depends on the actual use case.
Initializing a generic publisher using publisher confirms:
PublisherReliability reliability = PublisherReliability.CONFIRMED;
GenericPublisher publisher = new GenericPublisher(connectionFactory, reliability);
The counterpart of a message publisher is the message consumer. Extending the MessageConsumer class and implementing the handleMessage() method is the intended way of consuming messages.
Extending the message consumer:
class MyConsumer extends MessageConsumer {
@Override
public void handleMessage(Message message) {
String messageContent = message.getBodyAs(String.class);
System.out.println(messageContent);
}
}
A consumer container is provided to manage the consumers in a central component. The container ensures that its contained consumers stay alive, meaning that it reestablishes lost connections and recreates its channels. The container also makes it possible to monitor the current status of each consumer and to enable and disable individual consumers.
Initializing a consumer container and adding a consumer bound to "my.queue":
ConnectionFactory connectionFactory = new SingleConnectionFactory();
ConsumerContainer consumerContainer = new ConsumerContainer(connectionFactory);
consumerContainer.addConsumer(new MyConsumer(), "my.queue");
consumerContainer.startAllConsumers();
Adding an auto-acknowledging consumer:
ConnectionFactory connectionFactory = new SingleConnectionFactory();
ConsumerContainer consumerContainer = new ConsumerContainer(connectionFactory);
consumerContainer.addConsumer(new MyConsumer(), "my.queue", true); // <- last parameter indicates auto-ack
consumerContainer.startAllConsumers();
Trying to integrate AMQP and RabbitMQ into JEE with JMS is a rocky road with many compromises. This is why we suggest integrating RabbitMQ into JEE via bindings between CDI events and message brokers:
- to fire CDI events remotely, bind them to be published as messages to broker exchanges
- to observe CDI events remotely, bind them to be consumed as messages from broker queues
You could also look at it the other way round:
- to publish messages to broker exchanges, bind them to fired CDI events
- to consume messages from broker queues, bind them to observed CDI events
To bind events, first create a subclass of EventBinder and override its bindEvents() method:
public class MyEventBinder extends EventBinder {
@Override
protected void bindEvents() {
// Your event bindings
}
}
This is how you bind an event to an exchange to publish it:
public class MyEventBinder extends EventBinder {
@Override
protected void bindEvents() {
bind(MyEvent.class).toExchange("my.exchange").withRoutingKey("my.routing.Key");
}
}
Firing a CDI event is going to publish a message to the given exchange and routing key:
public class MyEventSource {
@Inject MyEvent event;
@Inject Event<MyEvent> eventControl;
public void testEventFiring() {
eventControl.fire(event);
}
}
This is going to publish the fired event to local observers of MyEvent and is also going to publish a message to the exchange "my.exchange" with routing key "my.routing.Key" as we have defined it in the binding.
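The fluent binding shown above can be thought of as filling a lookup table from event type to exchange and routing key, which is consulted whenever an event is fired. The following sketch illustrates that idea only: BindingRegistrySketch and its nested types are hypothetical and do not reflect how EventBinder is actually implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical fluent event-to-exchange binding registry; not RabbitEasy code. */
final class BindingRegistrySketch {

    /** Describes where a bound event type is published to. */
    static final class ExchangeBinding {
        final String exchange;
        String routingKey = "";

        ExchangeBinding(String exchange) {
            this.exchange = exchange;
        }

        ExchangeBinding withRoutingKey(String routingKey) {
            this.routingKey = routingKey;
            return this;
        }
    }

    /** Intermediate step returned by bind(...). */
    final class EventBindingBuilder {
        private final Class<?> eventType;

        EventBindingBuilder(Class<?> eventType) {
            this.eventType = eventType;
        }

        ExchangeBinding toExchange(String exchange) {
            // Register the binding as soon as the exchange is known.
            ExchangeBinding binding = new ExchangeBinding(exchange);
            bindings.put(eventType, binding);
            return binding;
        }
    }

    private final Map<Class<?>, ExchangeBinding> bindings = new LinkedHashMap<>();

    EventBindingBuilder bind(Class<?> eventType) {
        return new EventBindingBuilder(eventType);
    }

    /** Looks up the binding for a fired event, or returns null if its type is unbound. */
    ExchangeBinding lookup(Object event) {
        return bindings.get(event.getClass());
    }
}
```

In this sketch, an event whose class has no binding simply yields null from lookup(), which corresponds to an event that is only delivered to local observers.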
Binding an event to a queue for consuming events works the same:
public class MyEventBinder extends EventBinder {
@Override
protected void bindEvents() {
bind(MyEvent.class).toQueue("my.queue");
}
}
Now, CDI observers of the bound event are going to consume messages from "my.queue" in the form of the bound event:
public class MyEventObserver {
public void testEventObserving(@Observes MyEvent event) {
// Processing of an event
}
}
To enable your bindings, inject an instance of your event binder and call its initialize() method. Here is an example of how to enable an event binder in a servlet context listener:
public class MyServletContextListener implements ServletContextListener {
@Inject MyEventBinder eventBinder;
public void contextInitialized(ServletContextEvent e) {
eventBinder.initialize();
}
}
Important: Ensure that your CDI provider is already initialized at this point.
By default, localhost and the standard AMQP port 5672 are used to establish connections. You can configure the connection used by your binder via annotations:
@ConnectionConfiguration(host = "my.host", port=1337)
public class MyEventBinder extends EventBinder {
@Override
protected void bindEvents() {
// Your event bindings
}
}
You can also define multiple connection configurations which can be enabled and disabled with profiles. The system property "rabbiteasy.profile" can be used to define the profile name.
In the example below, three configurations are defined: a default one, one for a staging environment and one for a quality environment. If neither the staging nor the quality profile is given in the system property, the first configuration is taken, because it has no profile property and as such is treated as the default configuration:
@ConnectionConfigurations({
@ConnectionConfiguration(host = "live.host"),
@ConnectionConfiguration(profile="staging", host = "staging.host"),
@ConnectionConfiguration(profile="quality", host = "quality.host")
})
public class MyEventBinder extends EventBinder {
@Override
protected void bindEvents() {
// Your event bindings
}
}
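The selection rule described above (use the configuration whose profile matches the system property, otherwise fall back to the one without a profile) can be sketched as follows. ProfileSelectionSketch is a hypothetical illustration and not RabbitEasy code. Only the property name "rabbiteasy.profile" is taken from the text above, and treating an unknown profile like a missing one is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of profile-based host selection; not RabbitEasy code. */
final class ProfileSelectionSketch {
    /** Key under which the default (profile-less) host is stored. */
    static final String DEFAULT = "";

    private final Map<String, String> hostsByProfile = new LinkedHashMap<>();

    ProfileSelectionSketch add(String profile, String host) {
        hostsByProfile.put(profile, host);
        return this;
    }

    /** Returns the host for the profile, or the default host if the profile is unset or unknown. */
    String resolveHost(String profile) {
        if (profile != null && hostsByProfile.containsKey(profile)) {
            return hostsByProfile.get(profile);
        }
        return hostsByProfile.get(DEFAULT);
    }

    /** Reads the profile name from the "rabbiteasy.profile" system property. */
    String resolveHostFromSystemProperty() {
        return resolveHost(System.getProperty("rabbiteasy.profile"));
    }
}
```

A system property such as this one is typically set on the JVM command line, for example with -Drabbiteasy.profile=staging.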
Interfaces on events are used to make the framework aware of content that shall be published with or consumed from a message. The framework recognizes the existence of those interfaces on your event classes and takes care of message body serialization and deserialization.
To transport content within an event, implement the ContainsContent interface and specify the content type as its generic type parameter. This way, the framework knows how to serialize and deserialize the event content:
public class MyEvent implements ContainsContent<String> {
private String content;
@Override
public void setContent(String content) {
this.content = content;
}
@Override
public String getContent() {
return content;
}
}
Strings and primitives are (de)serialized from/to their textual representation. All other types are (de)serialized from/to their XML representation if not specified differently in the bindings.
Because transporting IDs in events is so common, we also added a ContainsId interface that behaves exactly the same as ContainsContent but with different naming, so your code stays more readable:
public class MyEvent implements ContainsId<Integer> {
private Integer id;
@Override
public Integer getId() {
return id;
}
@Override
public void setId(Integer id) {
this.id = id;
}
}
To transport raw data like binary files within an event, implement the ContainsData interface:
public class MyEvent implements ContainsData {
private byte[] data;
@Override
public byte[] getData() {
return data;
}
@Override
public void setData(byte[] data) {
this.data = data;
}
}