Skip to content

Commit

Permalink
Remove ServiceBusMessageProcessor (#12885)
Browse files Browse the repository at this point in the history
* Remove unneeded ServiceBusMessageProcessor.
  • Loading branch information
conniey authored Jul 8, 2020
1 parent f8c7aae commit 3221338
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 855 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,17 @@

package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageManagementOperations;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.ServiceBusMessageProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

import static com.azure.core.util.FluxUtil.monoError;

Expand All @@ -32,22 +26,17 @@ class ServiceBusAsyncConsumer implements AutoCloseable {
private final String linkName;
private final ServiceBusReceiveLinkProcessor linkProcessor;
private final MessageSerializer messageSerializer;
private final ServiceBusMessageProcessor processor;
private final Flux<ServiceBusReceivedMessage> processor;

ServiceBusAsyncConsumer(String linkName, ServiceBusReceiveLinkProcessor linkProcessor,
MessageSerializer messageSerializer, boolean isAutoComplete, boolean autoLockRenewal,
Duration maxAutoLockRenewDuration, AmqpRetryOptions retryOptions,
BiFunction<String, String, Mono<Instant>> renewMessageLock) {
MessageSerializer messageSerializer) {
this.linkName = linkName;
this.linkProcessor = linkProcessor;
this.messageSerializer = messageSerializer;

final MessageManagement messageManagement = new MessageManagement(linkProcessor, renewMessageLock);

this.processor = linkProcessor
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class))
.subscribeWith(new ServiceBusMessageProcessor(linkName, isAutoComplete, autoLockRenewal,
maxAutoLockRenewDuration, retryOptions, linkProcessor.getErrorContext(), messageManagement));
.publish()
.autoConnect(1);
}

/**
Expand Down Expand Up @@ -88,29 +77,7 @@ Mono<Void> updateDisposition(String lockToken, DispositionStatus dispositionStat
@Override
public void close() {
if (!isDisposed.getAndSet(true)) {
processor.onComplete();
linkProcessor.cancel();
}
}

private static final class MessageManagement implements MessageManagementOperations {
private final ServiceBusReceiveLinkProcessor link;
private final BiFunction<String, String, Mono<Instant>> renewMessageLock;

private MessageManagement(ServiceBusReceiveLinkProcessor link,
BiFunction<String, String, Mono<Instant>> renewMessageLock) {
this.link = link;
this.renewMessageLock = renewMessageLock;
}

@Override
public Mono<Void> updateDisposition(String lockToken, DeliveryState deliveryState) {
return link.updateDisposition(lockToken, deliveryState);
}

@Override
public Mono<Instant> renewMessageLock(String lockToken, String associatedLinkName) {
return renewMessageLock.apply(lockToken, associatedLinkName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1262,9 +1262,7 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() {
new ServiceBusReceiveLinkProcessor(receiverOptions.getPrefetchCount(), retryPolicy, connectionProcessor,
context));
final ServiceBusAsyncConsumer newConsumer = new ServiceBusAsyncConsumer(linkName, linkMessageProcessor,
messageSerializer, false, receiverOptions.autoLockRenewalEnabled(),
receiverOptions.getMaxAutoLockRenewalDuration(), connectionProcessor.getRetryOptions(),
(token, associatedLinkName) -> renewMessageLock(token, associatedLinkName));
messageSerializer);

// There could have been multiple threads trying to create this async consumer when the result was null.
// If another one had set the value while we were creating this resource, dispose of newConsumer.
Expand All @@ -1283,16 +1281,6 @@ ReceiverOptions getReceiverOptions() {
return receiverOptions;
}

/**
* Renews the message lock, and updates its value in the container.
*/
private Mono<Instant> renewMessageLock(String lockToken, String linkName) {
return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(serviceBusManagementNode ->
serviceBusManagementNode.renewMessageLock(lockToken, linkName));
}

/**
* If the receiver has not connected via {@link #receiveMessages()}, all its current operations have been performed
* through the management node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.MessageLockContainer;
import com.azure.messaging.servicebus.implementation.MessageManagementOperations;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusMessageProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import reactor.core.Disposable;
Expand Down Expand Up @@ -75,10 +72,6 @@ class UnnamedSessionReceiver implements AutoCloseable {
this.renewSessionLock = renewSessionLock;
this.lockContainer = new MessageLockContainer(ServiceBusConstants.OPERATION_TIMEOUT);

final AmqpErrorContext errorContext = new LinkErrorContext(receiveLink.getHostname(),
receiveLink.getEntityPath(), null, null);
final SessionMessageManagement messageManagement = new SessionMessageManagement(receiveLink);

receiveLink.setEmptyCreditListener(() -> 1);

final Flux<ServiceBusReceivedMessageContext> receivedMessagesFlux = receiveLink
Expand All @@ -89,15 +82,15 @@ class UnnamedSessionReceiver implements AutoCloseable {
receiveLink.addCredits(prefetch);
})
.takeUntilOther(cancelReceiveProcessor)
.map(message -> messageSerializer.deserialize(message, ServiceBusReceivedMessage.class))
.subscribeWith(new ServiceBusMessageProcessor(receiveLink.getLinkName(), false, false,
Duration.ZERO, retryOptions, errorContext, messageManagement))
.map(message -> {
if (!CoreUtils.isNullOrEmpty(message.getLockToken())) {
lockContainer.addOrUpdate(message.getLockToken(), message.getLockedUntil());
final ServiceBusReceivedMessage deserialized = messageSerializer.deserialize(message,
ServiceBusReceivedMessage.class);

if (!CoreUtils.isNullOrEmpty(deserialized.getLockToken())) {
lockContainer.addOrUpdate(deserialized.getLockToken(), deserialized.getLockedUntil());
}

return new ServiceBusReceivedMessageContext(message);
return new ServiceBusReceivedMessageContext(deserialized);
})
.onErrorResume(error -> {
logger.warning("sessionId[{}]. Error occurred. Ending session.", sessionId, error);
Expand Down
Loading

0 comments on commit 3221338

Please sign in to comment.