Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Service Bus sample:multiple-namespaces and eventhubs-integration #134

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,32 @@

package com.azure.spring.sample.eventhubs;


import com.azure.spring.eventhubs.core.EventHubsProcessorContainer;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.bind.annotation.RestController;

/**
* @author Warren Zhu
* Configuration Class for EventHubIntegration sample.
*/
@RestController
public class ReceiveController {

private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveController.class);
@Configuration
public class EventHubIntegrationConfiguration {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";

/**
* This message receiver binding with {@link EventHubsInboundChannelAdapter}
* via {@link MessageChannel} has name {@value INPUT_CHANNEL}
* {@link EventHubsInboundChannelAdapter} binding with {@link MessageChannel} has name {@value INPUT_CHANNEL}
*
* @param inputChannel the MessageChannel binding with EventHubsInboundChannelAdapter
* @param processorContainer instance of EventHubsProcessorContainer
* @return instance of EventHubsInboundChannelAdapter
*/
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}

@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
Expand All @@ -58,8 +42,14 @@ public EventHubsInboundChannelAdapter messageChannelAdapter(
return adapter;
}

/**
* {@link MessageChannel} with name {@value INPUT_CHANNEL}
*
* @return {@link MessageChannel}
*/
@Bean
public MessageChannel input() {
return new DirectChannel();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.sample.eventhubs;

import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

/**
* @author Warren Zhu
*/
@Service
public class ReceiveService {

private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveService.class);
private static final String INPUT_CHANNEL = "input";

/**
* This message receiver binding with {@link EventHubsInboundChannelAdapter}
* via {@link MessageChannel} has name {@value INPUT_CHANNEL}
*/
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class SendController {
private static final String EVENTHUB_NAME = "eh1";

@Autowired
EventHubOutboundGateway messagingGateway;
private EventHubOutboundGateway messagingGateway;

/**
* Posts a message to an Azure Event Hub
Expand All @@ -40,9 +40,14 @@ public String send(@RequestParam("message") String message) {
return message;
}

@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubsTemplate queueOperation) {
/**
* This message sender binds with {@link MessagingGateway} via {@link MessageChannel} has name
* {@value OUTPUT_CHANNEL}
*
*/
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubsTemplate queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ mvn clean spring-boot:run


## Verify This Sample
Send a POST request to service bus queue
```shell
$ curl -X POST http://localhost:8080/queues?message=hello
```


Verify in your app’s logs that a similar message was posted:
```shell
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import static com.azure.spring.servicebus.core.processor.DefaultServiceBusNamespaceProcessorFactory.INVALID_SUBSCRIPTION;

/**
* Property class for Service Bus multiple namespaces sample.
*/
@ConfigurationProperties("servicebus")
public class CustomizedServiceBusProperties {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.azure.spring.sample.servicebus;

import com.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusAutoConfiguration;
import com.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusMessagingAutoConfiguration;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import com.azure.spring.servicebus.core.processor.DefaultServiceBusNamespaceProcessorFactory;
Expand All @@ -13,12 +15,16 @@
import com.azure.spring.servicebus.core.properties.ProducerProperties;
import com.azure.spring.servicebus.support.converter.ServiceBusMessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import reactor.util.function.Tuple2;

@Configuration(proxyBeanMethods = false)
Expand All @@ -28,6 +34,13 @@
MultipleNamespacesAzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class
})
public class MultipleNamespacesAzureServiceBusMessagingAutoConfiguration {

private static final String RECEIVE_QUEUE_NAME = "queue1";
private static final String INPUT_CHANNEL = "queue1.input";

@Autowired
private CustomizedServiceBusProperties properties;

/**
* Configure the {@link ServiceBusProcessorContainer}
*/
Expand Down Expand Up @@ -77,4 +90,40 @@ public ServiceBusTemplate serviceBusTemplate(ServiceBusProducerFactory senderCli
return serviceBusTemplate;
}
}

/**
* {@link ServiceBusInboundChannelAdapter} binding with {@link MessageChannel} has name {@value INPUT_CHANNEL}
*
* @param inputChannel the MessageChannel binding with ServiceBusInboundChannelAdapter
* @param processorContainer instance of ServiceBusProcessorContainer
* @return instance of ServiceBusInboundChannelAdapter
*/
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, RECEIVE_QUEUE_NAME,
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}

/**
* {@link MessageChannel} with name {@value INPUT_CHANNEL}
*
* @return {@link MessageChannel}
*/
@Bean(name = INPUT_CHANNEL)
public MessageChannel input() {
return new DirectChannel();
}

@Bean
public PropertiesSupplier<String, ProducerProperties> producerPropertiesSupplier() {
return properties.producerPropertiesSupplier();
}

@Bean
public PropertiesSupplier<Tuple2<String, String>, ProcessorProperties> processorPropertiesSupplier() {
return properties.processorPropertiesSupplier();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,31 @@
import com.azure.spring.integration.handler.DefaultMessageHandler;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.azure.spring.service.servicebus.properties.ServiceBusEntityType;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class QueueReceiveController {
@Service
public class QueueReceiveService {

private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveController.class);
private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveService.class);
private static final String INPUT_CHANNEL = "queue1.input";
private static final String RECEIVE_QUEUE_NAME = "queue1";
private static final String OUTPUT_CHANNEL = "queue2.output";
private static final String OUTPUT_CHANNEL_QUEUE2 = "queue2.output";
private static final String FORWARD_QUEUE_NAME = "queue2";

@Autowired
QueueForwardGateway messagingGateway;
private QueueForwardGateway messagingGateway;

/**
* This message receiver binding with {@link ServiceBusInboundChannelAdapter}
Expand All @@ -53,22 +47,16 @@ public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) C
this.messagingGateway.send(message);
}

@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, RECEIVE_QUEUE_NAME,
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}

@Bean(name = INPUT_CHANNEL)
public MessageChannel input() {
return new DirectChannel();
}

/**
* Get messages from {@link MessageChannel} with name {@value OUTPUT_CHANNEL_QUEUE2}
* and send messages to queue with name {@value FORWARD_QUEUE_NAME}.
*
* @param serviceBusTemplate template to send messages
* @return instance of {@link MessageChannel}
*/
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
@ServiceActivator(inputChannel = OUTPUT_CHANNEL_QUEUE2)
public MessageHandler queueMessageForwarder(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler(FORWARD_QUEUE_NAME, serviceBusTemplate);
Expand All @@ -89,9 +77,9 @@ public void onFailure(Throwable ex) {

/**
* Message gateway binding with {@link MessageHandler}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL_QUEUE2}
*/
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL_QUEUE2)
public interface QueueForwardGateway {
void send(String text);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,24 @@
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class QueueSendController {
@Service
public class QueueSendService {

private static final Logger LOGGER = LoggerFactory.getLogger(QueueSendController.class);
private static final String OUTPUT_CHANNEL = "queue1.output";
private static final Logger LOGGER = LoggerFactory.getLogger(QueueSendService.class);
private static final String OUTPUT_CHANNEL_QUEUE1 = "queue1.output";
private static final String QUEUE_NAME = "queue1";

@Autowired
QueueSendGateway messagingGateway;

/**
* Posts a message to a Service Bus Queue
*/
@PostMapping("/queues")
public String send(@RequestParam("message") String message) {
this.messagingGateway.send(message);
return message;
}

@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
@ServiceActivator(inputChannel = OUTPUT_CHANNEL_QUEUE1)
public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
Expand All @@ -61,9 +46,9 @@ public void onFailure(Throwable ex) {

/**
* Message gateway binding with {@link MessageHandler}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL_QUEUE1}
*/
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL_QUEUE1)
public interface QueueSendGateway {
void send(String text);
}
Expand Down
Loading