From 8d06c02f6fa99ee2bf602597454907ee7369ad6a Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 5 Dec 2024 16:49:12 -0800 Subject: [PATCH 1/4] Add extra logging for support case # Conflicts: # src/Transport/Receiving/MessagePump.cs --- src/Transport/Receiving/MessagePump.cs | 73 +++++++++++++++++--------- 1 file changed, 49 insertions(+), 24 deletions(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index 07b84833..d4eab899 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -199,37 +199,62 @@ void UpdateProcessingCapacity(int maxConcurrency) public async Task StopReceive(CancellationToken cancellationToken = default) { - // Wiring up the stop token to trigger the cancellation token that is being - // used inside the message handling pipeline - await using var _ = cancellationToken - .Register(state => (state as CancellationTokenSource)?.Cancel(), - messageProcessingCancellationTokenSource, - useSynchronizationContext: false).ConfigureAwait(false); - // Deliberately not passing the cancellation token forward in order to make sure - // the processor waits until all processing handlers have returned. This makes - // the code compliant to the previous version that uses manual receives and is aligned - // with how the cancellation token support was initially designed. - await processor.StopProcessingAsync(CancellationToken.None) - .ConfigureAwait(false); + if (messageProcessingCancellationTokenSource is null) + { + // Receiver hasn't been started or is already stopped + return; + } try { - await processor.CloseAsync(cancellationToken) + Logger.Debug("Registering cancellation token"); + // Wiring up the stop token to trigger the cancellation token that is being + // used inside the message handling pipeline + await using var _ = cancellationToken + .Register(state => (state as CancellationTokenSource)?.Cancel(), + messageProcessingCancellationTokenSource, + useSynchronizationContext: false).ConfigureAwait(false); + Logger.Debug("Cancellation token registered."); + + + // Deliberately not passing the cancellation token forward in order to make sure + // the processor waits until all processing handlers have returned. This makes + // the code compliant to the previous version that uses manual receives and is aligned + // with how the cancellation token support was initially designed. + await processor.StopProcessingAsync(CancellationToken.None) .ConfigureAwait(false); - } - catch (Exception ex) when (ex.IsCausedBy(cancellationToken)) - { - Logger.Debug($"Operation canceled while stopping the receiver {processor.EntityPath}.", ex); - } + Logger.Info("Processor stopped processing."); - processor.ProcessErrorAsync -= OnProcessorError; - processor.ProcessMessageAsync -= OnProcessMessage; + try + { + await processor.CloseAsync(cancellationToken) + .ConfigureAwait(false); + Logger.Info("Processor closed successfully."); + } + catch (Exception ex) when (ex.IsCausedBy(cancellationToken)) + { + Logger.Debug($"Operation canceled while stopping the receiver {processor.EntityPath}.", ex); + } + + processor.ProcessErrorAsync -= OnProcessorError; + processor.ProcessMessageAsync -= OnProcessMessage; + Logger.Debug("Processor event handlers detached."); + + await processor.DisposeAsync().ConfigureAwait(false); + Logger.Debug("Processor disposed."); - await processor.DisposeAsync().ConfigureAwait(false); + messageProcessingCancellationTokenSource?.Dispose(); + messageProcessingCancellationTokenSource = null; + Logger.Debug("Message processing cancellation token source disposed."); - messageProcessingCancellationTokenSource?.Dispose(); - messageProcessingCancellationTokenSource = null; - circuitBreaker?.Dispose(); + circuitBreaker?.Dispose(); + Logger.Debug("Circuit breaker disposed."); + } + catch (Exception ex) + { + Logger.Error("An error occurred while trying to stop the receiver.", ex); + throw; + } } async Task ProcessMessage(ServiceBusReceivedMessage message, From b2caec8161bafb962c51dc89696ce238101c5e00 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 5 Dec 2024 17:04:39 -0800 Subject: [PATCH 2/4] Make distinction of which exceptions to catch --- src/Transport/Receiving/MessagePump.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index d4eab899..807fd1bb 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -250,7 +250,7 @@ await processor.CloseAsync(cancellationToken) circuitBreaker?.Dispose(); Logger.Debug("Circuit breaker disposed."); } - catch (Exception ex) + catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) { Logger.Error("An error occurred while trying to stop the receiver.", ex); throw; From 1fcc90b2a07f0a7c4abc49d508e9294289e87e8b Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Fri, 6 Dec 2024 10:40:59 -0800 Subject: [PATCH 3/4] Remove try catch --- src/Transport/Receiving/MessagePump.cs | 78 ++++++++++++-------------- 1 file changed, 35 insertions(+), 43 deletions(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index 807fd1bb..62e55d6c 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -205,56 +205,48 @@ public async Task StopReceive(CancellationToken cancellationToken = default) return; } + Logger.Debug("Registering cancellation token"); + // Wiring up the stop token to trigger the cancellation token that is being + // used inside the message handling pipeline + await using var _ = cancellationToken + .Register(state => (state as CancellationTokenSource)?.Cancel(), + messageProcessingCancellationTokenSource, + useSynchronizationContext: false).ConfigureAwait(false); + Logger.Debug("Cancellation token registered."); + + + // Deliberately not passing the cancellation token forward in order to make sure + // the processor waits until all processing handlers have returned. This makes + // the code compliant to the previous version that uses manual receives and is aligned + // with how the cancellation token support was initially designed. + await processor.StopProcessingAsync(CancellationToken.None) + .ConfigureAwait(false); + Logger.Info("Processor stopped processing."); + try { - Logger.Debug("Registering cancellation token"); - // Wiring up the stop token to trigger the cancellation token that is being - // used inside the message handling pipeline - await using var _ = cancellationToken - .Register(state => (state as CancellationTokenSource)?.Cancel(), - messageProcessingCancellationTokenSource, - useSynchronizationContext: false).ConfigureAwait(false); - Logger.Debug("Cancellation token registered."); - - - // Deliberately not passing the cancellation token forward in order to make sure - // the processor waits until all processing handlers have returned. This makes - // the code compliant to the previous version that uses manual receives and is aligned - // with how the cancellation token support was initially designed. - await processor.StopProcessingAsync(CancellationToken.None) + await processor.CloseAsync(cancellationToken) .ConfigureAwait(false); - Logger.Info("Processor stopped processing."); - - try - { - await processor.CloseAsync(cancellationToken) - .ConfigureAwait(false); - Logger.Info("Processor closed successfully."); - } - catch (Exception ex) when (ex.IsCausedBy(cancellationToken)) - { - Logger.Debug($"Operation canceled while stopping the receiver {processor.EntityPath}.", ex); - } + Logger.Info("Processor closed successfully."); + } + catch (Exception ex) when (ex.IsCausedBy(cancellationToken)) + { + Logger.Debug($"Operation canceled while stopping the receiver {processor.EntityPath}.", ex); + } - processor.ProcessErrorAsync -= OnProcessorError; - processor.ProcessMessageAsync -= OnProcessMessage; - Logger.Debug("Processor event handlers detached."); + processor.ProcessErrorAsync -= OnProcessorError; + processor.ProcessMessageAsync -= OnProcessMessage; + Logger.Debug("Processor event handlers detached."); - await processor.DisposeAsync().ConfigureAwait(false); - Logger.Debug("Processor disposed."); + await processor.DisposeAsync().ConfigureAwait(false); + Logger.Debug("Processor disposed."); - messageProcessingCancellationTokenSource?.Dispose(); - messageProcessingCancellationTokenSource = null; - Logger.Debug("Message processing cancellation token source disposed."); + messageProcessingCancellationTokenSource?.Dispose(); + messageProcessingCancellationTokenSource = null; + Logger.Debug("Message processing cancellation token source disposed."); - circuitBreaker?.Dispose(); - Logger.Debug("Circuit breaker disposed."); - } - catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) - { - Logger.Error("An error occurred while trying to stop the receiver.", ex); - throw; - } + circuitBreaker?.Dispose(); + Logger.Debug("Circuit breaker disposed."); } async Task ProcessMessage(ServiceBusReceivedMessage message, From 36acbf3c275d3d481a6c02cadabcdf7b02647699 Mon Sep 17 00:00:00 2001 From: Bob Langley Date: Mon, 16 Dec 2024 14:23:28 -0800 Subject: [PATCH 4/4] Remove message cancellation token check --- src/Transport/Receiving/MessagePump.cs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index 62e55d6c..86933fb0 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -199,12 +199,6 @@ void UpdateProcessingCapacity(int maxConcurrency) public async Task StopReceive(CancellationToken cancellationToken = default) { - if (messageProcessingCancellationTokenSource is null) - { - // Receiver hasn't been started or is already stopped - return; - } - Logger.Debug("Registering cancellation token"); // Wiring up the stop token to trigger the cancellation token that is being // used inside the message handling pipeline @@ -347,4 +341,4 @@ AzureServiceBusTransportTransaction CreateTransaction(string incomingQueuePartit public string ReceiveAddress { get; } } -} \ No newline at end of file +}