Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Process messages of multiple sessions in parallel and keep ordering #27336

Closed
christian-vorhemus opened this issue Feb 28, 2022 · 5 comments
Assignees
Labels
bug This issue requires a change to an existing behavior in the product in order to be resolved. Client This issue points to a problem in the data-plane of the library. Service Bus
Milestone

Comments

@christian-vorhemus
Copy link
Member

christian-vorhemus commented Feb 28, 2022

We are using the Java azure-messaging-servicebus 7.5.2 Maven package. Our goal is to process multiple sessions in parallel and keep message ordering. For demonstration purpose, let's assume we put the following messages into an Azure Service Bus Queue:

• Message “a” to session “0”
• Message “b” to session “0”
• Message “c” to session “0”
• Message “d” to session “1”
• Message “e” to session “1”
• Message “f” to session “1”

The following class is used to receive the messages

public class App 
{
    static ServiceBusProcessorClient receiver;

    public void consume() {
        receiver.start();
    }

    public static void onMessage(ServiceBusReceivedMessageContext messageContext) {
        ServiceBusReceivedMessage serviceBusReceivedMessage = messageContext.getMessage();

        String m = serviceBusReceivedMessage.getBody().toString();
        Instant timestamp = Instant.now();
        System.out.println("[" + timestamp.toString() + "] Receceived " + m + " in session "+serviceBusReceivedMessage.getSessionId());

        try {
            // Simulate long running task
            Thread.currentThread().sleep(10000);
        }
        catch (Exception ex){
            ex.printStackTrace();
        }

        System.out.println("Completed");
        messageContext.complete();
    }

    public static void onError(ServiceBusErrorContext ctx) {
        ctx.getException().printStackTrace();
        System.out.println(ctx.getEntityPath());
        System.out.println("Doing: " + ctx.getErrorSource());
    }


    public static void main( String[] args )
    {
        System.out.println("Started");

        ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder builder = new ServiceBusClientBuilder().
                connectionString("Endpoint=sb://[...]").
                sessionProcessor().
                receiveMode(ServiceBusReceiveMode.PEEK_LOCK).
                disableAutoComplete().
                maxConcurrentSessions(2).
                maxConcurrentCalls(1).
                queueName("samplequeue_session").
                maxAutoLockRenewDuration(Duration.ofSeconds(30));
        builder.processMessage(App::onMessage);
        builder.processError(App::onError);
        receiver = builder.buildProcessorClient();

        receiver.start();

    }
}

this might print

[2022-02-28T16:43:31.504979200Z] Receceived a in session 0
Completed
[2022-02-28T16:44:41.575305200Z] Receceived b in session 0
Completed

so only one session is processed a time. In previous versions of the SDK, for example in 3.6.6, the following code...

public class App 
{
    static class MyReader implements ISessionHandler {
        public MyReader() {
        }
        @Override
        public CompletableFuture<Void> onMessageAsync(IMessageSession session, IMessage message) {
            try {
                byte[] body = message.getBody();
                String content = "";
                try {
                content = new String(body, "UTF-8");
                } catch(Exception e) {
                    System.out.println(e.toString());
                }
                Instant timestamp = Instant.now();
                System.out.println(
                        "[" + timestamp.toString() + "] Received " + content +" in session " + session.getSessionId()
                );
                
                try {
                    // Simulate long running task
                    Thread.currentThread().sleep(10000);
                }
                catch (Exception ex){
                    ex.printStackTrace();
                }

                System.out.println("Completed " + content);
                session.complete(message.getLockToken());
                return CompletableFuture.completedFuture(null);

            } catch (Exception e) {
                System.out.println(e.getMessage());
                try {
                    session.close();
                    throw new RuntimeException();
                } catch (ServiceBusException ex) {
                    throw new RuntimeException();
                }
            }

        }

        @Override
        public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession session) {
            System.out.println("closeing session " + session.getSessionId());
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public void notifyException(Throwable exception, ExceptionPhase phase) {
            System.out.println(phase + "-" + exception);

        }
    }

    public static void main( String[] args )
    {
        System.out.println("Started");

        int maxConcurrentSessions = 2;
        int maxConcurrentCallsPerSession = 1;
        boolean autoComplete = false;
        Duration maxAutoRenewDuration = Duration.ofSeconds(5);

        try {
            QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder("Endpoint=sb://[...]", "samplequeue_session"), ReceiveMode.PEEKLOCK);
            ExecutorService executorService = Executors.newCachedThreadPool();
            SessionHandlerOptions sho = new SessionHandlerOptions(maxConcurrentSessions, maxConcurrentCallsPerSession, autoComplete, maxAutoRenewDuration);

            receiveClient.registerSessionHandler(new MyReader(), sho, executorService);
        } catch(Exception e) {
            System.out.printf("%s", e.toString());
        }

    }
}

... might lead to this output

[2022-02-28T17:07:14.525477500Z] Received a in session 0
[2022-02-28T17:07:14.745662Z] Received d in session 1
Completed a
Completed d
[2022-02-28T17:07:24.618105700Z] Received b in session 0
[2022-02-28T17:07:24.847110400Z] Received e in session 1
Completed b
Completed e

We see that messages in 2 sessions are processed in parallel and message ordering within a session is kept - as it should be. This opens up two questions:

  • What is the recommended way to get the same behavior we see in 3.6.6 in 7.5.2?
  • Why is the property called maxConcurrentSessions when there are not more sessions processed concurrently?

improve documentation

As part of this feature-work, make sure to improve the Javadoc on maxConcurrentSessions and maxConcurrentCalls

@ghost ghost added the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Feb 28, 2022
@joshfree joshfree added Client This issue points to a problem in the data-plane of the library. Service Bus labels Feb 28, 2022
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Feb 28, 2022
@joshfree joshfree added the question The issue doesn't require a change to the product in order to be resolved. Most issues start as that label Feb 28, 2022
@joshfree
Copy link
Member

@anuchandy could you assist @christian-vorhemus with their questions?

/cc @Azure/azsdk-sb-java

@conniey
Copy link
Member

conniey commented Mar 1, 2022

I took a quick look at this and we may have to move the parallelization up in the reactor chain. Current, we merge all the concurrent sessions into a single flux (ServiceBusSessionManager.java) and then parallelise the subscriber calls (ServiceBusProcessorClient.java) here.

@jwyseu
Copy link

jwyseu commented Apr 7, 2022

I think this ticket is kinda related to #24047
@anuchandy Could you verify? Maybe both can be tackled together?

@anuchandy
Copy link
Member

anuchandy commented Apr 8, 2022

Hi @jwyseu, we haven't got a chance to pick this task due to other priorities.

As this introduces behavior changes, and we want to re-think how we allocate the threads and max-cap for the thread-pool (impacting existing customers), the next step is to develop a design doc, brainstorm, and get the required approvals. Once that phase is over, we'll implement and update the documentation/samples to call out the change.

You're right that the issue you linked 24047 is a duplicate. I just closed it so that we have one ticket to track.

@anuchandy anuchandy changed the title Process messages of multiple sessions in parallel and keep ordering [FEATURE REQ] Process messages of multiple sessions in parallel and keep ordering Apr 26, 2022
@anuchandy anuchandy added the bug This issue requires a change to an existing behavior in the product in order to be resolved. label Apr 26, 2022
@anuchandy anuchandy changed the title [FEATURE REQ] Process messages of multiple sessions in parallel and keep ordering [BUG] Process messages of multiple sessions in parallel and keep ordering Apr 26, 2022
@conniey conniey removed the question The issue doesn't require a change to the product in order to be resolved. Most issues start as that label Apr 26, 2022
@liukun-msft liukun-msft self-assigned this Jul 25, 2022
@liukun-msft liukun-msft added this to the Backlog milestone Oct 25, 2022
@anuchandy
Copy link
Member

Identified why this is happening, opened a work item to rework session-processor here #33706. Closing this infavor of 33706

@github-actions github-actions bot locked and limited conversation to collaborators May 24, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
bug This issue requires a change to an existing behavior in the product in order to be resolved. Client This issue points to a problem in the data-plane of the library. Service Bus
Projects
None yet
Development

Successfully merging a pull request may close this issue.

6 participants