[BUG] Abandoned messages in a session queue are not made re-available immediately which disrupts the ordered processing of session based messages #24064

Closed
3 tasks done
ankitrajsingh opened this issue Sep 10, 2021 · 10 comments · Fixed by #29696
Labels
bug This issue requires a change to an existing behavior in the product in order to be resolved. Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. Service Bus

ankitrajsingh commented Sep 10, 2021

Describe the bug
After a message in a session-based queue is abandoned, it is not made re-available on the next receive operation. It becomes re-available only after some messages with higher sequence numbers have been processed, which breaks the ordering of the messages.

Exception or Stack Trace
No Exception Raised

To Reproduce
Steps to reproduce the behavior:

  1. Produce messages (at least 10) against a session in the session enabled queue having max delivery count of at least 3.
  2. Create a message receiving callback with the logic to randomly abandon some of the messages received and log the messages along with their sequence number and session id.
  3. Create a session based ServiceBusProcessorClient with the message callback defined in the previous step.
  4. Start the session based ServiceBusProcessorClient.
  5. Search the logs for any single abandoned message and record its sequence number.
  6. Search the entire log for repeated occurrences of this sequence number. These entries will be interleaved with log entries of messages that have greater sequence numbers.
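
Steps 5 and 6 can also be automated instead of searching by eye. The following is a minimal sketch, assuming the log lines follow the "Session: ..., Sequence #: ..." format printed by the callback in the code snippet of this report. The class name RedeliveryOrderCheck and the method findOutOfOrder are hypothetical helpers and are not part of the Azure SDK.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for inspecting the repro logs; not part of the Azure SDK.
final class RedeliveryOrderCheck {

    // Assumes the log format used by the message callback in this report.
    private static final Pattern LINE =
        Pattern.compile("Session: (.+?), Sequence #: (\\d+)");

    /**
     * Returns one entry for every delivery that was logged after a message
     * with a higher sequence number in the same session.
     */
    static List<String> findOutOfOrder(List<String> logLines) {
        Map<String, Long> highestSeen = new HashMap<>();
        List<String> violations = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue; // not a "Processing message" line
            }
            String session = m.group(1);
            long seq = Long.parseLong(m.group(2));
            Long highest = highestSeen.get(session);
            if (highest != null && seq < highest) {
                violations.add("session " + session + ": sequence " + seq
                    + " delivered after " + highest);
            }
            if (highest == null || seq > highest) {
                highestSeen.put(session, seq);
            }
        }
        return violations;
    }
}
```

An immediate redelivery of the same sequence number is not flagged, so a log produced under the expected behavior yields an empty list, while a log showing the reported behavior yields one entry per late redelivery.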

Code Snippet

// Custom message payload
public class Order {

  public Integer orderNo;

  // No-argument constructor so that Jackson's ObjectMapper can instantiate the class
  public Order() {
  }

  public Order(Integer orderNo) {
    this.orderNo = orderNo;
  }

  public void setOrderNo(Integer orderNo) {
    this.orderNo = orderNo;
  }

  public Integer getOrderNo() {
    return this.orderNo;
  }

}




Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
    ServiceBusReceivedMessage message = context.getMessage();
    Order order = null;
    try {
      order = objectMapper.readValue(message.getBody().toString(), Order.class);
      log.info("Processing message. Session: {}, Sequence #: {}, orderNo: {}", message.getSessionId(), message.getSequenceNumber(), order.orderNo);
      // Abandon orders whose order number is a multiple of 4; complete all others
      if (order.orderNo % 4 == 0) {
        context.abandon();
      } else {
        context.complete();
      }
    } catch (Exception e) {
      log.error("Error in objectmapper", e);
      context.abandon();
    }
};

Consumer<ServiceBusErrorContext> onError = context -> {
  System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
                    context.getFullyQualifiedNamespace(), context.getEntityPath());
  if (context.getException() instanceof ServiceBusException) {
    ServiceBusException exception = (ServiceBusException) context.getException();
    System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
                      exception.getReason());
  } else {
    System.out.printf("Error occurred: %s%n", context.getException());
  }
};

ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
        .connectionString("<<conn_string>>")
        .sessionProcessor()
        .prefetchCount(0)
        .queueName("<<queue_name>>")
        .maxConcurrentSessions(1)
        .processMessage(onMessage)
        .processError(onError)
        .disableAutoComplete()
        .buildProcessorClient();

// Start the processor in the background
sessionProcessor.start();

Expected behavior
Scenario: Say two ordered messages exist in a session, with sequence numbers 1 and 2. Assume the message with seq no. 1 is abandoned when it is processed.

Expectation: The message with seq no. 1 is made re-available immediately on the next receive operation. The message with seq no. 2 is made available only after the message with seq no. 1 has either been delivered successfully (completed) or has exceeded its max delivery count.

Actual: The message with seq no. 2 is made available on the next receive operation. Only after it has been processed is the message with seq no. 1 made re-available for reprocessing.
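
The expectation above can be written down as a small in-memory model. This is a sketch of the behavior the report asks for, not of how Service Bus or the SDK is implemented: SessionModel, Msg, and deliveryOrder are hypothetical names, and dead-lettering after the max delivery count is modelled as a simple list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of one session; not part of the Azure SDK.
final class SessionModel {

    static final class Msg {
        final long sequenceNumber;
        int deliveryCount;

        Msg(long sequenceNumber) {
            this.sequenceNumber = sequenceNumber;
        }
    }

    private final Deque<Msg> pending = new ArrayDeque<>();
    private final List<Long> deadLettered = new ArrayList<>();
    private final int maxDeliveryCount;

    SessionModel(int maxDeliveryCount, long... sequenceNumbers) {
        this.maxDeliveryCount = maxDeliveryCount;
        for (long s : sequenceNumbers) {
            pending.addLast(new Msg(s));
        }
    }

    // Hands out the message at the head of the session, or null when it is drained.
    Msg receive() {
        Msg m = pending.pollFirst();
        if (m != null) {
            m.deliveryCount++;
        }
        return m;
    }

    // Completing needs no work here: receive() already removed the message.
    void complete(Msg m) {
    }

    // Puts the message back at the head unless its delivery budget is spent.
    void abandon(Msg m) {
        if (m.deliveryCount >= maxDeliveryCount) {
            deadLettered.add(m.sequenceNumber);
        } else {
            pending.addFirst(m);
        }
    }

    List<Long> deadLettered() {
        return deadLettered;
    }

    // Processes the session, always abandoning sequence numbers that are multiples of 4.
    static List<Long> deliveryOrder(int maxDeliveryCount, long... sequenceNumbers) {
        SessionModel session = new SessionModel(maxDeliveryCount, sequenceNumbers);
        List<Long> order = new ArrayList<>();
        for (Msg m = session.receive(); m != null; m = session.receive()) {
            order.add(m.sequenceNumber);
            if (m.sequenceNumber % 4 == 0) {
                session.abandon(m);
            } else {
                session.complete(m);
            }
        }
        return order;
    }
}
```

With a max delivery count of 3, SessionModel.deliveryOrder(3, 3, 4, 5) returns [3, 4, 4, 4, 5]: message 5 is not handed out until message 4 has used up its deliveries.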

Screenshots
Please see the coloured rectangles (two different colours). Each colour corresponds to a single message being redelivered after it was abandoned. The rectangles belonging to a particular abandoned message are interleaved with logs of messages having greater sequence numbers.

image

Setup (please complete the following information):

  • OS: MacOS
  • IDE: IntelliJ
  • Library/Libraries: com.azure:azure-messaging-servicebus:7.4.0
  • Java version: 11
  • App Server/Environment: Tomcat
  • Frameworks: Spring Boot

Information Checklist
Kindly make sure that you have added all the following information above and check off the required fields; otherwise we will treat the issue as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Sep 10, 2021
@ankitrajsingh ankitrajsingh changed the title [BUG] Abandoned messages in a session are not made available immediately which disrupts the ordering [BUG] Abandoned messages in a session queue are not made available immediately which disrupts the ordered processing of session based messages Sep 10, 2021
@ankitrajsingh ankitrajsingh changed the title [BUG] Abandoned messages in a session queue are not made available immediately which disrupts the ordered processing of session based messages [BUG] Abandoned messages in a session queue are not made re-available immediately which disrupts the ordered processing of session based messages Sep 11, 2021
@ankitrajsingh
Author

ankitrajsingh commented Sep 11, 2021

  • This works up to some extent (I am not sure about the behaviour in a distributed environment) if we use the low-level session receiver instead, process one message at a time, and close the receiver object after abandoning a message. A sample snippet is attached below.
  • Closing the receiver object releases the session-level lock on the ASB side, and the session becomes visible to all clients for locking and listening again.
  • Any client that locks onto the same session once it is visible again replays the previously abandoned message immediately. This maintains the ordered processing of messages.

It feels like an issue of either

  • The message lock/disposition is not released/propagated by the client in time to replay the message immediately without releasing the session lock, OR
  • The client itself is somehow pre-calculating the next message in the sequence to fetch, without taking into account that the current message can be abandoned and needs to be replayed by polling from ASB.

Sample snippet for closing the low-level receiver after each message is completed or abandoned:

ServiceBusSessionReceiverClient sessionReceiverClient = new ServiceBusClientBuilder()
            .connectionString("<<connection_string>>")
            .sessionReceiver()
            .disableAutoComplete()
            .queueName("<<queue_name>>")
            .buildClient();

while (true) {
  // Accept the next available session; closing the receiver releases the session lock
  try (ServiceBusReceiverClient sessionReceiver = sessionReceiverClient.acceptNextSession()) {
    // Process one message at a time
    sessionReceiver.receiveMessages(1).forEach(message -> {
      try {
        Order order = objectMapper.readValue(message.getBody().toString(), Order.class);
        log.info("Processing message. Session: {}, Sequence #: {}, orderNo: {}", message.getSessionId(), message.getSequenceNumber(), order.orderNo);
        if (order.orderNo % 4 == 0) {
          sessionReceiver.abandon(message);
        } else {
          sessionReceiver.complete(message);
        }
      } catch (Exception e) {
        log.error("Error in objectmapper", e);
        sessionReceiver.abandon(message);
      }
    });
  }
}

@joshfree joshfree added Client This issue points to a problem in the data-plane of the library. Service Bus labels Sep 13, 2021
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Sep 13, 2021
@joshfree
Member

@anuchandy could you please follow up with @ankitrajsingh?

@ankitrajsingh
Author

Hi @anuchandy, please let me know if something else is needed on the issue. Would be great if you can help us out. Thanks!

@ankitrajsingh
Author

Hi @joshfree @anuchandy , could you guys please assist with this issue?

@ramya-rao-a
Contributor

Thanks for reporting @ankitrajsingh

@anuchandy, @conniey and I discussed this offline and wondered whether a service limitation prevents the abandoned message from being provided immediately. I tried our JS client and was able to get the abandoned message in the next receive operation. So this must be a bug in the Java SDK.

@anuchandy, @conniey While the behavior in the processor client can be explained by us adding a credit on the line before the user settles a message, is that the same thing that is happening with the low level receiver client as well?

@anuchandy anuchandy added this to the Backlog milestone Apr 26, 2022
@anuchandy anuchandy modified the milestones: Backlog, [2022] July Apr 26, 2022
@anuchandy anuchandy removed their assignment Apr 26, 2022
@conniey conniey added bug This issue requires a change to an existing behavior in the product in order to be resolved. and removed question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Apr 26, 2022
@liukun-msft
Contributor

Hi @ankitrajsingh, I am working on this issue and below are some updates:

I can repro the issue by using ServiceBusProcessorClient, but not by using ServiceBusReceiverClient. If possible, can you also provide the detailed logs when using ServiceBusReceiverClient?

The root cause has been identified. The processor client chains the Reactor operators Flux.publishOn and Flux.merge(), and with these two operators linked, receiving and consuming happen asynchronously. The client requests the next message as soon as the first message is received, not when it has been consumed. So when the first message is abandoned, the next message to arrive is the one with the higher sequence number rather than the abandoned one.
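
The effect of requesting the next message before the current one is settled can be illustrated with a simplified, single-threaded simulation. This is only a sketch of the timing problem under stated assumptions: it does not use Reactor or the Azure SDK, CreditTimingDemo and its methods are hypothetical names, and the broker is modelled as a sorted set that always hands out the lowest available sequence number.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical single-threaded model; it does not use Reactor or the Azure SDK.
final class CreditTimingDemo {

    // Moves the lowest available sequence number from the broker to the client buffer.
    private static void pull(TreeSet<Long> available, Deque<Long> buffered) {
        Long next = available.pollFirst();
        if (next != null) {
            buffered.addLast(next);
        }
    }

    // Returns the order in which the handler sees the messages.
    // creditBeforeSettle = true requests the next message on receipt of the current one.
    static List<Long> run(boolean creditBeforeSettle, int maxDeliveryCount,
                          long... sequenceNumbers) {
        TreeSet<Long> available = new TreeSet<>();  // broker side
        for (long s : sequenceNumbers) {
            available.add(s);
        }
        Deque<Long> buffered = new ArrayDeque<>();  // delivered, not yet handled
        Map<Long, Integer> deliveryCounts = new HashMap<>();
        List<Long> processed = new ArrayList<>();

        pull(available, buffered); // initial request for one message
        while (!buffered.isEmpty()) {
            long seq = buffered.pollFirst();
            if (creditBeforeSettle) {
                pull(available, buffered); // requested before the handler settles
            }
            processed.add(seq);
            int count = deliveryCounts.merge(seq, 1, Integer::sum);
            // The handler abandons multiples of 4; other messages are completed.
            if (seq % 4 == 0 && count < maxDeliveryCount) {
                available.add(seq); // abandoned: available again on the broker
            }
            if (buffered.isEmpty()) {
                pull(available, buffered); // nothing outstanding: request after settling
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(true, 3, 4, 5));  // [4, 5, 4, 4]
        System.out.println(run(false, 3, 4, 5)); // [4, 4, 4, 5]
    }
}
```

In the first run, message 5 has already been handed to the client by the time message 4 is abandoned, so it is processed ahead of the redelivery. In the second run, the request is made only after settlement, and message 4 is replayed before message 5.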

However, there is a similar follow-up issue when the number of concurrent sessions is greater than 1. I am still working on the fix and will keep you updated.

@ankitrajsingh
Author

Hi @liukun-msft, at this point I am not able to provide the logs for ServiceBusReceiverClient as I have left the project, but I can confirm that it works with ServiceBusReceiverClient, as mentioned in my detailed comment: #24064 (comment)

By the way, thanks for the long-needed update. Your reasoning seems correct and to the point; I reached the same conclusion when debugging the library.

The client will request next message after first message is received (**not consumed**).

@liukun-msft
Contributor

When we set maxConcurrentSessions > 1, there is another known issue, #27336: messages from only one session are processed at a time.

Currently, we use Flux.merge() to merge all sessions into one Flux and parallel() to process messages in parallel. However, using Flux.merge() + parallel() does not mean that messages from different sessions are processed independently. When emitting a message from Flux.merge() fails, the message is saved in an internal queue and waits for the next request. Hence the client may process messages for only one session at a time (messages received from other sessions may be sitting in the queue).

When a message is saved to the queue, Flux.merge() prefetches a new message without waiting for the queued one to be consumed. Thus abandoned messages can hit the same ordering issue.

I'll take some time to summarize related docs and re-think the design of session receiver.

@ankitrajsingh
Author

ankitrajsingh commented Jul 4, 2022

Hi @liukun-msft, the issue you described was raised by me a while back.
Please see if it helps you: 24027
PS: Issue 27336 is a duplicate of my issue, hence my issue was closed.

@liukun-msft
Contributor

Hi @ankitrajsingh, thanks so much for providing that. All the issues you reported are really helpful!
I'll check all the related issues.

@github-actions github-actions bot locked and limited conversation to collaborators Apr 11, 2023
6 participants