Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUERY] Customer is not able to complete a service bus message using Azure Java SDK 7.15.2 V2 Stack, Disabling the V2 Stack they are able to complete a message with out issue #39913

Closed
satyayella opened this issue Apr 25, 2024 · 6 comments · Fixed by #39921
Assignees
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Bus

Comments

@satyayella
Copy link

satyayella commented Apr 25, 2024

Query/Question

Customer is using Java SDK 7.15.2 (Later than 7.15 is V2 Stack)

Session receiver async client with V2 opt in

They receive message - defer it - receive the deferred message with same client with sequence number - complete call is failing

Error Call Stack:

{'az.sdk.message':'onLinkRemoteOpen','connectionId':'MF_eb2041_1713994904143','entityPath':'test-session-topic/subscriptions/test-session-receiver/$management','linkName':'test-session-topic/subscriptions/test-session-receiver-mgmt:receiver','remoteSource':'Source{address='test-session-topic/subscriptions/test-session-receiver/$management', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}'}

17:41:56.087 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - retrieve deferred message with id:5fff155c-7c23-416b-a88f-a306cca40583, seq num:2, message state:DEFERRED

17:41:56.087 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - Completed processing message:5fff155c-7c23-416b-a88f-a306cca40583, completing the message

17:41:56.099 [message-receiver-1] ERROR com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {'az.sdk.message':'Cannot process the disposition request to set the state as 'Accepted{}' for the delivery with delivery tag (id) 'b277a0f5-8ce6-4309-bd80-49a451084189'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap.','exception':'Cannot process the disposition request to set the state as 'Accepted{}' for the delivery with delivery tag (id) 'b277a0f5-8ce6-4309-bd80-49a451084189'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap.','connectionId':'MF_eb2041_1713994904143'}

17:41:56.102 [message-receiver-1] ERROR com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - complete failed
com.azure.messaging.servicebus.ServiceBusException: Cannot process the disposition request to set the state as 'Accepted{}' for the delivery with delivery tag (id) 'b277a0f5-8ce6-4309-bd80-49a451084189'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap.
at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$50(ServiceBusReceiverAsyncClient.java:1659) ~[azure-messaging-servicebus-7.15.2.jar:7.15.2]
at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3811) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.Mono.subscribe(Mono.java:4490) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4605) ~[reactor-core-3.4.34.jar:3.4.34] 

With out using V2 :

Complete is successful

17:43:09.707 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - Processing message with id:8d1896b7-2f3a-45dc-acde-f9e712862ce2 from session:99

17:43:09.707 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - message seq num:3
17:43:09.818 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - sleep 10 seconds to simulate long running task

17:43:19.913 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - retrieve deferred message with id:8d1896b7-2f3a-45dc-acde-f9e712862ce2, seq num:3, message state:DEFERRED

17:43:19.913 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - Completed processing message:8d1896b7-2f3a-45dc-acde-f9e712862ce2, completing the message

17:43:19.954 [reactor-executor-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - completed successfully

Why is this not a Bug or a feature Request?

We want to further know why the code sample does not work with V2 Opt In

Setup (please complete the following information if applicable):

  • OS: [e.g. iOS] NA
  • IDE: [e.g. IntelliJ] NA
  • Library/Libraries: [e.g. com.azure:azure-core:1.16.0 (groupId:artifactId:version)]

Libraries used:

import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import com.azure.core.util.Configuration;
import com.azure.core.util.ConfigurationBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

Sample Code:

@Slf4j
class SessionV2ConsumerDeferTest {

  private ServiceBusSessionReceiverAsyncClient sessionReceiverAsyncClient;
  private final Scheduler scheduler = Schedulers.newBoundedElastic(1,
      DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "message-receiver");
  private final String CONNECTION_STRING = System.getenv("LOCALRUN_SERVICEBUS_CONNECTIONSTRING");
  private Configuration configuration;

  void setUp() {
    ServiceBusClientBuilder serviceBusClientBuilder = new ServiceBusClientBuilder().connectionString(CONNECTION_STRING);
    String testSubscription = "test-session-receiver";
    String testTopic = "test-session-topic";
    String SESSION_RECEIVER_V2_CONFIG = "com.azure.messaging.servicebus.session.reactor.asyncReceive.v2";
    configuration = new ConfigurationBuilder().putProperty(SESSION_RECEIVER_V2_CONFIG, "true").build();
    sessionReceiverAsyncClient = createSessionReceiverAsyncClient(serviceBusClientBuilder, testTopic, testSubscription);
  }

  private ServiceBusSessionReceiverAsyncClient createSessionReceiverAsyncClient(ServiceBusClientBuilder serviceBusClientBuilder, String testTopic, String testSubscription) {
    return serviceBusClientBuilder
        .configuration(configuration)
        .sessionReceiver()
        .topicName(testTopic)
        .subscriptionName(testSubscription)
        .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
        .disableAutoComplete()
        .prefetchCount(0)
        .maxAutoLockRenewDuration(Duration.ofSeconds(300)) // 5 minutes
        .buildAsyncClient();
  }

  @Test
  void testSessionConsumerV2_deferredMessage() throws InterruptedException {
    // setup session receiver async client
   setUp();

   // accept a session and start receiving
   Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiverAsyncClient.acceptNextSession();

   Flux<Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient>> sessionMessages = Flux.usingWhen(receiverMono,
       receiver -> {
         LOGGER.info("Receiving messages from session:{}", receiver.getSessionId());
         Flux<Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient>> receivedMessageFlux = receiver.receiveMessages()
             .publishOn(scheduler, 1)
             .zipWith(Mono.just(receiver).cache().repeat());
         return receivedMessageFlux.switchOnFirst((signal, serviceBusReceivedMessageFlux) -> {
           if (signal.hasValue()) {
             LOGGER.debug("Adding timeout of {} millis to close the client if no new messages available for the current session",
                 50000);
             return serviceBusReceivedMessageFlux.timeout(Duration.ofMillis(50000), scheduler);
           }
           return serviceBusReceivedMessageFlux;
         });
       },
       receiver -> Mono.fromRunnable(receiver::close));

    CoreSubscriber<Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient>> subscriber =
        createSubscriber();
    sessionMessages.subscribe(subscriber);

    // wait for the test
    TimeUnit.SECONDS.sleep(600);
 }

  private CoreSubscriber<Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient>> createSubscriber() {

    return new CoreSubscriber<>() {
      private Subscription subscription;

      @Override
      public void onSubscribe(@NonNull Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
      }

      @Override
      public void onNext(Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient> receivedMessageClientTuple) {
        ServiceBusReceivedMessage message = receivedMessageClientTuple.getT1();
        ServiceBusReceivedMessage retrievedDeferredMsg = null;
        ServiceBusReceiverAsyncClient receiverAsyncClient = receivedMessageClientTuple.getT2();
        LOGGER.info("Processing message with id:{} from session:{}", message.getMessageId(), receiverAsyncClient.getSessionId());
        try {
          var msgSeqNum = message.getSequenceNumber();
          LOGGER.info("message seq num:{}", msgSeqNum);
          receiverAsyncClient.defer(message).block(Duration.ofSeconds(10));
          LOGGER.info("sleep 10 seconds to simulate long running task");
          TimeUnit.SECONDS.sleep(10);
          retrievedDeferredMsg = receiverAsyncClient.receiveDeferredMessage(msgSeqNum).block();
          assert retrievedDeferredMsg != null;
          LOGGER.info("retrieve deferred message with id:{}, seq num:{}, message state:{}",
              retrievedDeferredMsg.getMessageId(),
              retrievedDeferredMsg.getSequenceNumber(),
              retrievedDeferredMsg.getState().toString());
        } catch (Exception e) {
          LOGGER.error("exception", e);
        }
        assert retrievedDeferredMsg != null;
        LOGGER.info("Completed processing message:{}, completing the message", retrievedDeferredMsg.getMessageId());
        receiverAsyncClient.complete(retrievedDeferredMsg).subscribe(
            null,
            error -> LOGGER.error("complete failed", error),
            () -> LOGGER.info("completed successfully")
        );
        this.subscription.request(1);
      }

      @Override
      public void onError(Throwable throwable) {
        LOGGER.error("Error occurred", throwable);
      }

      @Override
      public void onComplete() {
        LOGGER.info("completed receiving messages for this session.");
      }
    };
  }
}

We want to know why this code fails with V2 Stack enabled and how to correct it.

Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

  • [Yes] Query Added
  • [Yes] Setup information Added
@github-actions github-actions bot added Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Bus labels Apr 25, 2024
Copy link

@anuchandy @conniey @lmolkova

Copy link

Thank you for your feedback. Tagging and routing to the team member best able to assist.

@anuchandy anuchandy self-assigned this Apr 25, 2024
@anuchandy
Copy link
Member

@satyayella, thanks for the report, I'm taking a look.

@anuchandy
Copy link
Member

@satyayella, thanks for the code and I can repro this. Looking into addressing this.

@manishtulsiani
Copy link

Thanks @anuchandy

@anuchandy
Copy link
Member

This is addressed now, thank you for reporting!

@github-actions github-actions bot locked and limited conversation to collaborators Jul 25, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Bus
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants