Spring integration with Azure Messaging design doc
Azure provides multiple messaging services, each with its own usage scenarios and API. Popular ones include Storage Queue, Service Bus queues and topics, and Event Hubs. Customers may start with a simple, lower-throughput service and later switch to a more advanced one that provides high throughput and dynamic load balancing. Currently, each messaging service SDK has its own message format and inconsistent sending and receiving semantics, which creates a steep learning curve and a high cost of migrating to new technologies. To make Spring integrate smoothly with Azure messaging services, we therefore provide a uniform message interface and consistent messaging semantics based on Spring.
- Uniform message interface provided by Spring. The Spring Message interface is used by all services for sending and receiving. For each service, we provide a message converter that converts between the Spring message and the Azure service-specific message format. You can also provide your own custom message converter.
- Consistent message semantics defined by clearly separated interfaces. We provide SendOperation, SubscribeOperation and SubscribeByGroupOperation. Each messaging service implements the functions it supports through its message template.
- Different levels of abstraction. The lower the level you operate at, the fewer dependencies you have and the more control you get. From lower to higher, we provide Template, ChannelAdapter with Spring Integration, and Binder with Spring Cloud Stream.
Concept defined by Spring Integration. A Message is a generic wrapper for any Java object combined with metadata used by the framework while handling that object. It consists of a payload and headers. The payload can be of any type and the headers hold commonly required information such as id, timestamp, correlation id, and return address. Headers are also used for passing values to and from connected transports. For example, when creating a Message from a received File, the file name may be stored in a header to be accessed by downstream components. Likewise, if a Message’s content is ultimately going to be sent by an outbound Mail adapter, the various properties (to, from, cc, subject, etc.) may be configured as Message header values by an upstream component. Developers can also store any arbitrary key-value pairs in the headers.
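As a rough illustration of the payload-plus-headers shape described above, the following sketch wraps file content in a message and stores the file name in a header. SimpleMessage is a hypothetical stand-in so that the example compiles without Spring on the classpath; in real code the Spring Message interface would play this role. The header key file_name is also an assumption chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MessageSketch {

    /** Hypothetical stand-in for Spring's Message: a payload plus read-only headers. */
    static final class SimpleMessage<T> {
        private final T payload;
        private final Map<String, Object> headers;

        SimpleMessage(T payload, Map<String, Object> headers) {
            this.payload = payload;
            // Defensive copy so later changes to the caller's map do not leak in.
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }

        T getPayload() { return payload; }

        Map<String, Object> getHeaders() { return headers; }
    }

    /** Wraps file content in a message and records the file name as a header. */
    static SimpleMessage<String> fromFile(String fileName, String content) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("file_name", fileName); // illustrative header key, not a Spring constant
        headers.put("timestamp", System.currentTimeMillis());
        return new SimpleMessage<>(content, headers);
    }

    public static void main(String[] args) {
        SimpleMessage<String> message = fromFile("orders.csv", "id,total\n1,9.99");
        // A downstream component can read the header without parsing the payload.
        System.out.println(message.getHeaders().get("file_name"));
        System.out.println(message.getPayload().length());
    }
}
```

Keeping transport details in headers lets the payload stay a plain domain object, which is what allows one message type to be reused across different services.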
Interface for sending a Message to a given destination asynchronously.
public interface SendOperation {

    /**
     * Send a {@link Message} to the given destination with a given partition supplier.
     */
    <T> CompletableFuture<Void> sendAsync(String destination, Message<T> message, PartitionSupplier partitionSupplier);

    /**
     * Send a {@link Message} to the given destination.
     */
    default <T> CompletableFuture<Void> sendAsync(String destination, Message<T> message) {
        return sendAsync(destination, message, null);
    }
}
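To show how the two sendAsync overloads relate, here is a minimal in-memory sketch. Msg, PartitionSupplier and InMemorySender are hypothetical stand-ins introduced only for this example (the real PartitionSupplier is not shown in this document), and the nested SendOperation mirrors the interface above with those stand-in types.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class SendSketch {

    /** Hypothetical stand-in for Spring's Message; only the payload is modelled. */
    static final class Msg<T> {
        final T payload;
        Msg(T payload) { this.payload = payload; }
    }

    /** Hypothetical stand-in; assumed here to carry just a partition key. */
    static final class PartitionSupplier {
        final String partitionKey;
        PartitionSupplier(String partitionKey) { this.partitionKey = partitionKey; }
    }

    /** Same shape as the SendOperation interface above, using the stand-in types. */
    interface SendOperation {
        <T> CompletableFuture<Void> sendAsync(String destination, Msg<T> message, PartitionSupplier partitionSupplier);

        default <T> CompletableFuture<Void> sendAsync(String destination, Msg<T> message) {
            return sendAsync(destination, message, null);
        }
    }

    /** In-memory implementation that records what was sent, per destination. */
    static final class InMemorySender implements SendOperation {
        final Map<String, List<String>> sent = new ConcurrentHashMap<>();

        @Override
        public <T> CompletableFuture<Void> sendAsync(String destination, Msg<T> message, PartitionSupplier supplier) {
            return CompletableFuture.runAsync(() -> {
                // A null supplier means the caller expressed no partition preference.
                String key = (supplier == null) ? "<none>" : supplier.partitionKey;
                sent.computeIfAbsent(destination, d -> new CopyOnWriteArrayList<>())
                    .add(key + ":" + message.payload);
            });
        }
    }

    public static void main(String[] args) {
        InMemorySender sender = new InMemorySender();
        CompletableFuture<Void> first = sender.sendAsync("orders", new Msg<>("created"));
        CompletableFuture<Void> second =
                sender.sendAsync("orders", new Msg<>("paid"), new PartitionSupplier("customer-42"));
        // The caller decides whether to block; here we wait for both sends.
        CompletableFuture.allOf(first, second).join();
        System.out.println(sender.sent.get("orders").size());
    }
}
```

Because the returned future completes only after the send finishes, callers can chain follow-up work or handle failures without blocking the sending thread.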
Determines when to send an acknowledgement to the service to confirm the success or failure of message processing.
public enum CheckpointMode {

    /**
     * Checkpoint after each processed record.
     * Makes sense only if {@link ListenerMode#RECORD} is used.
     */
    RECORD,

    /**
     * Checkpoint after each processed batch of records.
     */
    BATCH,

    /**
     * The user decides when to checkpoint manually.
     */
    MANUAL,
}
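The difference between the three modes can be sketched by counting how many acknowledgements are issued automatically for one batch. This is only an illustration under assumptions: processBatch and CountingCheckpointer are hypothetical names, and a real checkpointer would call the messaging service instead of incrementing a counter.

```java
import java.util.Arrays;
import java.util.List;

public class CheckpointSketch {

    enum CheckpointMode { RECORD, BATCH, MANUAL }

    /** Counts acknowledgements instead of contacting a real service. */
    static final class CountingCheckpointer {
        int checkpoints;
        void checkpoint() { checkpoints++; }
    }

    /**
     * Processes one batch and checkpoints according to the mode.
     * Returns the number of checkpoints issued automatically.
     */
    static int processBatch(List<String> batch, CheckpointMode mode) {
        CountingCheckpointer checkpointer = new CountingCheckpointer();
        for (String record : batch) {
            handle(record);
            if (mode == CheckpointMode.RECORD) {
                checkpointer.checkpoint(); // acknowledge every record
            }
        }
        if (mode == CheckpointMode.BATCH && !batch.isEmpty()) {
            checkpointer.checkpoint(); // acknowledge once per batch
        }
        // MANUAL: nothing is acknowledged here; user code decides when.
        return checkpointer.checkpoints;
    }

    private static void handle(String record) {
        // Placeholder for user processing logic.
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b", "c");
        for (CheckpointMode mode : CheckpointMode.values()) {
            System.out.println(mode + " -> " + processBatch(batch, mode));
        }
    }
}
```

RECORD keeps the amount of redelivered work after a failure small at the cost of more acknowledgement calls, while BATCH trades the other way.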
Subscribe to a given destination. Messages are then pushed to your consumer for processing. Each message carries a payload of the type you provide. You can subscribe to a destination only once; subscribing again to the same destination has no effect and returns false.
public interface SubscribeOperation {

    /**
     * Register a message consumer to a given destination.
     *
     * @return {@code true} if the consumer was subscribed or {@code false} if it
     * was already subscribed.
     */
    boolean subscribe(String destination, Consumer<Message<?>> consumer, Class<?> messagePayloadType);

    /**
     * Register a message consumer to a given destination with a {@code byte[]} payload type.
     */
    default boolean subscribe(String destination, Consumer<Message<?>> consumer) {
        return this.subscribe(destination, consumer, byte[].class);
    }

    /**
     * Un-register a message consumer.
     *
     * @return {@code true} if the consumer was un-registered, or {@code false}
     * if it was not registered.
     */
    boolean unsubscribe(String destination);

    void setCheckpointMode(CheckpointMode checkpointMode);
}
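The subscribe-once rule can be sketched with an in-memory registry. InMemorySubscriptions and its push method are hypothetical and exist only to simulate the service delivering a message; payloads are plain strings to keep the example free of Spring types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class SubscribeSketch {

    /** Hypothetical in-memory registry that enforces one consumer per destination. */
    static final class InMemorySubscriptions {
        private final Map<String, Consumer<String>> consumers = new ConcurrentHashMap<>();

        boolean subscribe(String destination, Consumer<String> consumer) {
            // putIfAbsent returns null only when no consumer was registered before.
            return consumers.putIfAbsent(destination, consumer) == null;
        }

        boolean unsubscribe(String destination) {
            return consumers.remove(destination) != null;
        }

        /** Simulates the service pushing a message to the registered consumer. */
        void push(String destination, String payload) {
            Consumer<String> consumer = consumers.get(destination);
            if (consumer != null) {
                consumer.accept(payload);
            }
        }
    }

    public static void main(String[] args) {
        InMemorySubscriptions subscriptions = new InMemorySubscriptions();
        List<String> received = new ArrayList<>();

        System.out.println(subscriptions.subscribe("orders", received::add));      // first call registers
        System.out.println(subscriptions.subscribe("orders", p -> { }));           // second call is ignored
        subscriptions.push("orders", "created");
        System.out.println(received);
        System.out.println(subscriptions.unsubscribe("orders"));
    }
}
```

Returning false instead of throwing on a duplicate subscription lets callers treat subscribe as idempotent.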
Same as above, but with a specified consumer group.
public interface SubscribeByGroupOperation {

    /**
     * Register a message consumer to a given destination with a given consumer group.
     *
     * @return {@code true} if the consumer was subscribed or {@code false} if it
     * was already subscribed.
     */
    boolean subscribe(String destination, String consumerGroup, Consumer<Message<?>> consumer,
            Class<?> messagePayloadType);

    /**
     * Register a message consumer to a given destination and consumer group
     * with a {@code byte[]} payload type.
     */
    default boolean subscribe(String destination, String consumerGroup, Consumer<Message<?>> consumer) {
        return this.subscribe(destination, consumerGroup, consumer, byte[].class);
    }

    /**
     * Un-register a message consumer with a given destination and consumer group.
     *
     * @return {@code true} if the consumer was un-registered, or {@code false}
     * if it was not registered.
     */
    boolean unsubscribe(String destination, String consumerGroup);

    void setCheckpointMode(CheckpointMode checkpointMode);
}
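A small in-memory sketch may help show what the consumer group adds: each group subscribed to a destination receives its own copy of every message, while a second subscription for the same destination and group is rejected. InMemoryHub is a hypothetical class, its send method is synchronous for simplicity, and payloads are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class GroupSketch {

    /** Hypothetical in-memory hub: every consumer group gets its own copy of each message. */
    static final class InMemoryHub {
        // destination -> (consumer group -> consumer)
        private final Map<String, Map<String, Consumer<String>>> groups = new ConcurrentHashMap<>();

        boolean subscribe(String destination, String consumerGroup, Consumer<String> consumer) {
            return groups.computeIfAbsent(destination, d -> new ConcurrentHashMap<>())
                         .putIfAbsent(consumerGroup, consumer) == null;
        }

        boolean unsubscribe(String destination, String consumerGroup) {
            Map<String, Consumer<String>> byGroup = groups.get(destination);
            return byGroup != null && byGroup.remove(consumerGroup) != null;
        }

        /** Delivers the payload once to each group subscribed to the destination. */
        void send(String destination, String payload) {
            Map<String, Consumer<String>> byGroup = groups.get(destination);
            if (byGroup != null) {
                byGroup.values().forEach(consumer -> consumer.accept(payload));
            }
        }
    }

    public static void main(String[] args) {
        InMemoryHub hub = new InMemoryHub();
        List<String> billing = new ArrayList<>();
        List<String> audit = new ArrayList<>();

        hub.subscribe("orders", "billing", billing::add);
        hub.subscribe("orders", "audit", audit::add);
        hub.send("orders", "created");

        System.out.println(billing);
        System.out.println(audit);
    }
}
```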
Supports send and subscribe by group.
public interface EventHubOperation extends SendOperation, SubscribeByGroupOperation {}
Supports send and subscribe by group, where a group is implemented by a topic subscription.
public interface ServiceBusTopicOperation extends SendOperation, SubscribeByGroupOperation {}
Supports send and subscribe.
public interface ServiceBusQueueOperation extends SendOperation, SubscribeOperation {}