diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/properties/AzureServiceBusProperties.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/properties/AzureServiceBusProperties.java index 32bd7973038e0..f1b081ec59987 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/properties/AzureServiceBusProperties.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/properties/AzureServiceBusProperties.java @@ -20,8 +20,6 @@ import java.util.Objects; import java.util.stream.Stream; -import static com.azure.spring.cloud.service.implementation.core.PropertiesValidator.validateNamespace; - /** * */ @@ -160,7 +158,7 @@ public static class Consumer extends AzureServiceBusCommonProperties implements /** * Whether to enable auto-complete. */ - private Boolean autoComplete; + private Boolean autoComplete = true; /** * Prefetch count of the consumer. */ diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/core/properties/EventHubsConsumerProperties.java b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/core/properties/EventHubsConsumerProperties.java index 0d68e68c5f859..1b3fecad0c060 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/core/properties/EventHubsConsumerProperties.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/core/properties/EventHubsConsumerProperties.java @@ -4,7 +4,7 @@ package com.azure.spring.cloud.stream.binder.eventhubs.core.properties; import com.azure.spring.messaging.eventhubs.core.properties.ProcessorProperties; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; /** * diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/core/properties/EventHubsConsumerPropertiesTests.java b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/core/properties/EventHubsConsumerPropertiesTests.java index d34c2101760cb..f4f9c1ab3ea05 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/core/properties/EventHubsConsumerPropertiesTests.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/core/properties/EventHubsConsumerPropertiesTests.java @@ -4,8 +4,8 @@ package com.azure.spring.cloud.stream.binder.eventhubs.core.properties; import com.azure.messaging.eventhubs.LoadBalancingStrategy; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import com.azure.spring.cloud.service.eventhubs.properties.LoadBalancingProperties; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/EventHubsHealthIndicatorTests.java b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/EventHubsHealthIndicatorTests.java index dc6b809fafa7e..03a23d9bfbf00 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/EventHubsHealthIndicatorTests.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/EventHubsHealthIndicatorTests.java @@ -11,8 +11,8 @@ import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsProducerProperties; import com.azure.spring.cloud.stream.binder.eventhubs.core.provisioning.EventHubsChannelProvisioner; import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate; import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProducerFactory; import com.azure.storage.blob.BlobContainerAsyncClient; diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/config/EventHubsBinderConfigurationTests.java b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/config/EventHubsBinderConfigurationTests.java index e13837fba13d4..d1d729ac0e71a 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/config/EventHubsBinderConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/config/EventHubsBinderConfigurationTests.java @@ -17,7 +17,7 @@ import com.azure.spring.cloud.stream.binder.eventhubs.core.provisioning.EventHubsChannelProvisioner; import com.azure.spring.cloud.stream.binder.eventhubs.provisioning.EventHubsChannelResourceManagerProvisioner; import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer; import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties; import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties; diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus-core/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/core/properties/ServiceBusConsumerProperties.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus-core/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/core/properties/ServiceBusConsumerProperties.java index 1a9ba2d058f10..ff0945f27bf46 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus-core/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/core/properties/ServiceBusConsumerProperties.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus-core/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/core/properties/ServiceBusConsumerProperties.java @@ -3,7 +3,6 @@ package com.azure.spring.cloud.stream.binder.servicebus.core.properties; -import com.azure.spring.messaging.checkpoint.CheckpointMode; import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties; /** @@ -13,26 +12,6 @@ public class ServiceBusConsumerProperties extends ProcessorProperties { private boolean requeueRejected = false; - private CheckpointMode checkpointMode = CheckpointMode.RECORD; - - /** - * Get checkpoint mode. - * - * @return checkpointMode the checkpoint mode - */ - public CheckpointMode getCheckpointMode() { - return checkpointMode; - } - - /** - * Set checkpoint mode. - * - * @param checkpointMode the checkpoint mode - */ - public void setCheckpointMode(CheckpointMode checkpointMode) { - this.checkpointMode = checkpointMode; - } - /** * Controls if the failed messages are routed to the DLQ * diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus-core/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/core/properties/ServiceBusConsumerPropertiesTests.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus-core/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/core/properties/ServiceBusConsumerPropertiesTests.java index 5411d4b4644ee..70c250a0176fc 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus-core/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/core/properties/ServiceBusConsumerPropertiesTests.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus-core/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/core/properties/ServiceBusConsumerPropertiesTests.java @@ -5,19 +5,19 @@ import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.azure.messaging.servicebus.models.SubQueue; -import com.azure.spring.messaging.checkpoint.CheckpointMode; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.time.Duration; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; class ServiceBusConsumerPropertiesTests { - static final String CONNECTION_STRING = "Endpoint=sb://%s.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=key"; private ServiceBusConsumerProperties consumerProperties; @BeforeEach @@ -26,25 +26,25 @@ void beforeEach() { } @Test - void checkpointModeDefaults() { - assertEquals(CheckpointMode.RECORD, consumerProperties.getCheckpointMode()); + void autoCompleteDefaultTrue() { + assertTrue(consumerProperties.getAutoComplete()); } @Test - void customCheckpointMode() { - consumerProperties.setCheckpointMode(CheckpointMode.BATCH); - assertEquals(CheckpointMode.BATCH, consumerProperties.getCheckpointMode()); + void customizeAutoComplete() { + consumerProperties.setAutoComplete(false); + assertFalse(consumerProperties.getAutoComplete()); } @Test void requeueRejectedDefaultsToFalse() { - assertEquals(false, consumerProperties.isRequeueRejected()); + assertFalse(consumerProperties.isRequeueRejected()); } @Test void customRequeueRejected() { consumerProperties.setRequeueRejected(true); - assertEquals(true, consumerProperties.isRequeueRejected()); + assertTrue(consumerProperties.isRequeueRejected()); } @Test diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/ServiceBusMessageChannelBinder.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/ServiceBusMessageChannelBinder.java index 7fe2ba49f1e7e..aef96e1245428 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/ServiceBusMessageChannelBinder.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/ServiceBusMessageChannelBinder.java @@ -15,11 +15,10 @@ import com.azure.spring.integration.core.implementation.instrumentation.InstrumentationSendCallback; import com.azure.spring.integration.core.instrumentation.Instrumentation; import com.azure.spring.integration.core.instrumentation.InstrumentationManager; -import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter; import com.azure.spring.integration.servicebus.implementation.health.ServiceBusProcessorInstrumentation; +import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter; import com.azure.spring.messaging.ConsumerIdentifier; import com.azure.spring.messaging.PropertiesSupplier; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory; import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate; import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer; @@ -135,7 +134,6 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(destination.getName()); containerProperties.setSubscriptionName(group); - containerProperties.setCheckpointConfig(new CheckpointConfig(properties.getExtension().getCheckpointMode())); ServiceBusMessageListenerContainer listenerContainer = new ServiceBusMessageListenerContainer(getProcessorFactory(), containerProperties); diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/ServiceBusHealthIndicatorTests.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/ServiceBusHealthIndicatorTests.java index 7d52a4599f271..fbb6381ba890e 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/ServiceBusHealthIndicatorTests.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/ServiceBusHealthIndicatorTests.java @@ -12,7 +12,6 @@ import com.azure.spring.integration.core.handler.DefaultMessageHandler; import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter; import com.azure.spring.messaging.AzureHeaders; -import com.azure.spring.messaging.checkpoint.CheckpointMode; import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate; import com.azure.spring.messaging.servicebus.implementation.core.DefaultServiceBusNamespaceProducerFactory; import org.junit.jupiter.api.BeforeEach; @@ -169,7 +168,6 @@ private void prepareConsumerProperties() { serviceBusConsumerProperties.setEntityName(ENTITY_NAME); serviceBusConsumerProperties.setEntityType(ServiceBusEntityType.QUEUE); serviceBusConsumerProperties.setNamespace(NAMESPACE_NAME); - serviceBusConsumerProperties.setCheckpointMode(CheckpointMode.RECORD); ServiceBusBindingProperties bindingProperties = new ServiceBusBindingProperties(); bindingProperties.setConsumer(serviceBusConsumerProperties); diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/config/ServiceBusBinderConfigurationTests.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/config/ServiceBusBinderConfigurationTests.java index a821df6953ed9..ede28ff56c5bd 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/config/ServiceBusBinderConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/config/ServiceBusBinderConfigurationTests.java @@ -14,7 +14,6 @@ import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusProducerProperties; import com.azure.spring.cloud.stream.binder.servicebus.core.provisioning.ServiceBusChannelProvisioner; import com.azure.spring.cloud.stream.binder.servicebus.provisioning.ServiceBusChannelResourceManagerProvisioner; -import com.azure.spring.messaging.checkpoint.CheckpointMode; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; @@ -98,7 +97,6 @@ void testExtendedBindingPropertiesShouldBind() { "spring.cloud.stream.servicebus.bindings.input.consumer.max-concurrent-calls=5", "spring.cloud.stream.servicebus.bindings.input.consumer.max-concurrent-sessions=6", "spring.cloud.stream.servicebus.bindings.input.consumer.requeue-rejected=true", - "spring.cloud.stream.servicebus.bindings.input.consumer.checkpoint-mode=BATCH", "spring.cloud.stream.servicebus.bindings.input.producer.domain-name=fake-producer-domain", "spring.cloud.stream.servicebus.bindings.input.producer.namespace=fake-producer-namespace", @@ -131,7 +129,6 @@ void testExtendedBindingPropertiesShouldBind() { assertEquals(Duration.ofSeconds(2), consumerProperties.getMaxAutoLockRenewDuration()); assertEquals(5, consumerProperties.getMaxConcurrentCalls()); assertTrue(consumerProperties.isRequeueRejected()); - assertEquals(CheckpointMode.BATCH, consumerProperties.getCheckpointMode()); ServiceBusProducerProperties producerProperties = extendedBindingProperties.getExtendedProducerProperties("input"); diff --git a/sdk/spring/spring-integration-azure-eventhubs/src/main/java/com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapter.java b/sdk/spring/spring-integration-azure-eventhubs/src/main/java/com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapter.java index 89dcfe65d3849..6c45358f0fa7e 100644 --- a/sdk/spring/spring-integration-azure-eventhubs/src/main/java/com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapter.java +++ b/sdk/spring/spring-integration-azure-eventhubs/src/main/java/com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapter.java @@ -16,8 +16,8 @@ import com.azure.spring.messaging.AzureHeaders; import com.azure.spring.messaging.ListenerMode; import com.azure.spring.messaging.checkpoint.AzureCheckpointer; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import com.azure.spring.messaging.checkpoint.Checkpointer; import com.azure.spring.messaging.converter.AzureMessageConverter; import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer; diff --git a/sdk/spring/spring-integration-azure-eventhubs/src/test/java/com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapterTests.java b/sdk/spring/spring-integration-azure-eventhubs/src/test/java/com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapterTests.java index 5642ff757b99c..68f5ccb3eb662 100644 --- a/sdk/spring/spring-integration-azure-eventhubs/src/test/java/com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapterTests.java +++ b/sdk/spring/spring-integration-azure-eventhubs/src/test/java/com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapterTests.java @@ -17,8 +17,8 @@ import com.azure.spring.integration.core.instrumentation.Instrumentation; import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation; import com.azure.spring.messaging.ListenerMode; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import com.azure.spring.messaging.converter.AbstractAzureMessageConverter; import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory; import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer; diff --git a/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java b/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java index 17e19faed9195..7db986f6b9d19 100644 --- a/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java +++ b/sdk/spring/spring-integration-azure-servicebus/src/main/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapter.java @@ -7,17 +7,15 @@ import com.azure.messaging.servicebus.ServiceBusMessage; import com.azure.messaging.servicebus.ServiceBusReceivedMessage; import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; +import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusErrorHandler; import com.azure.spring.integration.core.instrumentation.Instrumentation; import com.azure.spring.integration.core.instrumentation.InstrumentationManager; import com.azure.spring.integration.servicebus.implementation.health.ServiceBusProcessorInstrumentation; import com.azure.spring.messaging.AzureHeaders; import com.azure.spring.messaging.ListenerMode; import com.azure.spring.messaging.checkpoint.AzureCheckpointer; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; import com.azure.spring.messaging.checkpoint.Checkpointer; import com.azure.spring.messaging.converter.AzureMessageConverter; -import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusErrorHandler; import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer; import com.azure.spring.messaging.servicebus.implementation.core.listener.adapter.RecordMessagingMessageListenerAdapter; import com.azure.spring.messaging.servicebus.support.ServiceBusMessageHeaders; @@ -62,7 +60,7 @@ * ServiceBusProcessorFactory processorFactory) { * ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); * containerProperties.setEntityName("RECEIVE_QUEUE_NAME"); - * containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); + * containerProperties.setAutoComplete(false); * return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); * } * @@ -79,17 +77,15 @@ public class ServiceBusInboundChannelAdapter extends MessageProducerSupport { private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener(); private final ServiceBusMessageListenerContainer listenerContainer; private final ListenerMode listenerMode; - private final CheckpointConfig checkpointConfig; private InstrumentationManager instrumentationManager; private String instrumentationId; + private final boolean isAutoComplete; private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s"; - private static final String MSG_SUCCESS_CHECKPOINT = "Checkpointed %s in %s mode"; /** - * Construct a {@link ServiceBusInboundChannelAdapter} with the specified {@link ServiceBusMessageListenerContainer} - * and {@link CheckpointConfig}. + * Construct a {@link ServiceBusInboundChannelAdapter} with the specified {@link ServiceBusMessageListenerContainer}. * - * @param listenerContainer the processor container + * @param listenerContainer the message listener container. */ public ServiceBusInboundChannelAdapter(ServiceBusMessageListenerContainer listenerContainer) { this(listenerContainer, ListenerMode.RECORD); @@ -97,16 +93,16 @@ public ServiceBusInboundChannelAdapter(ServiceBusMessageListenerContainer listen /** * Construct a {@link ServiceBusInboundChannelAdapter} with the specified {@link ServiceBusMessageListenerContainer} - * ,{@link ListenerMode} and {@link CheckpointConfig}. - * @param listenerContainer the processor container + * ,{@link ListenerMode}. + * + * @param listenerContainer the message listener container. * @param listenerMode the listen mode */ public ServiceBusInboundChannelAdapter(ServiceBusMessageListenerContainer listenerContainer, ListenerMode listenerMode) { this.listenerContainer = listenerContainer; this.listenerMode = listenerMode; - CheckpointConfig containerCheckpointConfig = listenerContainer.getContainerProperties().getCheckpointConfig(); - this.checkpointConfig = containerCheckpointConfig == null ? new CheckpointConfig() : containerCheckpointConfig; + this.isAutoComplete = !Boolean.FALSE.equals(listenerContainer.getContainerProperties().getAutoComplete()); } @Override @@ -196,7 +192,7 @@ public void onMessage(ServiceBusReceivedMessageContext messageContext) { Map headers = new HashMap<>(); headers.put(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT, messageContext); - if (checkpointConfig.getMode() == CheckpointMode.MANUAL) { + if (!isAutoComplete) { headers.put(AzureHeaders.CHECKPOINTER, checkpointer); } @@ -204,12 +200,10 @@ public void onMessage(ServiceBusReceivedMessageContext messageContext) { payloadType); sendMessage(message); - if (checkpointConfig.getMode() == CheckpointMode.RECORD) { + if (isAutoComplete) { checkpointer.success() - .doOnSuccess(t -> - LOGGER.debug(String.format(MSG_SUCCESS_CHECKPOINT, message, checkpointConfig.getMode()))) - .doOnError(t -> - LOGGER.warn(String.format(MSG_FAIL_CHECKPOINT, message), t)) + .doOnSuccess(t -> LOGGER.debug("Settled {} with autocomplete enabled.", message)) + .doOnError(t -> LOGGER.warn(String.format(MSG_FAIL_CHECKPOINT, message), t)) .block(); } } diff --git a/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java b/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java index ceb8629336e74..5395872ca9cf6 100644 --- a/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java +++ b/sdk/spring/spring-integration-azure-servicebus/src/test/java/com/azure/spring/integration/servicebus/inbound/ServiceBusInboundChannelAdapterTests.java @@ -16,8 +16,6 @@ import com.azure.spring.integration.core.instrumentation.Instrumentation; import com.azure.spring.integration.servicebus.implementation.health.ServiceBusProcessorInstrumentation; import com.azure.spring.messaging.ListenerMode; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; import com.azure.spring.messaging.converter.AbstractAzureMessageConverter; import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory; import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer; @@ -75,7 +73,6 @@ public void setUp() { @Test void defaultRecordListenerMode() { - this.containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.RECORD)); ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter( new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties)); assertThat(channelAdapter).hasFieldOrPropertyWithValue("listenerMode", ListenerMode.RECORD); @@ -83,7 +80,6 @@ void defaultRecordListenerMode() { @Test void batchListenerModeDoesNotSupport() { - this.containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.BATCH)); ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter( new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties), ListenerMode.BATCH); @@ -123,7 +119,6 @@ void setPayloadType() { @Test void sendAndReceive() throws InterruptedException { - this.containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.RECORD)); ServiceBusMessageListenerContainer listenerContainer = new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties); ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer); @@ -169,7 +164,6 @@ void sendAndReceive() throws InterruptedException { @Test void instrumentationErrorHandler() { - this.containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.RECORD)); DefaultInstrumentationManager instrumentationManager = new DefaultInstrumentationManager(); ServiceBusMessageListenerContainer listenerContainer = new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties); diff --git a/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/CheckpointConfig.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/checkpoint/CheckpointConfig.java similarity index 98% rename from sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/CheckpointConfig.java rename to sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/checkpoint/CheckpointConfig.java index 1fba51d71d808..92c59bc2159ff 100644 --- a/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/CheckpointConfig.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/checkpoint/CheckpointConfig.java @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.spring.messaging.checkpoint; +package com.azure.spring.messaging.eventhubs.core.checkpoint; import java.time.Duration; diff --git a/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/CheckpointMode.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/checkpoint/CheckpointMode.java similarity index 92% rename from sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/CheckpointMode.java rename to sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/checkpoint/CheckpointMode.java index d4843aec445c0..87e41581d8efd 100644 --- a/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/CheckpointMode.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/checkpoint/CheckpointMode.java @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.spring.messaging.checkpoint; +package com.azure.spring.messaging.eventhubs.core.checkpoint; import com.azure.spring.messaging.ListenerMode; diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/checkpoint/package-info.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/checkpoint/package-info.java new file mode 100644 index 0000000000000..60667178e008e --- /dev/null +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/checkpoint/package-info.java @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * Event Hubs checkpoint configs. + */ +package com.azure.spring.messaging.eventhubs.core.checkpoint; diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/properties/EventHubsContainerProperties.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/properties/EventHubsContainerProperties.java index f9ddf39914728..99176038d0511 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/properties/EventHubsContainerProperties.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/properties/EventHubsContainerProperties.java @@ -7,7 +7,7 @@ import com.azure.messaging.eventhubs.models.InitializationContext; import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler; import com.azure.spring.cloud.service.listener.MessageListener; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; import java.util.function.Consumer; diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/BatchCheckpointManager.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/BatchCheckpointManager.java index 8cc5a7aaf290e..68c5f14408cd8 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/BatchCheckpointManager.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/BatchCheckpointManager.java @@ -5,8 +5,8 @@ import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.models.EventBatchContext; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/CheckpointManagers.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/CheckpointManagers.java index e00eb3493aa49..e38505a58e1fb 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/CheckpointManagers.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/CheckpointManagers.java @@ -4,7 +4,7 @@ package com.azure.spring.messaging.eventhubs.implementation.checkpoint; import com.azure.spring.messaging.ListenerMode; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/EventCheckpointManager.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/EventCheckpointManager.java index 541f11a043f49..459eda0bf3c65 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/EventCheckpointManager.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/EventCheckpointManager.java @@ -7,7 +7,7 @@ import com.azure.messaging.eventhubs.models.EventBatchContext; import com.azure.messaging.eventhubs.models.EventContext; import com.azure.spring.messaging.eventhubs.implementation.support.EventDataHelper; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; import org.slf4j.Logger; /** diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/ManualCheckpointManager.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/ManualCheckpointManager.java index 38c56623a0973..a4e53de541982 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/ManualCheckpointManager.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/ManualCheckpointManager.java @@ -4,8 +4,8 @@ package com.azure.spring.messaging.eventhubs.implementation.checkpoint; import com.azure.messaging.eventhubs.models.EventContext; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/PartitionCountCheckpointManager.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/PartitionCountCheckpointManager.java index fb9815c4bc84e..81d7d70d8a8a7 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/PartitionCountCheckpointManager.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/PartitionCountCheckpointManager.java @@ -4,8 +4,8 @@ package com.azure.spring.messaging.eventhubs.implementation.checkpoint; import com.azure.messaging.eventhubs.models.EventContext; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/RecordCheckpointManager.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/RecordCheckpointManager.java index 30dfcb1759263..fbafcba639360 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/RecordCheckpointManager.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/RecordCheckpointManager.java @@ -4,8 +4,8 @@ package com.azure.spring.messaging.eventhubs.implementation.checkpoint; import com.azure.messaging.eventhubs.models.EventContext; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/TimeCheckpointManager.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/TimeCheckpointManager.java index 95eda1df9a677..1990aa716da93 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/TimeCheckpointManager.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/implementation/checkpoint/TimeCheckpointManager.java @@ -4,8 +4,8 @@ package com.azure.spring.messaging.eventhubs.implementation.checkpoint; import com.azure.messaging.eventhubs.models.EventContext; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig; +import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; diff --git a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ConsumerProperties.java b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ConsumerProperties.java index 175224c9f86a1..a365e12a3270e 100644 --- a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ConsumerProperties.java +++ b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ConsumerProperties.java @@ -15,7 +15,7 @@ public class ConsumerProperties extends CommonProperties implements ServiceBusReceiverClientProperties { private Boolean sessionEnabled; - private Boolean autoComplete; + private Boolean autoComplete = true; private Integer prefetchCount; private SubQueue subQueue = SubQueue.NONE; private ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PEEK_LOCK; @@ -80,8 +80,8 @@ public ServiceBusReceiveMode getReceiveMode() { } /** - * Set the receive mode. - * @param receiveMode the receive mode. + * Set the receiving mode. + * @param receiveMode the receiving mode. */ public void setReceiveMode(ServiceBusReceiveMode receiveMode) { this.receiveMode = receiveMode; diff --git a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ServiceBusContainerProperties.java b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ServiceBusContainerProperties.java index 9ec81132ea6a3..8856bf1bde24f 100644 --- a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ServiceBusContainerProperties.java +++ b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ServiceBusContainerProperties.java @@ -5,7 +5,6 @@ import com.azure.spring.cloud.service.listener.MessageListener; import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusErrorHandler; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; /** * The properties to describe a Service Bus listener container. @@ -14,7 +13,6 @@ public class ServiceBusContainerProperties extends ProcessorProperties { private MessageListener messageListener; private ServiceBusErrorHandler errorHandler; - private CheckpointConfig checkpointConfig = new CheckpointConfig(); /** * Get the message listener of the container. @@ -48,19 +46,4 @@ public void setErrorHandler(ServiceBusErrorHandler errorHandler) { this.errorHandler = errorHandler; } - /** - * Get the checkpoint config. - * @return the checkpoint config. - */ - public CheckpointConfig getCheckpointConfig() { - return checkpointConfig; - } - - /** - * Set the checkpoint config. - * @param checkpointConfig the checkpoint config. - */ - public void setCheckpointConfig(CheckpointConfig checkpointConfig) { - this.checkpointConfig = checkpointConfig; - } } diff --git a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/implementation/core/DefaultServiceBusNamespaceProcessorFactory.java b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/implementation/core/DefaultServiceBusNamespaceProcessorFactory.java index 6bbc45e6971ae..233b43994ca15 100644 --- a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/implementation/core/DefaultServiceBusNamespaceProcessorFactory.java +++ b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/implementation/core/DefaultServiceBusNamespaceProcessorFactory.java @@ -18,6 +18,7 @@ import com.azure.spring.messaging.ConsumerIdentifier; import com.azure.spring.messaging.PropertiesSupplier; import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory; +import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer; import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties; import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties; import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties; @@ -133,7 +134,26 @@ public ServiceBusProcessorClient createProcessor(String topic, String subscripti return doCreateProcessor(topic, subscription, messageListener, errorHandler, processorProperties); } - private ServiceBusProcessorClient doCreateProcessor(String name, String subscription, + /** + * Create the {@link ServiceBusProcessorClient} with given name, subscription, message listener, error handler, and + * properties. + * + *

+ * This {@link ServiceBusProcessorClient} created from this method will disable the autocomplete, because this + * processor client is used as the delegate in {@link ServiceBusMessageListenerContainer} and we want the listener + * container or any upper layer of the {@link ServiceBusMessageListenerContainer} to handle the settlement of a + * Service Bus message. + * + * @param name the queue name of topic name. + * @param subscription the subscription name. + * @param messageListener the message listener. + * @param errorHandler the error handler. + * @param properties the properties of the processor. + * + * @return the processor client. + */ + private ServiceBusProcessorClient doCreateProcessor(String name, + @Nullable String subscription, @NonNull MessageListener messageListener, @NonNull ServiceBusErrorHandler errorHandler, @Nullable ProcessorProperties properties) { diff --git a/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointable.java b/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointable.java deleted file mode 100644 index f25df012d008e..0000000000000 --- a/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointable.java +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.spring.messaging.checkpoint; - -/** - * Support checkpoint by setting {@link CheckpointConfig} - */ -public interface Checkpointable { - - /** - * Get a {@link CheckpointConfig} instance. - * @return a CheckpointConfig instance. - */ - CheckpointConfig getCheckpointConfig(); - - /** - * Set a {@link CheckpointConfig} instance. - * @param checkpointConfig a CheckpointConfig instance. - */ - void setCheckpointConfig(CheckpointConfig checkpointConfig); -} diff --git a/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointer.java b/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointer.java index de4d8ed0c1783..8ec3a7c432703 100644 --- a/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointer.java +++ b/sdk/spring/spring-messaging-azure/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointer.java @@ -7,7 +7,8 @@ /** * A callback to perform checkpoint for received messages. - * When the {@link CheckpointMode#MANUAL} mode is used, {@link Checkpointer} will be put in messages as the header + * When a manual checkpoint mode is used in Event Hubs or autoComplete is set to false in Service Bus, + * {@link Checkpointer} will be put in messages as the header * {@link com.azure.spring.messaging.AzureHeaders#CHECKPOINTER}. * *

diff --git a/sdk/spring/spring-messaging-azure/src/test/java/com/azure/spring/messaging/core/SendSubscribeOperationTests.java b/sdk/spring/spring-messaging-azure/src/test/java/com/azure/spring/messaging/core/SendSubscribeOperationTests.java deleted file mode 100644 index 29391f51052e4..0000000000000 --- a/sdk/spring/spring-messaging-azure/src/test/java/com/azure/spring/messaging/core/SendSubscribeOperationTests.java +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.spring.messaging.core; - -import com.azure.spring.messaging.AzureHeaders; -import com.azure.spring.messaging.checkpoint.CheckpointConfig; -import com.azure.spring.messaging.checkpoint.CheckpointMode; -import com.azure.spring.messaging.checkpoint.Checkpointer; -import com.azure.spring.messaging.support.pojo.User; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.GenericMessage; - -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - - -public abstract class SendSubscribeOperationTests { - - protected String destination = "test"; - protected String partitionId = "1"; - protected T sendSubscribeOperation; - private Map headers = new HashMap<>(); - protected List> messages = IntStream.range(1, 5) - .mapToObj(String::valueOf) - .map(User::new) - .map(u -> new GenericMessage<>(u, headers)) - .collect(Collectors.toList()); - private String payload = "payload"; - private Message byteMessage = new GenericMessage<>(payload.getBytes(StandardCharsets.UTF_8), headers); - private Message stringMessage = new GenericMessage<>(payload, headers); - protected User user = new User(payload); - protected Message userMessage = new GenericMessage<>(user, headers); - - protected abstract void setCheckpointConfig(CheckpointConfig checkpointConfig); - - @BeforeEach - public abstract void setUp() throws Exception; - - protected abstract void subscribe(String destination, Consumer> consumer, Class payloadType); - - @Test - public void testSendByte() { - subscribe(destination, this::byteHandler, byte[].class); - sendSubscribeOperation.sendAsync(destination, byteMessage); - } - - @Test - public void testSendReceiveWithManualCheckpointMode() { - setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); - subscribe(destination, this::manualCheckpointHandler, User.class); - sendSubscribeOperation.sendAsync(destination, userMessage); - } - - @Test - public void testSendReceiveWithRecordCheckpointMode() { - setCheckpointConfig(new CheckpointConfig(CheckpointMode.RECORD)); - subscribe(destination, this::recordCheckpointHandler, User.class); - messages.forEach(m -> sendSubscribeOperation.sendAsync(destination, m)); - verifyCheckpointSuccessCalled(messages.size()); - } - - @Test - public void testSendString() { - subscribe(destination, this::stringHandler, String.class); - sendSubscribeOperation.sendAsync(destination, stringMessage); - } - - @Test - public void testSendUser() { - subscribe(destination, this::userHandler, User.class); - sendSubscribeOperation.sendAsync(destination, userMessage); - } - - @Deprecated - protected abstract void verifyCheckpointBatchSuccessCalled(int times); - - protected void verifyCheckpointFailure(Checkpointer checkpointer) { - checkpointer.failure(); - verifyCheckpointFailureCalled(1); - } - - protected abstract void verifyCheckpointFailureCalled(int times); - - protected void verifyCheckpointSuccess(Checkpointer checkpointer) { - checkpointer.success(); - verifyCheckpointSuccessCalled(1); - } - - protected abstract void verifyCheckpointSuccessCalled(int times); - - private void byteHandler(Message message) { - assertEquals(payload, new String((byte[]) message.getPayload(), StandardCharsets.UTF_8)); - } - - protected void manualCheckpointHandler(Message message) { - assertTrue(message.getHeaders().containsKey(AzureHeaders.CHECKPOINTER)); - Checkpointer checkpointer = message.getHeaders().get(AzureHeaders.CHECKPOINTER, Checkpointer.class); - assertNotNull(checkpointer); - verifyCheckpointSuccess(checkpointer); - verifyCheckpointFailure(checkpointer); - } - - private void recordCheckpointHandler(Message message) { - // - } - - private void stringHandler(Message message) { - assertEquals(payload, message.getPayload()); - } - - private void userHandler(Message message) { - assertEquals(user, message.getPayload()); - } - - public T getSendSubscribeOperation() { - return sendSubscribeOperation; - } - - public void setSendSubscribeOperation(T sendSubscribeOperation) { - this.sendSubscribeOperation = sendSubscribeOperation; - } - - public String getPartitionId() { - return partitionId; - } - - public void setPartitionId(String partitionId) { - this.partitionId = partitionId; - } - - protected void waitMillis(long millis) { - if (millis <= 0) { - millis = 30; - } - try { - Thread.sleep(millis); - } catch (InterruptedException ignore) { - - } - } -}