Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Service Bus sample:multiple-namespaces and eventhubs-integration #134

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,22 @@

package com.azure.spring.sample.eventhubs;


import com.azure.spring.eventhubs.core.EventHubsProcessorContainer;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.bind.annotation.RestController;

/**
* @author Warren Zhu
*/
@RestController
public class ReceiveController {

private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveController.class);
@Configuration
public class EventHubIntegrationConfiguration {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";

/**
* This message receiver binding with {@link EventHubsInboundChannelAdapter}
* via {@link MessageChannel} has name {@value INPUT_CHANNEL}
*/
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}

@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
Expand All @@ -62,4 +36,5 @@ public EventHubsInboundChannelAdapter messageChannelAdapter(
public MessageChannel input() {
return new DirectChannel();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.sample.eventhubs;

import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

/**
* @author Warren Zhu
*/
@Service
public class ReceiveService {

private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveService.class);
private static final String INPUT_CHANNEL = "input";

/**
* This message receiver binding with {@link EventHubsInboundChannelAdapter}
* via {@link MessageChannel} has name {@value INPUT_CHANNEL}
*/
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ mvn clean spring-boot:run


## Verify This Sample
Send a POST request to service bus queue
```shell
$ curl -X POST http://localhost:8080/queues?message=hello
```


Verify in your app’s logs that a similar message was posted:
```shell
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.azure.spring.sample.servicebus;

import com.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusAutoConfiguration;
import com.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusMessagingAutoConfiguration;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import com.azure.spring.servicebus.core.processor.DefaultServiceBusNamespaceProcessorFactory;
Expand All @@ -13,12 +15,16 @@
import com.azure.spring.servicebus.core.properties.ProducerProperties;
import com.azure.spring.servicebus.support.converter.ServiceBusMessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import reactor.util.function.Tuple2;

@Configuration(proxyBeanMethods = false)
Expand All @@ -28,6 +34,13 @@
MultipleNamespacesAzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class
})
public class MultipleNamespacesAzureServiceBusMessagingAutoConfiguration {

private static final String RECEIVE_QUEUE_NAME = "queue1";
private static final String INPUT_CHANNEL = "queue1.input";

@Autowired
CustomizedServiceBusProperties properties;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private, or constructor injection?


/**
* Configure the {@link ServiceBusProcessorContainer}
*/
Expand Down Expand Up @@ -77,4 +90,28 @@ public ServiceBusTemplate serviceBusTemplate(ServiceBusProducerFactory senderCli
return serviceBusTemplate;
}
}

@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, RECEIVE_QUEUE_NAME,
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}

@Bean(name = INPUT_CHANNEL)
public MessageChannel input() {
return new DirectChannel();
}

@Bean
public PropertiesSupplier<String, ProducerProperties> producerPropertiesSupplier() {
return properties.producerPropertiesSupplier();
}

@Bean
public PropertiesSupplier<Tuple2<String, String>, ProcessorProperties> processorPropertiesSupplier() {
return properties.processorPropertiesSupplier();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,27 @@
import com.azure.spring.integration.handler.DefaultMessageHandler;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.azure.spring.service.servicebus.properties.ServiceBusEntityType;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class QueueReceiveController {
@Service
public class QueueReceiveService {

private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveController.class);
private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveService.class);
private static final String INPUT_CHANNEL = "queue1.input";
private static final String RECEIVE_QUEUE_NAME = "queue1";
private static final String OUTPUT_CHANNEL = "queue2.output";
private static final String OUTPUT_CHANNEL_QUEUE2 = "queue2.output";
private static final String FORWARD_QUEUE_NAME = "queue2";

@Autowired
Expand All @@ -54,21 +48,7 @@ public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) C
}

@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, RECEIVE_QUEUE_NAME,
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}

@Bean(name = INPUT_CHANNEL)
public MessageChannel input() {
return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
@ServiceActivator(inputChannel = OUTPUT_CHANNEL_QUEUE2)
public MessageHandler queueMessageForwarder(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler(FORWARD_QUEUE_NAME, serviceBusTemplate);
Expand All @@ -89,9 +69,9 @@ public void onFailure(Throwable ex) {

/**
* Message gateway binding with {@link MessageHandler}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL_QUEUE2}
*/
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL_QUEUE2)
public interface QueueForwardGateway {
void send(String text);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,24 @@
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class QueueSendController {
@Service
public class QueueSendService {

private static final Logger LOGGER = LoggerFactory.getLogger(QueueSendController.class);
private static final String OUTPUT_CHANNEL = "queue1.output";
private static final Logger LOGGER = LoggerFactory.getLogger(QueueSendService.class);
private static final String OUTPUT_CHANNEL_QUEUE1 = "queue1.output";
private static final String QUEUE_NAME = "queue1";

@Autowired
QueueSendGateway messagingGateway;

/**
* Posts a message to a Service Bus Queue
*/
@PostMapping("/queues")
public String send(@RequestParam("message") String message) {
this.messagingGateway.send(message);
return message;
}

@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
@ServiceActivator(inputChannel = OUTPUT_CHANNEL_QUEUE1)
public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
Expand All @@ -61,9 +46,9 @@ public void onFailure(Throwable ex) {

/**
* Message gateway binding with {@link MessageHandler}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL}
* via {@link MessageChannel} has name {@value OUTPUT_CHANNEL_QUEUE1}
*/
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL_QUEUE1)
public interface QueueSendGateway {
void send(String text);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,31 @@

package com.azure.spring.sample.servicebus;

import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.servicebus.core.properties.ProcessorProperties;
import com.azure.spring.servicebus.core.properties.ProducerProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import reactor.util.function.Tuple2;
import com.azure.spring.sample.servicebus.QueueSendService.QueueSendGateway;

@SpringBootApplication
@EnableIntegration
@EnableConfigurationProperties(CustomizedServiceBusProperties.class)
@Configuration(proxyBeanMethods = false)
public class ServiceBusIntegrationApplication {

public static void main(String[] args) {
SpringApplication.run(ServiceBusIntegrationApplication.class, args);
}
public class ServiceBusIntegrationApplication implements CommandLineRunner {

@Autowired
CustomizedServiceBusProperties properties;
QueueSendGateway messagingGateway;
Copy link
Contributor

@stliu stliu Jan 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private


@Bean
public PropertiesSupplier<String, ProducerProperties> producerPropertiesSupplier() {
return properties.producerPropertiesSupplier();
public static void main(String[] args) {
SpringApplication.run(ServiceBusIntegrationApplication.class, args);
}

@Bean
public PropertiesSupplier<Tuple2<String, String>, ProcessorProperties> processorPropertiesSupplier() {
return properties.processorPropertiesSupplier();
@Override
public void run(String... args) {
this.messagingGateway.send("hello");
}

}