Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Track Two (Timer Dispose Fix) #9156

Merged
merged 1 commit into from
Dec 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
endpoint.AbsoluteUri,
jsquire marked this conversation as resolved.
Show resolved Hide resolved
authClaims,
AuthorizationRefreshTimeout,
() => refreshTimer
() => (ActiveLinks.ContainsKey(link) ? refreshTimer : null)
);

refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan);
Expand Down Expand Up @@ -754,6 +754,11 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection

try
{
if (refreshTimer == null)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This will help reduce the potential for occurrence, but does not eliminate it as the Dispose call could be taking place as the timer callback is invoked.

{
return;
}

var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout).ConfigureAwait(false);

// Reset the timer for the next refresh.
Expand All @@ -763,6 +768,12 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection
refreshTimer.Change(CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan);
}
}
catch (ObjectDisposedException)
{
// This can occur if the connection is closed or the scope disposed after the factory
// is called but before the timer is updated. The callback may also fire while the timer is
// in the act of disposing. Do not consider it an error.
}
catch (Exception ex)
{
EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshError(EventHubName, endpoint.AbsoluteUri, ex.Message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,70 @@ public async Task OpenProducerLinkAsyncRefreshesAuthorization()
}
}

/// <summary>
/// Verifies functionality of the <see cref="AmqpConnectionScope.OpenProducerLinkAsync" />
/// method.
/// </summary>
///
[Test]
public async Task AuthorizationTimerCallbackToleratesDisposal()
{
var endpoint = new Uri("amqp://test.service.gov");
var eventHub = "myHub";
var credential = new Mock<EventHubTokenCredential>(Mock.Of<TokenCredential>(), "{namespace}.servicebus.windows.net");
var transport = EventHubsTransportType.AmqpTcp;
var cancellationSource = new CancellationTokenSource();
var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of<ILinkFactory>());
var mockScope = new DisposeOnAuthorizationTimerCallbackMockScope(endpoint, eventHub, credential.Object, transport, null);

var link = await mockScope.OpenProducerLinkAsync(null, TimeSpan.FromDays(1), cancellationSource.Token);
Assert.That(link, Is.Not.Null, "The link produced was null");

var activeLinks = GetActiveLinks(mockScope);
Assert.That(activeLinks.ContainsKey(link), Is.True, "The producer link should be tracked as active.");

activeLinks.TryGetValue(link, out var refreshTimer);
Assert.That(refreshTimer, Is.Not.Null, "The link should have a non-null timer.");

// Reset the timer so that it fires immediately and validate that authorization was
// requested. Since opening of the link requests an initial authorization and the expiration
// was set way in the future, there should be exactly two calls.
//
// Because the timer runs in the background, there is a level of non-determinism in when that
// callback will execute. Allow for a small number of delay and retries to account for it.

refreshTimer.Change(0, Timeout.Infinite);

var attemptCount = 0;
var remainingAttempts = 10;
var success = false;

while ((--remainingAttempts >= 0) && (!success))
{
try
{
await Task.Delay(250 * ++attemptCount).ConfigureAwait(false);
success = ((mockScope.IsDisposed) && (mockScope.CallbackInvoked));
}
catch (ObjectDisposedException)
{
Assert.Fail("No disposed exception should have been triggered");
}
catch when (remainingAttempts <= 0)
{
throw;
}
catch
{
// No action needed.
}

Assert.That(mockScope.IsDisposed, Is.True, "The scope should have been disposed.");
Assert.That(mockScope.CallbackInvoked, Is.True, "The authorization timer callback should have been invoked.");
}
}

/// <summary>
/// Verifies functionality of the <see cref="AmqpConnectionScope.Dispose" />
/// method.
Expand Down Expand Up @@ -1666,5 +1730,63 @@ public MockTransport() : base("Mock") { }
protected override void AbortInternal() => throw new NotImplementedException();
protected override bool CloseInternal() => throw new NotImplementedException();
}

/// <summary>
/// Provides a mock which disposes the scope before invoking the default timer callback for authorization.
/// </summary>
///
private class DisposeOnAuthorizationTimerCallbackMockScope : AmqpConnectionScope
{
public bool CallbackInvoked = false;

private AmqpConnection _mockConnection;
private AmqpSession _mockSession;

public DisposeOnAuthorizationTimerCallbackMockScope(Uri serviceEndpoint,
string eventHubName,
EventHubTokenCredential credential,
EventHubsTransportType transport,
IWebProxy proxy) : base(serviceEndpoint, eventHubName, credential, transport, proxy)
{
_mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
_mockSession = new AmqpSession(_mockConnection, new AmqpSessionSettings(), Mock.Of<ILinkFactory>());
}

protected override Task<AmqpConnection> CreateAndOpenConnectionAsync(Version amqpVersion,
Uri serviceEndpoint,
EventHubsTransportType transportType,
IWebProxy proxy,
string scopeIdentifier,
TimeSpan timeout) => Task.FromResult(_mockConnection);

protected override Task OpenAmqpObjectAsync(AmqpObject target, TimeSpan timeout) => Task.CompletedTask;

protected override TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection connection,
AmqpObject amqpLink,
CbsTokenProvider tokenProvider,
Uri endpoint,
string audience,
string resource,
string[] requiredClaims,
TimeSpan refreshTimeout,
Func<Timer> refreshTimerFactory)
{
Action baseImplementation = () => base.CreateAuthorizationRefreshHandler(connection, amqpLink, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout, refreshTimerFactory);

return state =>
{
CallbackInvoked = true;
Dispose();
baseImplementation();
};
}
protected override Task<DateTime> RequestAuthorizationUsingCbsAsync(AmqpConnection connection,
CbsTokenProvider tokenProvider,
Uri endpoint,
string audience,
string resource,
string[] requiredClaims,
TimeSpan timeout) => Task.FromResult(DateTime.Now.AddMinutes(60));
}
}
}