
[BUG] Stopping ServiceBusProcessor causes some messages to be left locked #21869

Closed
vbystricky21 opened this issue Jun 15, 2021 · 27 comments · Fixed by #24215
Labels
  • Client: This issue points to a problem in the data-plane of the library.
  • customer-reported: Issues that are reported by GitHub users external to the Azure organization.
  • question: The issue doesn't require a change to the product in order to be resolved. Most issues start as that.
  • Service Bus

Comments

@vbystricky21

Describe the bug
We use ServiceBusProcessor in PeekLock mode. Once the processor is stopped/disposed, it often leaves some messages locked in the queue. This seems to be caused by a TaskCanceledException thrown in AmqpReceiver.ReceiveMessagesAsyncInternal, which gives us no chance to process an already received message, yet the message is not abandoned either.
The changes introduced in the Azure.Messaging.ServiceBus 7.2.0 beta by @JoshLove-msft don't seem to solve this issue, as the processor is stopped quickly rather than correctly.

Expected behavior
All messages can be processed or abandoned.

Actual behavior (include Exception or Stack Trace)
In-flight messages remain locked.

Environment:

  • Azure.Messaging.ServiceBus [7.1.2]
  • Ubuntu 20.04.2 LTS
@jsquire
Member

jsquire commented Jun 15, 2021

Thank you for your feedback. Tagging and routing to the team member best able to assist.

@JoshLove-msft
Member

JoshLove-msft commented Jun 15, 2021

Hi @vbystricky21,
Any chance you can include the code snippet you are using? Calling StopProcessingAsync would actually just stop receiving and delivering messages to your message handler, but would not close the underlying link (we should call this out in the docs). Calling CloseAsync/DisposeAsync will actually close the underlying links which should cause the message to be immediately available for receiving by another receiver/processor.

@vbystricky21
Author

Hi @JoshLove-msft, I see your point. We also expected the messages to be available once we dispose everything, but our tests suggest otherwise. Here is a simple code snippet that reproduces the described issue with Azure.Messaging.ServiceBus 7.1.2:

private static volatile int _counter = 0;
static async Task Main(string[] args)
{
    var connectionString = "";
    var queueName = "";
    //using var logger = AzureEventSourceListener.CreateConsoleLogger(EventLevel.Verbose);
    var sendingTask = SendMessages(connectionString, queueName);
    await ReceiveMessages(connectionString, queueName, TimeSpan.FromSeconds(2));
    await ReceiveMessages(connectionString, queueName, TimeSpan.FromSeconds(120));
    await sendingTask;
}

static async Task SendMessages(string connectionString, string queueName)
{
    Console.WriteLine("Sending started.");
    await using var client = new ServiceBusClient(connectionString);
    await using var sender = client.CreateSender(queueName);
    for (var i = 1; i <= 30; i++)
    {
        await sender.SendMessageAsync(new ServiceBusMessage(i.ToString()));
        await Task.Delay(100);
    }
    Console.WriteLine("Sending stopped.");
}

static async Task ReceiveMessages(string connectionString, string queueName, TimeSpan processingTime)
{
    Console.WriteLine("Receiving started.");
    var client = new ServiceBusClient(connectionString);

    var opts = new ServiceBusReceiverOptions()
    {
        ReceiveMode = ServiceBusReceiveMode.PeekLock
    };
    var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions
    {
        AutoCompleteMessages = false,
        ReceiveMode = ServiceBusReceiveMode.PeekLock,
        MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
        PrefetchCount = 0,
        MaxConcurrentCalls = 4
    });

    processor.ProcessErrorAsync += async args =>
    {
        Console.WriteLine($"Error: {args.EntityPath} {args.Exception}");
    };
    processor.ProcessMessageAsync += async args =>
    {
        var tmp = Interlocked.Increment(ref _counter);
        Console.WriteLine($"{tmp} Processing message: {args.Message.SequenceNumber}, queue time: {DateTime.Now - args.Message.EnqueuedTime}");
        await Task.Delay(300);
        await args.CompleteMessageAsync(args.Message);
    };

    await processor.StartProcessingAsync();
    await Task.Delay((int)processingTime.TotalMilliseconds);
    await processor.DisposeAsync();
    await client.DisposeAsync();
    Console.WriteLine("Receiving stopped.");
}

Output from our tests with lock duration set to 30 seconds:
Sending started.
Receiving started.
1 Processing message: 1042, queue time: 00:00:00.4325006
2 Processing message: 1043, queue time: 00:00:00.3970808
3 Processing message: 1044, queue time: 00:00:00.3986989
4 Processing message: 1045, queue time: 00:00:00.3961532
5 Processing message: 1046, queue time: 00:00:00.3996843
6 Processing message: 1047, queue time: 00:00:00.3973289
7 Processing message: 1048, queue time: 00:00:00.3957621
8 Processing message: 1049, queue time: 00:00:00.3967730
9 Processing message: 1050, queue time: 00:00:00.3976066
10 Processing message: 1051, queue time: 00:00:00.3935886
11 Processing message: 1052, queue time: 00:00:00.3906216
Receiving stopped.
Receiving started.
12 Processing message: 1055, queue time: 00:00:00.6746603
13 Processing message: 1054, queue time: 00:00:00.8147171
14 Processing message: 1056, queue time: 00:00:00.5178016
15 Processing message: 1057, queue time: 00:00:00.3958757
16 Processing message: 1058, queue time: 00:00:00.5981577
17 Processing message: 1059, queue time: 00:00:00.4413859
18 Processing message: 1060, queue time: 00:00:00.4006621
19 Processing message: 1061, queue time: 00:00:00.4001062
20 Processing message: 1062, queue time: 00:00:00.3985140
21 Processing message: 1063, queue time: 00:00:00.3979721
22 Processing message: 1064, queue time: 00:00:00.3950608
23 Processing message: 1065, queue time: 00:00:00.3983110
24 Processing message: 1066, queue time: 00:00:00.3960303
25 Processing message: 1067, queue time: 00:00:00.4164004
26 Processing message: 1068, queue time: 00:00:00.3914412
27 Processing message: 1069, queue time: 00:00:00.3926480
28 Processing message: 1070, queue time: 00:00:00.3919691
29 Processing message: 1071, queue time: 00:00:00.3936227
Sending stopped.
30 Processing message: 1053, queue time: 00:00:30.4354136
Receiving stopped.

@jsquire
Member

jsquire commented Jun 16, 2021

Hi @vbystricky21. It doesn't look like you're stopping the processor by calling StopProcessingAsync, which, as @JoshLove-msft mentioned, is what allows it to gracefully shut down. Instead, you're just calling DisposeAsync, which abruptly closes the connections and does not allow the processor to clean up after itself.

If we alter your ReceiveMessages method to request that the processor stop, then your handler should be able to complete messages before the processor closes the links and connections it needs.

static async Task ReceiveMessages(string connectionString, string queueName, TimeSpan processingTime)
{
    Console.WriteLine("Receiving started.");
    var client = new ServiceBusClient(connectionString);

    var opts = new ServiceBusReceiverOptions()
    {
        ReceiveMode = ServiceBusReceiveMode.PeekLock
    };
    var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions
    {
        AutoCompleteMessages = false,
        ReceiveMode = ServiceBusReceiveMode.PeekLock,
        MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
        PrefetchCount = 0,
        MaxConcurrentCalls = 4
    });

    processor.ProcessErrorAsync += async args =>
    {
        Console.WriteLine($"Error: {args.EntityPath} {args.Exception}");
    };
    processor.ProcessMessageAsync += async args =>
    {
        var tmp = Interlocked.Increment(ref _counter);
        Console.WriteLine($"{tmp} Processing message: {args.Message.SequenceNumber}, queue time: {DateTime.Now - args.Message.EnqueuedTime}");
        await Task.Delay(300);
        await args.CompleteMessageAsync(args.Message);
    };

    await processor.StartProcessingAsync();
    await Task.Delay((int)processingTime.TotalMilliseconds);

    // -- STOP ADDED
    await processor.StopProcessingAsync();
    //--

    await processor.DisposeAsync();
    await client.DisposeAsync();
    Console.WriteLine("Receiving stopped.");
}

@JoshLove-msft
Member

One thing to note - if you are calling processor.DisposeAsync, it is not necessary to call processor.StopProcessingAsync first.

@JoshLove-msft
Member

Thanks for the code snippet, I will try to repro.

@jsquire
Member

jsquire commented Jun 16, 2021

One thing to note - if you are calling processor.DisposeAsync, it is not necessary to call processor.StopProcessingAsync first.

Really? That's not my understanding; I'll follow up offline.

Edit:
I stand corrected. I overlooked some changes in CloseAsync that invalidated my understanding. Apologies for the confusion.

@JoshLove-msft
Member

I was able to reproduce this. I think what is happening is that there is batching of the credits in the underlying AMQP library, since MaxConcurrentCalls is set to 4. So a single receive call may end up resulting in more messages being requested (I couldn't repro when using MaxConcurrentCalls of 1). However, I'm not sure why closing the link would not cause the message locks to be relinquished. I will have to follow up with the service team about this.

If you need this kind of ordering guarantee, you would need to use sessions. Even without this issue, it is possible your connection could drop and any messages that were already sent by the service would be locked, and potentially remain locked when reconnecting. With sessions, this is not possible as the lock is on the entire session. When using sessions, the session does NOT remain locked for the lock duration when the link is closed, so the issue does seem specific to message lock behavior on the service.
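For completeness, a rough sketch of the session-based alternative mentioned above, assuming a session-enabled queue; the connection string and queue name are placeholders:

await using var client = new ServiceBusClient("<connection-string>");
await using var sessionProcessor = client.CreateSessionProcessor("<session-queue-name>",
    new ServiceBusSessionProcessorOptions
    {
        AutoCompleteMessages = false,
        MaxConcurrentSessions = 4
    });

sessionProcessor.ProcessMessageAsync += async args =>
{
    // The lock is held on the session rather than on individual messages.
    await args.CompleteMessageAsync(args.Message);
};
sessionProcessor.ProcessErrorAsync += args => Task.CompletedTask;

await sessionProcessor.StartProcessingAsync();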

@vbystricky21
Author

We don't need FIFO. In fact, in production code we use multiple producer instances that consume messages from the same queue, so sessions are not an option for us (the code snippet was just to reproduce the bug).
FYI: I was able to simulate this with a single receiver. Pretty much every time I hit the highlighted line under the described conditions, one message remained locked. Unfortunately I can't provide a reliable code snippet, as the test required freezing some threads.


        private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyncInternal(
            int maxMessages,
            TimeSpan? maxWaitTime,
            TimeSpan timeout,
            CancellationToken cancellationToken)
        {
            var link = default(ReceivingAmqpLink);
            var amqpMessages = default(IEnumerable<AmqpMessage>);
            var receivedMessages = new List<ServiceBusReceivedMessage>();

            ThrowIfSessionLockLost();

            try
            {
                if (!_receiveLink.TryGetOpenedObject(out link))
                {
                    link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false);
                }
                cancellationToken.ThrowIfCancellationRequested();

                var messagesReceived = await Task.Factory.FromAsync
                (
                    (callback, state) => link.BeginReceiveRemoteMessages(
                        maxMessages,
                        TimeSpan.FromMilliseconds(20),
                        maxWaitTime ?? timeout,
                        callback,
                        state),
                    (asyncResult) => link.EndReceiveMessages(asyncResult, out amqpMessages),
                    TaskCreationOptions.RunContinuationsAsynchronously
                ).ConfigureAwait(false);

                // IF messagesReceived == true AND cancellationToken.IsCancellationRequested == true 
                // ONE MESSAGE REMAINS LOCKED
                cancellationToken.ThrowIfCancellationRequested();

                // If event messages were received, then package them for consumption and
                // return them.

                if ((messagesReceived) && (amqpMessages != null))
                {
                    foreach (AmqpMessage message in amqpMessages)
                    {
                        if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete)
                        {
                            link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
                        }
                        receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message));
                        message.Dispose();
                    }
                }

                return receivedMessages;
            }
            catch (Exception exception)
            {
                ExceptionDispatchInfo.Capture(AmqpExceptionHelper.TranslateException(
                    exception,
                    link?.GetTrackingId(),
                    null,
                    !cancellationToken.IsCancellationRequested && HasLinkCommunicationError(link)))
                .Throw();

                throw; // will never be reached
            }
        }

@JoshLove-msft
Member

Ah, I see - thank you for tracking that down! I don't think there is value in honoring the cancellationToken at that point so we can probably just fix that. I am following up with the service team to try to figure out why the locks are not released when closing the receiver (as this should happen regardless of this fix).

@JoshLove-msft
Member

Actually, that code has been updated in the 7.2 version, but the issue persists. I talked to the service team, and it appears that messages can end up being locked by the service after the client closes the link, but before the service can acknowledge that the link is closed. This was made more likely to occur by the issue you noted in 7.1.2, and similarly by how we handle cancellation in 7.2.x. It should become less likely (though still possible) once full cancellation support is added to the AMQP library.

@vbystricky21
Author

So there is no plan to fix this bug, only to reduce the likelihood of it occurring? This is quite a show stopper for our production.

A workaround would probably be to not honor the cancellation token during message receiving, and instead wait for a message to be received or for maxWaitTime to elapse. I know this goes against the changes you made in 7.2.x to fix the bug about slow processor stopping, but in our case we prefer a correct shutdown over a fast one. Any chance you could make this configurable somehow?

Off topic: there is a kind of inconsistency in the lock-renewal task cancellation. During shutdown, you cancel this task immediately, even if a message is still being processed in the message handler. In the auto-complete case you always try to complete the message, but the lock can already be lost. Is there a real reason not to cancel lock renewal after all message handling is done? I know this can be solved in our code, but the synchronization between producer shutdown and message handling that this requires seems unnecessary.
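A rough sketch of that client-side synchronization, assuming the processor and client from the repro earlier in this thread; the in-flight counter and the polling loop are illustrative only, not part of the SDK:

int inFlight = 0;

processor.ProcessMessageAsync += async args =>
{
    Interlocked.Increment(ref inFlight);
    try
    {
        await Task.Delay(300);                        // simulated work
        await args.CompleteMessageAsync(args.Message);
    }
    finally
    {
        Interlocked.Decrement(ref inFlight);
    }
};

// Shutdown: stop fetching new messages, wait for in-flight handlers to drain, then dispose.
await processor.StopProcessingAsync();
while (Volatile.Read(ref inFlight) > 0)
{
    await Task.Delay(50);
}
await processor.DisposeAsync();
await client.DisposeAsync();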

@JoshLove-msft
Member

Can you clarify why this is a show stopper? The messages will still get delivered but they are locked for the lock duration and the delivery count is incremented. In order to fully fix it, we would need updates on the service side to allow abandoning messages that are locked by a closed receiver.

In terms of the 7.2.x fix, simply waiting for the timeout is still not correct, as the race condition remains possible; it just becomes less likely because you are polling for a much longer time (60 s by default) than the window in which this issue can occur.

re: lock renewal - that sounds like a bug and we should fix it.

@vbystricky21
Author

In our case, we use many producers, whose number varies according to the Service Bus queue length. So we typically start/stop some producers every few minutes, which leaves us with several locked messages. Every consumed message represents an OnDemand job we process for our customers. These jobs usually take under 1 s, so a 30 s lock every few minutes degrades our performance.

@JoshLove-msft
Member

Is it possible to reuse the same processor by just calling StopProcessingAsync and then restarting with StartProcessingAsync? This would use the same link, so you wouldn't be locked out of the messages.
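For illustration, a rough sketch of the pause/resume pattern being suggested, assuming the same processor instance is kept alive between scale events:

await processor.StartProcessingAsync();

// Scale down: pause without closing the underlying link.
await processor.StopProcessingAsync();

// Scale back up later: resume on the same processor and the same link.
await processor.StartProcessingAsync();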

@vbystricky21
Author

Unfortunately, no. Stopping a producer in our architecture actually means deleting a pod from Azure Kubernetes Service.

@vbystricky21
Author

Just to clarify things: any change to Azure.Messaging.ServiceBus or the underlying Microsoft.Azure.Amqp can only mitigate the risk, and a real fix would actually need a change to the Service Bus service itself. Did I get that right?

@JoshLove-msft
Member

Correct - in order to prevent this from happening completely, we would need service updates. The issue can be mitigated through client-side enhancements to Microsoft.Azure.Amqp to support draining the link credits once the processor is stopped.

@JoshLove-msft
Member

This issue has been raised to the service team for future investigation.

@vbystricky21
Author

Hi @JoshLove-msft, is there any update?

@JoshLove-msft
Member

@yvgopal, do you know if there has been any investigation into this issue from the service side? I believe the issue is assigned to you.

@yvgopal
Member

yvgopal commented Sep 3, 2021

You can send a drain flow when you don't want to receive any messages. That is supported in the service.

@JoshLove-msft
Member

As discussed offline with @xinchen10, sending the drain flow would still lead to races where messages can be locked. In order to prevent this from occurring, we would need service support for unlocking messages on link close.

@yvgopal
Member

yvgopal commented Sep 3, 2021

We can't do that. It would break backward compatibility for some customers. We know of customers who receive over AMQP but complete using the lock token over HTTP.

@JoshLove-msft
Member

I see, but it could be an optional setting, right?

@JoshLove-msft
Member

JoshLove-msft commented Sep 21, 2021

There appears to be an issue in the underlying AMQP library that is making the behavior worse. I've submitted a PR. That alone would hopefully make the issue less likely, but there may be something we can do in the Service Bus SDK as well.

@JoshLove-msft
Member

I should clarify - I don't think it would be as simple as consuming the updated version of the AMQP library whenever it is available. We will also need an update in the SB library to make sure we are hitting that code path for cancellation.
