Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove checkpoint from Spring Cloud Azure Service Bus #27615

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import java.util.Objects;
import java.util.stream.Stream;

import static com.azure.spring.cloud.service.implementation.core.PropertiesValidator.validateNamespace;

/**
*
*/
Expand Down Expand Up @@ -160,7 +158,7 @@ public static class Consumer extends AzureServiceBusCommonProperties implements
/**
* Whether to enable auto-complete.
*/
private Boolean autoComplete;
private Boolean autoComplete = true;
/**
* Prefetch count of the consumer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.spring.cloud.stream.binder.eventhubs.core.properties;

import com.azure.spring.messaging.eventhubs.core.properties.ProcessorProperties;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package com.azure.spring.cloud.stream.binder.eventhubs.core.properties;

import com.azure.messaging.eventhubs.LoadBalancingStrategy;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
import com.azure.spring.cloud.service.eventhubs.properties.LoadBalancingProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsProducerProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.provisioning.EventHubsChannelProvisioner;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProducerFactory;
import com.azure.storage.blob.BlobContainerAsyncClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.azure.spring.cloud.stream.binder.eventhubs.core.provisioning.EventHubsChannelProvisioner;
import com.azure.spring.cloud.stream.binder.eventhubs.provisioning.EventHubsChannelResourceManagerProvisioner;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.spring.cloud.stream.binder.servicebus.core.properties;

import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties;

/**
Expand All @@ -13,26 +12,6 @@ public class ServiceBusConsumerProperties extends ProcessorProperties {

private boolean requeueRejected = false;

private CheckpointMode checkpointMode = CheckpointMode.RECORD;

/**
* Get checkpoint mode.
*
* @return checkpointMode the checkpoint mode
*/
public CheckpointMode getCheckpointMode() {
return checkpointMode;
}

/**
* Set checkpoint mode.
*
* @param checkpointMode the checkpoint mode
*/
public void setCheckpointMode(CheckpointMode checkpointMode) {
this.checkpointMode = checkpointMode;
}

/**
* Controls if the failed messages are routed to the DLQ
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@

import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ServiceBusConsumerPropertiesTests {

static final String CONNECTION_STRING = "Endpoint=sb://%s.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=key";
private ServiceBusConsumerProperties consumerProperties;

@BeforeEach
Expand All @@ -26,25 +26,25 @@ void beforeEach() {
}

@Test
void checkpointModeDefaults() {
assertEquals(CheckpointMode.RECORD, consumerProperties.getCheckpointMode());
void autoCompleteDefaultTrue() {
assertTrue(consumerProperties.getAutoComplete());
}

@Test
void customCheckpointMode() {
consumerProperties.setCheckpointMode(CheckpointMode.BATCH);
assertEquals(CheckpointMode.BATCH, consumerProperties.getCheckpointMode());
void customizeAutoComplete() {
consumerProperties.setAutoComplete(false);
assertFalse(consumerProperties.getAutoComplete());
}

@Test
void requeueRejectedDefaultsToFalse() {
assertEquals(false, consumerProperties.isRequeueRejected());
assertFalse(consumerProperties.isRequeueRejected());
}

@Test
void customRequeueRejected() {
consumerProperties.setRequeueRejected(true);
assertEquals(true, consumerProperties.isRequeueRejected());
assertTrue(consumerProperties.isRequeueRejected());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
import com.azure.spring.integration.core.implementation.instrumentation.InstrumentationSendCallback;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.integration.servicebus.implementation.health.ServiceBusProcessorInstrumentation;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.ConsumerIdentifier;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
Expand Down Expand Up @@ -135,7 +134,6 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(destination.getName());
containerProperties.setSubscriptionName(group);
containerProperties.setCheckpointConfig(new CheckpointConfig(properties.getExtension().getCheckpointMode()));

ServiceBusMessageListenerContainer listenerContainer = new ServiceBusMessageListenerContainer(getProcessorFactory(), containerProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.azure.spring.integration.core.handler.DefaultMessageHandler;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
import com.azure.spring.messaging.servicebus.implementation.core.DefaultServiceBusNamespaceProducerFactory;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -169,7 +168,6 @@ private void prepareConsumerProperties() {
serviceBusConsumerProperties.setEntityName(ENTITY_NAME);
serviceBusConsumerProperties.setEntityType(ServiceBusEntityType.QUEUE);
serviceBusConsumerProperties.setNamespace(NAMESPACE_NAME);
serviceBusConsumerProperties.setCheckpointMode(CheckpointMode.RECORD);
ServiceBusBindingProperties bindingProperties = new ServiceBusBindingProperties();
bindingProperties.setConsumer(serviceBusConsumerProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusProducerProperties;
import com.azure.spring.cloud.stream.binder.servicebus.core.provisioning.ServiceBusChannelProvisioner;
import com.azure.spring.cloud.stream.binder.servicebus.provisioning.ServiceBusChannelResourceManagerProvisioner;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
Expand Down Expand Up @@ -98,7 +97,6 @@ void testExtendedBindingPropertiesShouldBind() {
"spring.cloud.stream.servicebus.bindings.input.consumer.max-concurrent-calls=5",
"spring.cloud.stream.servicebus.bindings.input.consumer.max-concurrent-sessions=6",
"spring.cloud.stream.servicebus.bindings.input.consumer.requeue-rejected=true",
"spring.cloud.stream.servicebus.bindings.input.consumer.checkpoint-mode=BATCH",

"spring.cloud.stream.servicebus.bindings.input.producer.domain-name=fake-producer-domain",
"spring.cloud.stream.servicebus.bindings.input.producer.namespace=fake-producer-namespace",
Expand Down Expand Up @@ -131,7 +129,6 @@ void testExtendedBindingPropertiesShouldBind() {
assertEquals(Duration.ofSeconds(2), consumerProperties.getMaxAutoLockRenewDuration());
assertEquals(5, consumerProperties.getMaxConcurrentCalls());
assertTrue(consumerProperties.isRequeueRejected());
assertEquals(CheckpointMode.BATCH, consumerProperties.getCheckpointMode());

ServiceBusProducerProperties producerProperties =
extendedBindingProperties.getExtendedProducerProperties("input");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.checkpoint.AzureCheckpointer;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
import com.azure.spring.messaging.converter.AbstractAzureMessageConverter;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusErrorHandler;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.integration.servicebus.implementation.health.ServiceBusProcessorInstrumentation;
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.checkpoint.AzureCheckpointer;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusErrorHandler;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.implementation.core.listener.adapter.RecordMessagingMessageListenerAdapter;
import com.azure.spring.messaging.servicebus.support.ServiceBusMessageHeaders;
Expand Down Expand Up @@ -62,7 +60,7 @@
* ServiceBusProcessorFactory processorFactory) {
* ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
* containerProperties.setEntityName("RECEIVE_QUEUE_NAME");
* containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
* containerProperties.setAutoComplete(false);
* return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
* }
*
Expand All @@ -79,34 +77,32 @@ public class ServiceBusInboundChannelAdapter extends MessageProducerSupport {
private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();
private final ServiceBusMessageListenerContainer listenerContainer;
private final ListenerMode listenerMode;
private final CheckpointConfig checkpointConfig;
private InstrumentationManager instrumentationManager;
private String instrumentationId;
private final boolean isAutoComplete;
private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s";
private static final String MSG_SUCCESS_CHECKPOINT = "Checkpointed %s in %s mode";

/**
* Construct a {@link ServiceBusInboundChannelAdapter} with the specified {@link ServiceBusMessageListenerContainer}
* and {@link CheckpointConfig}.
* Construct a {@link ServiceBusInboundChannelAdapter} with the specified {@link ServiceBusMessageListenerContainer}.
*
* @param listenerContainer the processor container
* @param listenerContainer the message listener container.
*/
public ServiceBusInboundChannelAdapter(ServiceBusMessageListenerContainer listenerContainer) {
this(listenerContainer, ListenerMode.RECORD);
}

/**
* Construct a {@link ServiceBusInboundChannelAdapter} with the specified {@link ServiceBusMessageListenerContainer}
* ,{@link ListenerMode} and {@link CheckpointConfig}.
* @param listenerContainer the processor container
* ,{@link ListenerMode}.
*
* @param listenerContainer the message listener container.
* @param listenerMode the listen mode
*/
public ServiceBusInboundChannelAdapter(ServiceBusMessageListenerContainer listenerContainer,
ListenerMode listenerMode) {
this.listenerContainer = listenerContainer;
this.listenerMode = listenerMode;
CheckpointConfig containerCheckpointConfig = listenerContainer.getContainerProperties().getCheckpointConfig();
this.checkpointConfig = containerCheckpointConfig == null ? new CheckpointConfig() : containerCheckpointConfig;
this.isAutoComplete = !Boolean.FALSE.equals(listenerContainer.getContainerProperties().getAutoComplete());
}

@Override
Expand Down Expand Up @@ -196,20 +192,18 @@ public void onMessage(ServiceBusReceivedMessageContext messageContext) {
Map<String, Object> headers = new HashMap<>();
headers.put(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT, messageContext);

if (checkpointConfig.getMode() == CheckpointMode.MANUAL) {
if (!isAutoComplete) {
headers.put(AzureHeaders.CHECKPOINTER, checkpointer);
}

Message<?> message = getMessageConverter().toMessage(messageContext.getMessage(), new MessageHeaders(headers),
payloadType);
sendMessage(message);

if (checkpointConfig.getMode() == CheckpointMode.RECORD) {
if (isAutoComplete) {
checkpointer.success()
.doOnSuccess(t ->
LOGGER.debug(String.format(MSG_SUCCESS_CHECKPOINT, message, checkpointConfig.getMode())))
.doOnError(t ->
LOGGER.warn(String.format(MSG_FAIL_CHECKPOINT, message), t))
.doOnSuccess(t -> LOGGER.debug("Settled {} with autocomplete enabled.", message))
.doOnError(t -> LOGGER.warn(String.format(MSG_FAIL_CHECKPOINT, message), t))
.block();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.servicebus.implementation.health.ServiceBusProcessorInstrumentation;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.converter.AbstractAzureMessageConverter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
Expand Down Expand Up @@ -75,15 +73,13 @@ public void setUp() {

@Test
void defaultRecordListenerMode() {
this.containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.RECORD));
ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(
new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties));
assertThat(channelAdapter).hasFieldOrPropertyWithValue("listenerMode", ListenerMode.RECORD);
}

@Test
void batchListenerModeDoesNotSupport() {
this.containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.BATCH));
ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(
new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties),
ListenerMode.BATCH);
Expand Down Expand Up @@ -123,7 +119,6 @@ void setPayloadType() {

@Test
void sendAndReceive() throws InterruptedException {
this.containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.RECORD));
ServiceBusMessageListenerContainer listenerContainer =
new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties);
ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(listenerContainer);
Expand Down Expand Up @@ -169,7 +164,6 @@ void sendAndReceive() throws InterruptedException {

@Test
void instrumentationErrorHandler() {
this.containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.RECORD));
DefaultInstrumentationManager instrumentationManager = new DefaultInstrumentationManager();
ServiceBusMessageListenerContainer listenerContainer =
new ServiceBusMessageListenerContainer(this.processorFactory, this.containerProperties);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.messaging.checkpoint;
package com.azure.spring.messaging.eventhubs.core.checkpoint;

import java.time.Duration;

Expand Down
Loading