Skip to content

Commit

Permalink
Enable a way to Unregister Message Handler and Session Handler (#14021)
Browse files Browse the repository at this point in the history
* add UnregisterMessageHandler method

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs

Co-authored-by: Sean Feldman <[email protected]>

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs

Co-authored-by: Sean Feldman <[email protected]>

* Update the unregister method to be async and await for inflight operations to finish

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs

Co-authored-by: Sean Feldman <[email protected]>

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs

Co-authored-by: Sean Feldman <[email protected]>

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs

Co-authored-by: Sean Feldman <[email protected]>

* Change name to have async suffix and add to existing onMessageQueueTests

* Add UnregisterSessionHandlerAsync and corresponding tests

* nit

* nit

* Add a new cancellation type to not cancel inflight message handling operations when unregister is called.

* Add another type of cancellation token to session handler path

* nit

* Add a timeout parameter to unregister functions and add according unit tests

* nit

* cancel runningTaskCancellationTokenSource after unregister is done

* change public API

* update the API header

* update the API definition

* fix spacing

* fix ApproveAzureServiceBus CIT test

Co-authored-by: Sean Feldman <[email protected]>
  • Loading branch information
DorothySun216 and SeanFeldman authored Sep 10, 2020
1 parent b15911e commit 9fcc8cc
Show file tree
Hide file tree
Showing 17 changed files with 634 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public interface IReceiverClient : IClientEntity
/// <remarks>Enable prefetch to speed up the receive rate.</remarks>
void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions messageHandlerOptions);

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout);

/// <summary>
/// Completes a <see cref="Message"/> using its lock token. This will delete the message from the queue.
/// </summary>
Expand Down Expand Up @@ -115,4 +123,4 @@ public interface IReceiverClient : IClientEntity
/// </remarks>
Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ public class MessageReceiver : ClientEntity, IMessageReceiver
int prefetchCount;
long lastPeekedSequenceNumber;
MessageReceivePump receivePump;
// Cancellation token to cancel the message pump. Once this is fired, all future message handling operations registered by application will be
// cancelled.
CancellationTokenSource receivePumpCancellationTokenSource;
// Cancellation token to cancel the inflight message handling operations registered by application in the message pump.
CancellationTokenSource runningTaskCancellationTokenSource;

/// <summary>
/// Creates a new MessageReceiver from a <see cref="ServiceBusConnectionStringBuilder"/>.
Expand Down Expand Up @@ -899,6 +903,51 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.OnMessageHandler(messageHandlerOptions, handler);
}

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();

if (inflightMessageHandlerTasksWaitTimeout <= TimeSpan.Zero)
{
throw Fx.Exception.ArgumentOutOfRange(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout));
}

MessagingEventSource.Log.UnregisterMessageHandlerStart(this.ClientId);
lock (this.messageReceivePumpSyncLock)
{
if (this.receivePump == null || this.receivePumpCancellationTokenSource.IsCancellationRequested)
{
// Silently return if handler has already been unregistered.
return;
}

this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
}

Stopwatch stopWatch = Stopwatch.StartNew();
while (this.receivePump != null
&& stopWatch.Elapsed < inflightMessageHandlerTasksWaitTimeout
&& this.receivePump.maxConcurrentCallsSemaphoreSlim.CurrentCount < this.receivePump.registerHandlerOptions.MaxConcurrentCalls)
{
await Task.Delay(10).ConfigureAwait(false);
}

lock (this.messageReceivePumpSyncLock)
{
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
MessagingEventSource.Log.UnregisterMessageHandlerStop(this.ClientId);
}

/// <summary>
/// Registers a <see cref="ServiceBusPlugin"/> to be used with this receiver.
/// </summary>
Expand Down Expand Up @@ -1003,6 +1052,9 @@ protected override async Task OnClosingAsync()
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
// For back-compatibility
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down Expand Up @@ -1279,7 +1331,13 @@ protected virtual void OnMessageHandler(
}

this.receivePumpCancellationTokenSource = new CancellationTokenSource();
this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token);

if (this.runningTaskCancellationTokenSource == null)
{
this.runningTaskCancellationTokenSource = new CancellationTokenSource();
}

this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token, this.runningTaskCancellationTokenSource.Token);
}

try
Expand All @@ -1295,6 +1353,8 @@ protected virtual void OnMessageHandler(
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down
8 changes: 8 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,13 @@ public interface IQueueClient : IReceiverClient, ISenderClient
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
/// <remarks>Enable prefetch to speed up the receive rate. </remarks>
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandlerOptions sessionHandlerOptions);

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,13 @@ public interface ISubscriptionClient : IReceiverClient
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
/// <remarks>Enable prefetch to speed up the receive rate. </remarks>
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandlerOptions sessionHandlerOptions);

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,28 @@ namespace Microsoft.Azure.ServiceBus

sealed class MessageReceivePump
{
public readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
public readonly MessageHandlerOptions registerHandlerOptions;
readonly Func<Message, CancellationToken, Task> onMessageCallback;
readonly string endpoint;
readonly MessageHandlerOptions registerHandlerOptions;
readonly IMessageReceiver messageReceiver;
readonly CancellationToken pumpCancellationToken;
readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
readonly CancellationToken runningTaskCancellationToken;
readonly ServiceBusDiagnosticSource diagnosticSource;

public MessageReceivePump(IMessageReceiver messageReceiver,
MessageHandlerOptions registerHandlerOptions,
Func<Message, CancellationToken, Task> callback,
Uri endpoint,
CancellationToken pumpCancellationToken)
CancellationToken pumpCancellationToken,
CancellationToken runningTaskCancellationToken)
{
this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver));
this.registerHandlerOptions = registerHandlerOptions;
this.onMessageCallback = callback;
this.endpoint = endpoint.Authority;
this.pumpCancellationToken = pumpCancellationToken;
this.runningTaskCancellationToken = runningTaskCancellationToken;
this.maxConcurrentCallsSemaphoreSlim = new SemaphoreSlim(this.registerHandlerOptions.MaxConcurrentCalls);
this.diagnosticSource = new ServiceBusDiagnosticSource(messageReceiver.Path, endpoint);
}
Expand Down Expand Up @@ -163,7 +166,7 @@ async Task MessageDispatchTask(Message message)
try
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackStart(this.messageReceiver.ClientId, message);
await this.onMessageCallback(message, this.pumpCancellationToken).ConfigureAwait(false);
await this.onMessageCallback(message, this.runningTaskCancellationToken).ConfigureAwait(false);

MessagingEventSource.Log.MessageReceiverPumpUserCallbackStop(this.messageReceiver.ClientId, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,42 @@ public void ManagementSerializationException(string objectName, string details =
this.WriteEvent(117, objectName, details);
}
}

[Event(118, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler start.")]
public void UnregisterMessageHandlerStart(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(118, clientId);
}
}

[Event(119, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler done.")]
public void UnregisterMessageHandlerStop(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(119, clientId);
}
}

[Event(120, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler start.")]
public void UnregisterSessionHandlerStart(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(120, clientId);
}
}

[Event(121, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler done.")]
public void UnregisterSessionHandlerStop(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(121, clientId);
}
}
}

internal static class TraceHelper
Expand Down
24 changes: 24 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();
await this.InnerReceiver.UnregisterMessageHandlerAsync(inflightMessageHandlerTasksWaitTimeout).ConfigureAwait(false);
}

/// <summary>
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages.
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client.
Expand Down Expand Up @@ -476,6 +488,18 @@ public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationTo
this.SessionPumpHost.OnSessionHandler(handler, sessionHandlerOptions);
}

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();
await this.SessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false);
}

/// <summary>
/// Schedules a message to appear on Service Bus at a later time.
/// </summary>
Expand Down
54 changes: 53 additions & 1 deletion sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

namespace Microsoft.Azure.ServiceBus
{
using Microsoft.Azure.ServiceBus.Primitives;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -12,6 +14,7 @@ internal sealed class SessionPumpHost
readonly object syncLock;
SessionReceivePump sessionReceivePump;
CancellationTokenSource sessionPumpCancellationTokenSource;
CancellationTokenSource runningTaskCancellationTokenSource;
readonly Uri endpoint;

public SessionPumpHost(string clientId, ReceiveMode receiveMode, ISessionClient sessionClient, Uri endpoint)
Expand All @@ -35,6 +38,9 @@ public void Close()
{
this.sessionPumpCancellationTokenSource?.Cancel();
this.sessionPumpCancellationTokenSource?.Dispose();
// For back-compatibility
this.runningTaskCancellationTokenSource?.Cancel();
this.runningTaskCancellationTokenSource?.Dispose();
this.sessionReceivePump = null;
}
}
Expand All @@ -53,14 +59,22 @@ public void OnSessionHandler(
}

this.sessionPumpCancellationTokenSource = new CancellationTokenSource();

// Running task cancellation token source can be reused if previously UnregisterSessionHandlerAsync was called
if (this.runningTaskCancellationTokenSource == null)
{
this.runningTaskCancellationTokenSource = new CancellationTokenSource();
}

this.sessionReceivePump = new SessionReceivePump(
this.ClientId,
this.SessionClient,
this.ReceiveMode,
sessionHandlerOptions,
callback,
this.endpoint,
this.sessionPumpCancellationTokenSource.Token);
this.sessionPumpCancellationTokenSource.Token,
this.runningTaskCancellationTokenSource.Token);
}

try
Expand All @@ -82,5 +96,43 @@ public void OnSessionHandler(

MessagingEventSource.Log.RegisterOnSessionHandlerStop(this.ClientId);
}

public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout)
{
if (inflightSessionHandlerTasksWaitTimeout <= TimeSpan.Zero)
{
throw Fx.Exception.ArgumentOutOfRange(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout));
}

MessagingEventSource.Log.UnregisterSessionHandlerStart(this.ClientId);
lock (this.syncLock)
{
if (this.sessionReceivePump == null || this.sessionPumpCancellationTokenSource.IsCancellationRequested)
{
// Silently return if handler has already been unregistered.
return;
}

this.sessionPumpCancellationTokenSource.Cancel();
this.sessionPumpCancellationTokenSource.Dispose();
}

Stopwatch stopWatch = Stopwatch.StartNew();
while (this.sessionReceivePump != null
&& stopWatch.Elapsed < inflightSessionHandlerTasksWaitTimeout
&& (this.sessionReceivePump.maxConcurrentSessionsSemaphoreSlim.CurrentCount <
this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentSessions
|| this.sessionReceivePump.maxPendingAcceptSessionsSemaphoreSlim.CurrentCount <
this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls))
{
await Task.Delay(10).ConfigureAwait(false);
}

lock (this.sessionPumpCancellationTokenSource)
{
this.sessionReceivePump = null;
}
MessagingEventSource.Log.UnregisterSessionHandlerStop(this.ClientId);
}
}
}
Loading

0 comments on commit 9fcc8cc

Please sign in to comment.