Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compilation error after 4.0 branch changes #204

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package com.azure.spring.sample.eventhubs;

import com.azure.spring.eventhubs.core.EventHubsProcessorContainer;
import com.azure.spring.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.eventhubs.core.properties.EventHubsContainerProperties;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
Expand All @@ -22,22 +24,28 @@ public class EventHubIntegrationConfiguration {
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";


@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}

/**
* {@link EventHubsInboundChannelAdapter} binding with {@link MessageChannel} has name {@value INPUT_CHANNEL}
*
* @param inputChannel the MessageChannel binding with EventHubsInboundChannelAdapter
* @param processorContainer instance of EventHubsProcessorContainer
* @param listenerContainer instance of EventHubsProcessorContainer
* @return instance of EventHubsInboundChannelAdapter
*/
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsProcessorContainer processorContainer) {
public EventHubsInboundChannelAdapter messageChannelAdapter(@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
CheckpointConfig config = new CheckpointConfig(CheckpointMode.MANUAL);

EventHubsInboundChannelAdapter adapter =
new EventHubsInboundChannelAdapter(processorContainer, EVENTHUB_NAME,
CONSUMER_GROUP, config);
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(listenerContainer, config);
adapter.setOutputChannel(inputChannel);
return adapter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.azure.spring.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.servicebus.core.ServiceBusProducerFactory;
import com.azure.spring.servicebus.core.ServiceBusTemplate;
import com.azure.spring.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.servicebus.core.properties.ProcessorProperties;
import com.azure.spring.servicebus.core.properties.ProducerProperties;
import com.azure.spring.servicebus.core.properties.ServiceBusContainerProperties;
import com.azure.spring.servicebus.implementation.core.DefaultServiceBusNamespaceProcessorFactory;
import com.azure.spring.servicebus.implementation.core.DefaultServiceBusNamespaceProducerFactory;
import com.azure.spring.servicebus.support.converter.ServiceBusMessageConverter;
Expand Down Expand Up @@ -42,7 +43,7 @@ public class MultipleNamespacesAzureServiceBusMessagingAutoConfiguration {
private CustomizedServiceBusProperties properties;

/**
* Configure the {@link ServiceBusProcessorContainer}
* Configure the {@link ServiceBusProcessorFactory}
*/
@Configuration(proxyBeanMethods = false)
public static class ProcessorContainerConfiguration {
Expand All @@ -54,11 +55,6 @@ public ServiceBusProcessorFactory defaultServiceBusNamespaceProcessorFactory(
return new DefaultServiceBusNamespaceProcessorFactory(null, suppliers.getIfAvailable());
}

@Bean
@ConditionalOnMissingBean
public ServiceBusProcessorContainer messageProcessorContainer(ServiceBusProcessorFactory processorFactory) {
return new ServiceBusProcessorContainer(processorFactory);
}
}

/**
Expand Down Expand Up @@ -91,18 +87,27 @@ public ServiceBusTemplate serviceBusTemplate(ServiceBusProducerFactory senderCli
}
}

@Bean
public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(RECEIVE_QUEUE_NAME);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

/**
* {@link ServiceBusInboundChannelAdapter} binding with {@link MessageChannel} has name {@value INPUT_CHANNEL}
*
* @param inputChannel the MessageChannel binding with ServiceBusInboundChannelAdapter
* @param processorContainer instance of ServiceBusProcessorContainer
* @param listenerContainer instance of ServiceBusProcessorContainer
* @return instance of ServiceBusInboundChannelAdapter
*/
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, RECEIVE_QUEUE_NAME,
null, new CheckpointConfig(CheckpointMode.MANUAL));
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
ServiceBusMessageListenerContainer listenerContainer) {
CheckpointConfig checkpointConfig = new CheckpointConfig(CheckpointMode.MANUAL);

ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer, checkpointConfig);
adapter.setOutputChannel(inputChannel);
return adapter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.azure.spring.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.servicebus.core.properties.ServiceBusContainerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
Expand Down Expand Up @@ -40,11 +42,19 @@ public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) C
.subscribe();
}

@Bean("queue-listener-container")
public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(QUEUE_NAME);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, QUEUE_NAME,
null, new CheckpointConfig(CheckpointMode.MANUAL));
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier("queue-listener-container") ServiceBusMessageListenerContainer listenerContainer) {
CheckpointConfig checkpointConfig = new CheckpointConfig(CheckpointMode.MANUAL);
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer, checkpointConfig);
adapter.setOutputChannel(inputChannel);
return adapter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.azure.spring.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.servicebus.core.properties.ServiceBusContainerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
Expand Down Expand Up @@ -41,11 +43,20 @@ public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) C
.subscribe();
}

@Bean("topic-listener-container")
public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(TOPIC_NAME);
containerProperties.setSubscriptionName(SUBSCRIPTION_NAME);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

@Bean
public ServiceBusInboundChannelAdapter topicMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer topicOperation) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(topicOperation, TOPIC_NAME, SUBSCRIPTION_NAME,
new CheckpointConfig(CheckpointMode.MANUAL));
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier("topic-listener-container") ServiceBusMessageListenerContainer listenerContainer) {
CheckpointConfig checkpointConfig = new CheckpointConfig(CheckpointMode.MANUAL);
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer, checkpointConfig);
adapter.setOutputChannel(inputChannel);
return adapter;
}
Expand Down