
Service Bus: System.ObjectDisposedException when opening a receiver link #19731

Closed
kanatgit opened this issue Mar 23, 2021 · 34 comments
Assignees: jsquire
Labels: Client, customer-reported, issue-addressed, question, Service Bus

Comments

kanatgit commented Mar 23, 2021

Hi,

I am getting thousands of ObjectDisposedExceptions a day thrown from Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenReceiverLinkAsync() at line 312.

Looking into the Service Bus source code, the problem is that the ActiveConnection was disposed. The version of Azure.Messaging.ServiceBus is 7.1.0.0. Is this an issue in the Service Bus code? Any suggestions? My sample code is provided at the end.

Thanks

Exception Detail:
{"HResult":-2146232798,"Message":"Cannot access a disposed object.\r\nObject name: 'FaultTolerantAmqpObject1'.","Source":"Microsoft.Azure.Amqp","ObjectName":"FaultTolerantAmqpObject1","Type":"System.ObjectDisposedException"}

Call Stack:

System.ObjectDisposedException:
   at Microsoft.Azure.Amqp.Singleton`1+<GetOrCreateAsync>d__13.MoveNext (Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope+<OpenReceiverLinkAsync>d__54.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<OpenReceiverLinkAsync>d__30.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1+<OnCreateAsync>d__6.MoveNext (Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Microsoft.Azure.Amqp.Singleton`1+<GetOrCreateAsync>d__13.MoveNext (Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Microsoft.Azure.Amqp.Singleton`1+<GetOrCreateAsync>d__13.MoveNext (Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<ReceiveMessagesAsyncInternal>d__34.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<ReceiveMessagesAsyncInternal>d__34.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<>c__DisplayClass33_0+<<ReceiveMessagesAsync>b__0>d.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy+<RunOperation>d__20.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy+<RunOperation>d__20.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<ReceiveMessagesAsync>d__33.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.ServiceBusReceiver+<ReceiveMessagesAsync>d__37.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.ServiceBusReceiver+<ReceiveMessageAsync>d__39.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
   at Azure.Messaging.ServiceBus.ReceiverManager+<ReceiveAndProcessMessagesAsync>d__19.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)

My sample code:

    public class QueueWorker<TMessage> : BackgroundService where TMessage : class
    {
        private readonly IProcessQueueMessages<TMessage> _messageProcessor;

        private readonly ServiceBusClient _serviceBusClient;

        protected ILogger<QueueWorker<TMessage>> Logger { get; }

        public QueueWorker(ILogger<QueueWorker<TMessage>> logger, IProcessQueueMessages<TMessage> messageProcessor, ServiceBusClient serviceBusClient)
        {
            Logger = logger;
            _messageProcessor = messageProcessor;
            _serviceBusClient = serviceBusClient;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var queueName = _messageProcessor.QueueName;
            if(string.IsNullOrWhiteSpace(queueName))
            {
                Logger.LogWarning(_messageProcessor.QueueNameMissingWarningMessage);
                return;
            }

            var messageProcessor = CreateServiceBusProcessor(queueName);
            messageProcessor.ProcessMessageAsync += HandleMessageAsync;
            messageProcessor.ProcessErrorAsync += HandleReceivedExceptionAsync;

            Logger.LogInformation($"Starting message pump on queue {queueName} in namespace {messageProcessor.FullyQualifiedNamespace}");
            await messageProcessor.StartProcessingAsync(stoppingToken);
            Logger.LogInformation("Message pump started");

            while (!stoppingToken.IsCancellationRequested)
            {
                await Task.Delay(TimeSpan.FromSeconds(1));
            }

            Logger.LogInformation("Closing message pump");

            messageProcessor.ProcessMessageAsync -= HandleMessageAsync;
            messageProcessor.ProcessErrorAsync -= HandleReceivedExceptionAsync;

            await messageProcessor.CloseAsync(cancellationToken: stoppingToken);
            Logger.LogInformation("Message pump closed : {Time}", DateTimeOffset.UtcNow);
        }

        private ServiceBusProcessor CreateServiceBusProcessor(string queueName)
        {
            return _serviceBusClient.CreateProcessor(queueName);
        }

        private async Task HandleMessageAsync(ProcessMessageEventArgs processMessageEventArgs)
        {
            TMessage deserializedMessage = null;
            try
            {
                var rawMessageBody = Encoding.UTF8.GetString(processMessageEventArgs.Message.Body.ToArray());
                Logger.LogInformation("Received message {MessageId} with body {MessageBody}",
                    processMessageEventArgs.Message.MessageId, rawMessageBody);

                deserializedMessage = JsonConvert.DeserializeObject<TMessage>(rawMessageBody);
                if (deserializedMessage != null)
                {
                    await _messageProcessor.ProcessMessage(deserializedMessage, processMessageEventArgs.Message.MessageId,
                        processMessageEventArgs.Message.ApplicationProperties,
                        processMessageEventArgs.CancellationToken);
                }
                else
                {
                    Logger.LogError(
                        "Unable to deserialize to message contract {ContractName} for message {MessageBody}",
                        typeof(TMessage), rawMessageBody);
                }

                Logger.LogInformation("Message {MessageId} processed", processMessageEventArgs.Message.MessageId);

                await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
            }
            catch (Exception ex)
            {
                Logger.LogError(ex, "Unable to handle message: {ProcessMessageEventArgs}", processMessageEventArgs);

                if(!_messageProcessor.CanRetry(ex, processMessageEventArgs.Message?.Body, deserializedMessage))
                {
                    await processMessageEventArgs.DeadLetterMessageAsync(processMessageEventArgs.Message);
                }
            }
        }

        private Task HandleReceivedExceptionAsync(ProcessErrorEventArgs exceptionEvent)
        {
            Logger.LogError(exceptionEvent.Exception, "Unable to process message");
            return Task.CompletedTask;
        }
    }
@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Mar 23, 2021
@jsquire jsquire changed the title System.ObjectDisposedException Service Bus: System.ObjectDisposedException when opening a receiver link Mar 23, 2021
@jsquire jsquire self-assigned this Mar 23, 2021
@jsquire jsquire added Client This issue points to a problem in the data-plane of the library. Service Bus labels Mar 23, 2021
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Mar 23, 2021

jsquire commented Mar 23, 2021

Hi @kanatgit. Thank you for reaching out and we regret that you're experiencing difficulties. May I ask if you're seeing these exceptions during periods of time when there is no activity on the Service Bus instance or on a recurring basis of around 20 minutes?


kanatgit commented Mar 23, 2021

@jsquire, thanks for the reply. This happens when there is activity on the Service Bus. Please see the graph of the exception distribution:

(screenshots of the exception distribution omitted)

@JoshLove-msft (Member)

Hi @kanatgit, can you provide more information on how the ServiceBusClient is being managed? Is it possible that the client is being closed while the processor is still running?


kanatgit commented Mar 23, 2021

In our ASP.NET Core web application's Startup.cs file, the ServiceBusClient is registered as:

    public void ConfigureServices(IServiceCollection services)
    {
        ...
        services.AddHostedService<QueueWorker>();
        services.AddAzureClients(cfg =>
        {
            cfg.AddServiceBusClient(_queueConfiguration.ConnectionString);
        });
        ...
    }

QueueWorker is the only class that depends on ServiceBusClient (the ServiceBusClient is instantiated by the Autofac container).

@JoshLove-msft (Member)

Once these errors are hit, does the app ever process messages again or does it just keep spewing these errors until you shut it down?
Do you have any logging in your shutdown code? Can you verify that the app is not being shut down unexpectedly?


JoshLove-msft commented Mar 23, 2021

I think I have an idea of what could be happening. You are removing the event handlers while the processor is still running, which I believe will trigger an exception that would cause the processor never to get closed:

  Logger.LogInformation("Closing message pump");

  messageProcessor.ProcessMessageAsync -= HandleMessageAsync;
  messageProcessor.ProcessErrorAsync -= HandleReceivedExceptionAsync;

  await messageProcessor.CloseAsync(cancellationToken: stoppingToken);

This means that when the app is shut down, the processor will keep running. Instead, you can close the processor before attempting to remove the handlers. Also, you are passing the stoppingToken into the CloseAsync call, so the CloseAsync would not work anyway. Finally, it may be better to override StopAsync to handle the processor shutdown rather than putting it in ExecuteAsync.
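For illustration, a minimal sketch of that suggestion, assuming the processor is promoted to a field (here called _serviceBusProcessor) on the worker instead of a local in ExecuteAsync; names follow the sample above and are only one way to structure it:

    // Sketch only: close the processor during host shutdown instead of inside ExecuteAsync.
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        Logger.LogInformation("Closing message pump");

        // Pass CancellationToken.None (or a fresh timeout token) so the already-signaled
        // host token does not abort StopProcessingAsync inside CloseAsync.
        await _serviceBusProcessor.CloseAsync(CancellationToken.None);

        _serviceBusProcessor.ProcessMessageAsync -= HandleMessageAsync;
        _serviceBusProcessor.ProcessErrorAsync -= HandleReceivedExceptionAsync;

        Logger.LogInformation("Message pump closed : {Time}", DateTimeOffset.UtcNow);

        await base.StopAsync(cancellationToken);
    }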

@kanatgit (Author)

@JoshLove-msft, the code that removes the handlers wasn't there when the exceptions were first observed. I added it later just to see if it would help solve the problem.

@JoshLove-msft (Member)

> @JoshLove-msft, the code that removes the handlers wasn't there when the exceptions were first observed. I added it later just to see if it would help solve the problem.

Ah ok, well the stoppingToken would have prevented the processor from being closed.

@JoshLove-msft (Member)

Also, I think we can try to fix this in the SDK so that the processor doesn't continually try to process if the client has been closed. Please let us know if it turns out that is what is happening here.

@kanatgit (Author)

I don't understand why you think the stoppingToken prevents messageProcessor.CloseAsync(cancellationToken: stoppingToken) from succeeding. There is a while loop that checks stoppingToken.IsCancellationRequested before this CloseAsync() call. I also checked the ServiceBusProcessor.CloseAsync() implementation, which waits for the stoppingToken to be triggered, so I think it should succeed when the app shuts down.

    while (!stoppingToken.IsCancellationRequested)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
    }

    Logger.LogInformation("Closing message pump");

    messageProcessor.ProcessMessageAsync -= HandleMessageAsync;
    messageProcessor.ProcessErrorAsync -= HandleReceivedExceptionAsync;

    await messageProcessor.CloseAsync(cancellationToken: stoppingToken);

    public virtual async Task CloseAsync(
        CancellationToken cancellationToken = default)
    {
        IsClosed = true;

        if (IsProcessing)
        {
            await StopProcessingAsync(cancellationToken).ConfigureAwait(false);
        }

        foreach (ReceiverManager receiverManager in _receiverManagers)
        {
            await receiverManager.CloseReceiverIfNeeded(
                cancellationToken,
                forceClose: true)
                .ConfigureAwait(false);
        }
    }


JoshLove-msft commented Mar 24, 2021

StopProcessingAsync throws if the cancellationToken is cancelled.
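A minimal sketch of the adjustment (the variable name follows the sample above):

    // Sketch: close without passing the already-cancelled stoppingToken,
    // so StopProcessingAsync inside CloseAsync is not aborted.
    await messageProcessor.CloseAsync(CancellationToken.None);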

@kanatgit (Author)

Ah, the cancellationToken is really there to cancel the stop-processing operation itself. I misunderstood it. I will remove the cancellationToken parameter, test our web app tomorrow, and keep you updated. Thanks for the help.

@JoshLove-msft JoshLove-msft added the needs-author-feedback Workflow: More information is needed from author to address the issue. label Mar 25, 2021
@JoshLove-msft (Member)

Hi @kanatgit, just wanted to check whether you were able to test this out.


kanatgit commented Mar 30, 2021

@JoshLove-msft, I implemented two of your suggestions:

  1. Call CloseAsync() first, then remove the handlers.
  2. Call CloseAsync() without the cancellation token.

Unfortunately, those two changes were not enough to solve the problem; the same exceptions still happened yesterday.

(App Insights log screenshots omitted)

As you can see, there are two messages, "Closing message pump" and "Message pump closed": one should be logged before CloseAsync() and one after. From the App Insights logs, you can see that "Message pump closed" for ProcessId "24436" is missing, which indicates that CloseAsync() never returned. The exceptions also happen right after the timestamp of the "Closing message pump" message for ProcessId "24436". I don't know why there were multiple processes; I'm not sure if it is something related to the Azure environment.

I am implementing your third suggestion to put the CloseAsync() call in an overridden BackgroundService.StopAsync() method. While testing this on my local machine, I found that ServiceBusClient.CloseAsync() now takes 17 seconds to return, whereas it returned instantly when called from ExecuteAsync(). I stopped the web app by pressing Ctrl+C right after it started, at which point the Service Bus queue should be empty. Do you have any explanation for this change in duration? I will deploy this third change to Azure, see how it works, and keep you updated. Thank you for your continuous help.

@ghost ghost added needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team and removed needs-author-feedback Workflow: More information is needed from author to address the issue. labels Mar 30, 2021
@JoshLove-msft (Member)

> While testing this on my local machine, I found that ServiceBusClient.CloseAsync() now takes 17 seconds to return, whereas it returned instantly when called from ExecuteAsync(). I stopped the web app by pressing Ctrl+C right after it started, at which point the Service Bus queue should be empty. Do you have any explanation for this change in duration? I will deploy this third change to Azure, see how it works, and keep you updated. Thank you for your continuous help.

Do you mean when calling processor.CloseAsync? It is expected that it would take some time while closing due to the processor waiting for any receive calls to complete. We are planning on improving this behavior in an upcoming release (see #17734). As for why the call would return immediately when in the Execute method, I'm not sure. Do you have logging enabled for the Service Bus SDK?
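For context, a single receive attempt can wait up to the client's retry TryTimeout (60 seconds by default), which is one plausible source of a delay while closing. A hedged sketch of shortening that window, assuming the delay comes from an in-flight receive waiting out its timeout (a trade-off, not a confirmed fix for this issue; connectionString stands in for your configured value):

    // Hypothetical illustration: lower the per-attempt timeout so an idle receive
    // call (and therefore a CloseAsync waiting on it) gives up sooner.
    var client = new ServiceBusClient(connectionString, new ServiceBusClientOptions
    {
        RetryOptions = new ServiceBusRetryOptions
        {
            TryTimeout = TimeSpan.FromSeconds(10)
        }
    });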


kanatgit commented Apr 6, 2021

Yes, the processor.CloseAsync() method takes 17 seconds to finish on my local machine. I implemented your 3rd suggestion calling processor.CloseAsnc() in overriden StopAsnc() function and deployed it to azure environment. Good news is that I haven't seen the ObjectDisposedExceptions happened, but it seems that a lot of times processor.CloseAsync() doesn't return because the "Message pump closed" log are missing most of the time when app got redeployed.
image

@JoshLove-msft (Member)

Interesting.. are you able to enable SDK logs?

@JoshLove-msft (Member)

@kanatgit just checking in on this. Were you able to enable SDK logging so we can debug further?


jsquire commented Apr 15, 2021

@JoshLove-msft: Based on the stack trace, I'm reasonably sure this is related to #20255 with the root cause being unexpected behavior by the AMQP library. We've discussed with the team that maintains the AMQP library and they'll be addressing it in a future update. I'm working on a mitigation for both Event Hubs and Service Bus in the meantime.


jsquire commented Apr 15, 2021

@kanatgit, the frequency of this in your logs and the time clustering may be a concern. Assuming that I'm correct about the root cause, this log pattern would seem to indicate that your application is experiencing network instability, which is causing connections/links to fail. With the forthcoming mitigation in the SDK, it will detect the scenario and try to recover by creating a new connection, but I wonder if you'll continue to experience connectivity difficulties.

Can you help us understand the host environment and application a bit better?

@jsquire jsquire added needs-author-feedback Workflow: More information is needed from author to address the issue. and removed needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team labels Apr 15, 2021

kanatgit commented Apr 15, 2021

I have been investigating why this problem only happens in the Azure environment. The screenshots below show our web app deployment pipeline. As you can see, the "stop app service" task returns in 1 second, which does not wait for the Service Bus processor.CloseAsync() to return; that's why the "Message pump closed" log is missing. The web app is shut down ungracefully, and I don't see any option or configuration available to change this behavior.

(deployment pipeline screenshots omitted)

I have tried various suggestions I found online:

  1. Register IHostApplicationLifetime.ApplicationStopping.Register(OnStopping):

        private void OnStopping()
        {
            Logger.LogInformation("Closing message pump");

            _serviceBusProcessor.CloseAsync().GetAwaiter().GetResult();

            _serviceBusProcessor.ProcessErrorAsync -= HandleReceivedExceptionAsync;
            _serviceBusProcessor.ProcessMessageAsync -= HandleMessageAsync;

            Logger.LogInformation("Message pump closed : {Time}", DateTimeOffset.UtcNow);

            _telemetryClient.Flush();
        }

  2. Set a timeout to wait for the background service to finish:

        webHostBuilder
            .UseContentRoot(Directory.GetCurrentDirectory())
            .UseIISIntegration()
            .UseStartup<Startup>()
            .UseShutdownTimeout(TimeSpan.FromSeconds(60));

  3. Implement IDisposable and call _serviceBusProcessor.CloseAsync() from Dispose().

  4. Override BackgroundService.StopAsync() to call _serviceBusProcessor.CloseAsync().

Unfortunately, none of them makes the Azure deployment wait for _serviceBusProcessor.CloseAsync() to finish, and I don't see any configuration in the Azure portal to make the web app shut down gracefully.

@jsquire, this is not a network issue; all the ObjectDisposedExceptions happen during web app shutdown. During that window, serviceBusProcessor.CloseAsync() is hanging, and then the web app is shut down brutally. I think the ServiceBusClient gets released while the Service Bus receiver is still retrieving messages, and that causes the exceptions.

@ghost ghost added needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team and removed needs-author-feedback Workflow: More information is needed from author to address the issue. labels Apr 15, 2021

jsquire commented Apr 15, 2021

@kanatgit. That makes sense and would trigger the behavior that I was talking about; tearing down the host environment is very likely to cause changes to the state of the network resources, upsetting the AMQP transport. What I'd expect to see in this case is one instance of the ObjectDisposedException for each concurrent receive. Currently, they should be treated as terminal and the processor should be aware that it is stopping and should not relaunch the faulted task.

Does that match what you're seeing?

@jsquire jsquire added needs-author-feedback Workflow: More information is needed from author to address the issue. and removed needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team labels Apr 15, 2021

jsquire commented Apr 15, 2021

I talked to @JoshLove-msft offline, and it looks like there are two issues at play. The gap that I wasn't seeing earlier is that the processor also isn't reacting to the client being closed, so the change in network state was likely first caused by the client closing rather than the AppService shutting down. Apologies for the confusion.

@JoshLove-msft (Member)

I think there are a few things going on here.

In the SDK itself, we have a bug where the receiver/sender/etc does not check if the client was closed before attempting operations. It checks only if the link is closed. A similar gap appears to exist in Event Hubs where closing the connection without closing the Consumer would result in the same behavior.

This bug can be worked around by ensuring that processor.CloseAsync is called, but there is some odd behavior occurring - it didn't seem to get called when it was in the Execute method. Moving the processor.CloseAsync call into the Stop method works locally, but it doesn't always return when deployed. The best way to debug further would be to enable SDK logs - I'm not seeing these in the latest screenshot.


kanatgit commented Apr 15, 2021

@JoshLove-msft, can you tell me specifically how to enable SDK logs? Do you mean adding

    "Logging": {
        "LogLevel": {
            "Default": "Debug"
        }
    }

to appsettings.json? I have a debug console that shows Service Bus debug information on my local machine, but I don't know how to enable SDK debug info in the Azure environment.

@ghost ghost added needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team and removed needs-author-feedback Workflow: More information is needed from author to address the issue. labels Apr 15, 2021
@JoshLove-msft (Member)

Is this an App Service instance? This might help - https://docs.microsoft.com/en-us/azure/app-service/troubleshoot-diagnostic-logs
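For capturing the client library's verbose diagnostics without the site extension, one option is Azure.Core's AzureEventSourceListener; a minimal sketch (where the output goes - console, ILogger, a file - is up to the host, and keeping the listener in a long-lived field is assumed):

    // Azure SDK clients emit diagnostics via EventSource; AzureEventSourceListener
    // (from Azure.Core.Diagnostics) subscribes to those events, here writing to the console.
    using Azure.Core.Diagnostics;
    using System.Diagnostics.Tracing;

    // Keep the listener alive for the lifetime of the app (e.g., store it in a field).
    AzureEventSourceListener listener = AzureEventSourceListener.CreateConsoleLogger(EventLevel.Verbose);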

@kanatgit (Author)

@JoshLove-msft, thanks for the App Service logs document link. Unfortunately, using App Service logs requires installing the .NET Core site extension, which is incompatible with our app and prevents it from starting.


kanatgit commented Apr 16, 2021

I found a temporary solution: ignore the ObjectDisposedException in the error handler HandleReceivedExceptionAsync() during shutdown.

public class QueueWorker<TMessage> : BackgroundService where TMessage : class
{
    private readonly IProcessQueueMessages<TMessage> _messageProcessor;

    private readonly ServiceBusClient _serviceBusClient;

    private ServiceBusProcessor _serviceBusProcessor;

    private readonly IHostApplicationLifetime _appLifetime;

    private bool _isStopping;

    protected ILogger<QueueWorker<TMessage>> Logger { get; }

    public QueueWorker(ILogger<QueueWorker<TMessage>> logger, IProcessQueueMessages<TMessage> messageProcessor, ServiceBusClient serviceBusClient, IHostApplicationLifetime appLifetime)
    {
        Logger = logger;
        _messageProcessor = messageProcessor;
        _serviceBusClient = serviceBusClient;
        _appLifetime = appLifetime;
        _isStopping = false;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var queueName = _messageProcessor.QueueName;
        if(string.IsNullOrWhiteSpace(queueName))
        {
            Logger.LogWarning(_messageProcessor.QueueNameMissingWarningMessage);
            return;
        }

        _serviceBusProcessor = CreateServiceBusProcessor(queueName);
        _serviceBusProcessor.ProcessMessageAsync += HandleMessageAsync;
        _serviceBusProcessor.ProcessErrorAsync += HandleReceivedExceptionAsync;

        Logger.LogInformation($"Starting message pump on queue {queueName} in namespace {_serviceBusProcessor.FullyQualifiedNamespace}");
        await _serviceBusProcessor.StartProcessingAsync(stoppingToken);
        _appLifetime.ApplicationStopping.Register(OnStopping);
        Logger.LogInformation("Message pump started");

    }

    private void OnStopping()
    {
        Logger.LogInformation("Closing message pump");
        _isStopping = true;

        _serviceBusProcessor.CloseAsync().GetAwaiter().GetResult();
        _serviceBusProcessor.ProcessErrorAsync -= HandleReceivedExceptionAsync;
        _serviceBusProcessor.ProcessMessageAsync -= HandleMessageAsync;


        Logger.LogInformation("Message pump closed : {Time}", DateTimeOffset.UtcNow);

    }

    private ServiceBusProcessor CreateServiceBusProcessor(string queueName)
    {
        return _serviceBusClient.CreateProcessor(queueName);
    }

    private async Task HandleMessageAsync(ProcessMessageEventArgs processMessageEventArgs)
    {
        TMessage deserializedMessage = null;
        try
        {
            var rawMessageBody = Encoding.UTF8.GetString(processMessageEventArgs.Message.Body.ToArray());
            Logger.LogInformation("Received message {MessageId} with body {MessageBody}",
                processMessageEventArgs.Message.MessageId, rawMessageBody);

            deserializedMessage = JsonConvert.DeserializeObject<TMessage>(rawMessageBody);
            if (deserializedMessage != null)
            {
                await _messageProcessor.ProcessMessage(deserializedMessage, processMessageEventArgs.Message.MessageId,
                    processMessageEventArgs.Message.ApplicationProperties,
                    processMessageEventArgs.CancellationToken);
            }
            else
            {
                Logger.LogError(
                    "Unable to deserialize to message contract {ContractName} for message {MessageBody}",
                    typeof(TMessage), rawMessageBody);
            }

            Logger.LogInformation("Message {MessageId} processed", processMessageEventArgs.Message.MessageId);

            await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "Unable to handle message: {ProcessMessageEventArgs}", processMessageEventArgs);

            if(!_messageProcessor.CanRetry(ex, processMessageEventArgs.Message?.Body, deserializedMessage))
            {
                await processMessageEventArgs.DeadLetterMessageAsync(processMessageEventArgs.Message);
            }
        }
    }

    private Task HandleReceivedExceptionAsync(ProcessErrorEventArgs exceptionEvent)
    {
        // During application shutdown, the Service Bus library throws a lot of ObjectDisposedExceptions.
        // We have reported this issue to Microsoft; while Microsoft is still working on it,
        // we simply ignore this type of exception during shutdown to prevent it from flooding the app logs.
        if (_isStopping && exceptionEvent.Exception is ObjectDisposedException)
        {
            return Task.CompletedTask;
        }

        Logger.LogError(exceptionEvent.Exception, "Unable to process message");
        return Task.CompletedTask;
    }
}

@JoshLove-msft (Member)

> @JoshLove-msft, thanks for the App Service logs document link. Unfortunately, using App Service logs requires installing the .NET Core site extension, which is incompatible with our app and prevents it from starting.

@pakrym do you know how we can enable SDK logging here?


kanatgit commented Apr 17, 2021

@JoshLove-msft, I managed to get logging in the Azure environment with debug level enabled.

Here is a bad log: serviceBusProcessor.CloseAsync() never returns, because "Message pump closed" is missing. Please pay attention to the stop error at row 51. The ObjectDisposedException that I originally reported does not show up (somehow it's getting harder for me to reproduce), but the log is still useful for showing why the ServiceBusProcessor is not closed during web app shutdown.

(screenshots of the failing shutdown logs omitted)

Here is a good log for your reference (you can see both the "Closing message pump" and "Message pump closed" messages).

(screenshots of the successful shutdown logs omitted)

@kanatgit (Author)

@jsquire, thanks for your continued efforts in bringing the fix. May I ask when this fix will be available on NuGet?


jsquire commented Apr 23, 2021

@kanatgit. You're very welcome. These changes will be part of our next release in early May. I can't speak to whether that's planned as a beta or GA; I'll need to defer to @JoshLove-msft for insight.


jsquire commented Jul 3, 2021

Since the fixes needed are now confirmed to have reached GA, I'm going to tag this as having been addressed and close it out. Please feel free to "/unresolve" if you believe that further discussion is needed.

@jsquire jsquire added the issue-addressed Workflow: The Azure SDK team believes it to be addressed and ready to close. label Jul 3, 2021

ghost commented Jul 3, 2021

Hi @kanatgit. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

@ghost ghost removed the needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team label Jul 3, 2021
@jsquire jsquire closed this as completed Jul 3, 2021
@github-actions github-actions bot locked and limited conversation to collaborators Mar 27, 2023