Skip to content

Commit

Permalink
Refactor Service Bus sample:multiple-namespaces and eventhubs-integra…
Browse files Browse the repository at this point in the history
…tion (#134)
  • Loading branch information
backwind1233 authored Jan 17, 2022
1 parent ccabc1a commit 52b2f02
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,32 @@

package com.azure.spring.sample.eventhubs;


import com.azure.spring.eventhubs.core.EventHubsProcessorContainer;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.bind.annotation.RestController;

/**
* @author Warren Zhu
* Configuration Class for EventHubIntegration sample.
*/
@RestController
public class ReceiveController {

private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveController.class);
@Configuration
public class EventHubIntegrationConfiguration {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";

/**
* This message receiver binding with {@link EventHubsInboundChannelAdapter}
* via {@link MessageChannel} has name {@value INPUT_CHANNEL}
* {@link EventHubsInboundChannelAdapter} binding with {@link MessageChannel} has name {@value INPUT_CHANNEL}
*
* @param inputChannel the MessageChannel binding with EventHubsInboundChannelAdapter
* @param processorContainer instance of EventHubsProcessorContainer
* @return instance of EventHubsInboundChannelAdapter
*/
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}

@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
Expand All @@ -58,8 +42,14 @@ public EventHubsInboundChannelAdapter messageChannelAdapter(
return adapter;
}

/**
* {@link MessageChannel} with name {@value INPUT_CHANNEL}
*
* @return {@link MessageChannel}
*/
@Bean
public MessageChannel input() {
return new DirectChannel();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.sample.eventhubs;

import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

/**
* @author Warren Zhu
*/
@Service
public class ReceiveService {

private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveService.class);
private static final String INPUT_CHANNEL = "input";

/**
* This message receiver binding with {@link EventHubsInboundChannelAdapter}
* via {@link MessageChannel} has name {@value INPUT_CHANNEL}
*/
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class SendController {
private static final String EVENTHUB_NAME = "eh1";

@Autowired
EventHubOutboundGateway messagingGateway;
private EventHubOutboundGateway messagingGateway;

/**
* Posts a message to an Azure Event Hub
Expand All @@ -40,9 +40,14 @@ public String send(@RequestParam("message") String message) {
return message;
}

@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubsTemplate queueOperation) {
/**
* This message sender binds with {@link MessagingGateway} via {@link MessageChannel} has name
* {@value OUTPUT_CHANNEL}
*
*/
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubsTemplate queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ mvn clean spring-boot:run


## Verify This Sample
Send a POST request to service bus queue
```shell
$ curl -X POST http://localhost:8080/queues?message=hello
```


Verify in your app’s logs that a similar message was posted:
```shell
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import static com.azure.spring.servicebus.core.processor.DefaultServiceBusNamespaceProcessorFactory.INVALID_SUBSCRIPTION;

/**
* Property class for Service Bus multiple namespaces sample.
*/
@ConfigurationProperties("servicebus")
public class CustomizedServiceBusProperties {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.azure.spring.sample.servicebus;

import com.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusAutoConfiguration;
import com.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusMessagingAutoConfiguration;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import com.azure.spring.servicebus.core.processor.DefaultServiceBusNamespaceProcessorFactory;
Expand All @@ -13,12 +15,16 @@
import com.azure.spring.servicebus.core.properties.ProducerProperties;
import com.azure.spring.servicebus.support.converter.ServiceBusMessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import reactor.util.function.Tuple2;

@Configuration(proxyBeanMethods = false)
Expand All @@ -28,6 +34,13 @@
MultipleNamespacesAzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class
})
public class MultipleNamespacesAzureServiceBusMessagingAutoConfiguration {

private static final String RECEIVE_QUEUE_NAME = "queue1";
private static final String INPUT_CHANNEL = "queue1.input";

@Autowired
private CustomizedServiceBusProperties properties;

/**
* Configure the {@link ServiceBusProcessorContainer}
*/
Expand Down Expand Up @@ -77,4 +90,40 @@ public ServiceBusTemplate serviceBusTemplate(ServiceBusProducerFactory senderCli
return serviceBusTemplate;
}
}

/**
* {@link ServiceBusInboundChannelAdapter} binding with {@link MessageChannel} has name {@value INPUT_CHANNEL}
*
* @param inputChannel the MessageChannel binding with ServiceBusInboundChannelAdapter
* @param processorContainer instance of ServiceBusProcessorContainer
* @return instance of ServiceBusInboundChannelAdapter
*/
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, RECEIVE_QUEUE_NAME,
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}

/**
* {@link MessageChannel} with name {@value INPUT_CHANNEL}
*
* @return {@link MessageChannel}
*/
@Bean(name = INPUT_CHANNEL)
public MessageChannel input() {
return new DirectChannel();
}

@Bean
public PropertiesSupplier<String, ProducerProperties> producerPropertiesSupplier() {
return properties.producerPropertiesSupplier();
}

@Bean
public PropertiesSupplier<Tuple2<String, String>, ProcessorProperties> processorPropertiesSupplier() {
return properties.processorPropertiesSupplier();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,31 @@
import com.azure.spring.integration.handler.DefaultMessageHandler;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.azure.spring.service.servicebus.properties.ServiceBusEntityType;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class QueueReceiveController {
@Service
public class QueueReceiveService {

private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveController.class);
private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveService.class);
private static final String INPUT_CHANNEL = "queue1.input";
private static final String RECEIVE_QUEUE_NAME = "queue1";
private static final String OUTPUT_CHANNEL = "queue2.output";
private static final String OUTPUT_CHANNEL_QUEUE2 = "queue2.output";
private static final String FORWARD_QUEUE_NAME = "queue2";

@Autowired
QueueForwardGateway messagingGateway;
private QueueForwardGateway messagingGateway;

/**
* This message receiver binding with {@link ServiceBusInboundChannelAdapter}
Expand All @@ -53,22 +47,16 @@ public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) C
this.messagingGateway.send(message);
}

@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, RECEIVE_QUEUE_NAME,
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}

@Bean(name = INPUT_CHANNEL)
public MessageChannel input() {
return new DirectChannel();
}

/**
* Get messages from {@link MessageChannel} with name {@value OUTPUT_CHANNEL_QUEUE2}
* and send messages to queue with name {@value FORWARD_QUEUE_NAME}.
*
* @param serviceBusTemplate template to send messages
* @return instance of {@link MessageChannel}
*/
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
@ServiceActivator(inputChannel = OUTPUT_CHANNEL_QUEUE2)
public MessageHandler queueMessageForwarder(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler(FORWARD_QUEUE_NAME, serviceBusTemplate);
Expand All @@ -89,9 +77,9 @@ public void onFailure(Throwable ex) {

/**
* Message gateway binding with {@link MessageHandler}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL_QUEUE2}
*/
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL_QUEUE2)
public interface QueueForwardGateway {
void send(String text);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,24 @@
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class QueueSendController {
@Service
public class QueueSendService {

private static final Logger LOGGER = LoggerFactory.getLogger(QueueSendController.class);
private static final String OUTPUT_CHANNEL = "queue1.output";
private static final Logger LOGGER = LoggerFactory.getLogger(QueueSendService.class);
private static final String OUTPUT_CHANNEL_QUEUE1 = "queue1.output";
private static final String QUEUE_NAME = "queue1";

@Autowired
QueueSendGateway messagingGateway;

/**
* Posts a message to a Service Bus Queue
*/
@PostMapping("/queues")
public String send(@RequestParam("message") String message) {
this.messagingGateway.send(message);
return message;
}

@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
@ServiceActivator(inputChannel = OUTPUT_CHANNEL_QUEUE1)
public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
Expand All @@ -61,9 +46,9 @@ public void onFailure(Throwable ex) {

/**
* Message gateway binding with {@link MessageHandler}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL_QUEUE1}
*/
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL_QUEUE1)
public interface QueueSendGateway {
void send(String text);
}
Expand Down
Loading

0 comments on commit 52b2f02

Please sign in to comment.