From 36093bc9da43a9b1dcd81bece9c80de3028e678b Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Mon, 16 Dec 2019 15:14:38 -0500 Subject: [PATCH] [Event Hubs Client] Track Two (Timer Dispose Race Condition Fix) The focus of these changes is to address a race condition that could cause the authorization refresh timer callback to throw an ObjectDisposedException due to a set of benign race conditions. These changes attempt to make those conditions less likely but cannot elimitate them completely without introducing synchronization overhead. Because the condition is benign, accept the resulting exception as an expected case rather than paying that cost. --- .../src/Amqp/AmqpConnectionScope.cs | 13 +- .../tests/Amqp/AmqpConnectionScopeTests.cs | 122 ++++++++++++++++++ 2 files changed, 134 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs index d15dd063f6b7f..ac83a8ada25c3 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs @@ -543,7 +543,7 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon endpoint.AbsoluteUri, authClaims, AuthorizationRefreshTimeout, - () => refreshTimer + () => (ActiveLinks.ContainsKey(link) ? refreshTimer : null) ); refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan); @@ -754,6 +754,11 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection try { + if (refreshTimer == null) + { + return; + } + var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout).ConfigureAwait(false); // Reset the timer for the next refresh. @@ -763,6 +768,12 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection refreshTimer.Change(CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan); } } + catch (ObjectDisposedException) + { + // This can occur if the connection is closed or the scope disposed after the factory + // is called but before the timer is updated. The callback may also fire while the timer is + // in the act of disposing. Do not consider it an error. + } catch (Exception ex) { EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshError(EventHubName, endpoint.AbsoluteUri, ex.Message); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs index 6821bb5648982..d2989ccf20131 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs @@ -1372,6 +1372,70 @@ public async Task OpenProducerLinkAsyncRefreshesAuthorization() } } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public async Task AuthorizationTimerCallbackToleratesDisposal() + { + var endpoint = new Uri("amqp://test.service.gov"); + var eventHub = "myHub"; + var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); + var transport = EventHubsTransportType.AmqpTcp; + var cancellationSource = new CancellationTokenSource(); + var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings()); + var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of()); + var mockScope = new DisposeOnAuthorizationTimerCallbackMockScope(endpoint, eventHub, credential.Object, transport, null); + + var link = await mockScope.OpenProducerLinkAsync(null, TimeSpan.FromDays(1), cancellationSource.Token); + Assert.That(link, Is.Not.Null, "The link produced was null"); + + var activeLinks = GetActiveLinks(mockScope); + Assert.That(activeLinks.ContainsKey(link), Is.True, "The producer link should be tracked as active."); + + activeLinks.TryGetValue(link, out var refreshTimer); + Assert.That(refreshTimer, Is.Not.Null, "The link should have a non-null timer."); + + // Reset the timer so that it fires immediately and validate that authorization was + // requested. Since opening of the link requests an initial authorization and the expiration + // was set way in the future, there should be exactly two calls. + // + // Because the timer runs in the background, there is a level of non-determinism in when that + // callback will execute. Allow for a small number of delay and retries to account for it. + + refreshTimer.Change(0, Timeout.Infinite); + + var attemptCount = 0; + var remainingAttempts = 10; + var success = false; + + while ((--remainingAttempts >= 0) && (!success)) + { + try + { + await Task.Delay(250 * ++attemptCount).ConfigureAwait(false); + success = ((mockScope.IsDisposed) && (mockScope.CallbackInvoked)); + } + catch (ObjectDisposedException) + { + Assert.Fail("No disposed exception should have been triggered"); + } + catch when (remainingAttempts <= 0) + { + throw; + } + catch + { + // No action needed. + } + + Assert.That(mockScope.IsDisposed, Is.True, "The scope should have been disposed."); + Assert.That(mockScope.CallbackInvoked, Is.True, "The authorization timer callback should have been invoked."); + } + } + /// /// Verifies functionality of the /// method. @@ -1666,5 +1730,63 @@ public MockTransport() : base("Mock") { } protected override void AbortInternal() => throw new NotImplementedException(); protected override bool CloseInternal() => throw new NotImplementedException(); } + + /// + /// Provides a mock which disposes the scope before invoking the default timer callback for authorization. + /// + /// + private class DisposeOnAuthorizationTimerCallbackMockScope : AmqpConnectionScope + { + public bool CallbackInvoked = false; + + private AmqpConnection _mockConnection; + private AmqpSession _mockSession; + + public DisposeOnAuthorizationTimerCallbackMockScope(Uri serviceEndpoint, + string eventHubName, + EventHubTokenCredential credential, + EventHubsTransportType transport, + IWebProxy proxy) : base(serviceEndpoint, eventHubName, credential, transport, proxy) + { + _mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings()); + _mockSession = new AmqpSession(_mockConnection, new AmqpSessionSettings(), Mock.Of()); + } + + protected override Task CreateAndOpenConnectionAsync(Version amqpVersion, + Uri serviceEndpoint, + EventHubsTransportType transportType, + IWebProxy proxy, + string scopeIdentifier, + TimeSpan timeout) => Task.FromResult(_mockConnection); + + protected override Task OpenAmqpObjectAsync(AmqpObject target, TimeSpan timeout) => Task.CompletedTask; + + protected override TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection connection, + AmqpObject amqpLink, + CbsTokenProvider tokenProvider, + Uri endpoint, + string audience, + string resource, + string[] requiredClaims, + TimeSpan refreshTimeout, + Func refreshTimerFactory) + { + Action baseImplementation = () => base.CreateAuthorizationRefreshHandler(connection, amqpLink, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout, refreshTimerFactory); + + return state => + { + CallbackInvoked = true; + Dispose(); + baseImplementation(); + }; + } + protected override Task RequestAuthorizationUsingCbsAsync(AmqpConnection connection, + CbsTokenProvider tokenProvider, + Uri endpoint, + string audience, + string resource, + string[] requiredClaims, + TimeSpan timeout) => Task.FromResult(DateTime.Now.AddMinutes(60)); + } } }