Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple client open-close semaphore from callback subscription semaphore #3135

Merged
merged 19 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions iothub/device/src/Pipeline/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,6 @@ public override Task OpenAsync(CancellationToken cancellationToken)

public override async Task CloseAsync(CancellationToken cancellationToken)
{
await _clientOpenCloseSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (!_openCalled)
Expand All @@ -793,20 +792,10 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
}
finally
{
Dispose(true);

if (Logging.IsEnabled)
Logging.Exit(this, cancellationToken, nameof(CloseAsync));

try
{
_clientOpenCloseSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
Dispose(true);
}
}

Expand All @@ -815,6 +804,8 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
/// </summary>
private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellationToken)
{
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);

// If this object has already been disposed, we will throw an exception indicating that.
// This is the entry point for interacting with the client and this safety check should be done here.
// The current behavior does not support open->close->open
Expand All @@ -828,7 +819,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat
return;
}

await _clientOpenCloseSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await _clientOpenCloseSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
if (!_opened)
Expand All @@ -840,7 +831,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat
// we are returning the corresponding connection status change event => disconnected: retry_expired.
try
{
await OpenInternalAsync(withRetry, cancellationToken).ConfigureAwait(false);
await OpenInternalAsync(withRetry, operationCts.Token).ConfigureAwait(false);
}
catch (Exception ex) when (!ex.IsFatal())
{
Expand Down Expand Up @@ -882,17 +873,24 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat

private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper)
{
if (Volatile.Read(ref _opened))
using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime());
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token);

// If this object has already been disposed, we will throw an exception indicating that.
// This is the entry point for interacting with the client and this safety check should be done here.
// The current behavior does not support open->close->open
if (_isDisposed)
{
return;
throw new ObjectDisposedException(nameof(RetryDelegatingHandler));
}

bool gain = await _clientOpenCloseSemaphore.WaitAsync(timeoutHelper.GetRemainingTime()).ConfigureAwait(false);
if (!gain)
if (Volatile.Read(ref _opened))
{
throw new TimeoutException("Timed out to acquire handler lock.");
return;
}

await _clientOpenCloseSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);

try
{
if (!_opened)
Expand Down Expand Up @@ -946,6 +944,8 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper

private async Task OpenInternalAsync(bool withRetry, CancellationToken cancellationToken)
{
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token);

if (withRetry)
{
await _internalRetryPolicy
Expand All @@ -958,7 +958,7 @@ await _internalRetryPolicy
Logging.Enter(this, cancellationToken, nameof(OpenAsync));

// Will throw on error.
await base.OpenAsync(cancellationToken).ConfigureAwait(false);
await base.OpenAsync(operationCts.Token).ConfigureAwait(false);
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
}
catch (Exception ex) when (!ex.IsFatal())
Expand All @@ -972,7 +972,7 @@ await _internalRetryPolicy
Logging.Exit(this, cancellationToken, nameof(OpenAsync));
}
},
cancellationToken).ConfigureAwait(false);
operationCts.Token).ConfigureAwait(false);
}
else
{
Expand All @@ -982,7 +982,7 @@ await _internalRetryPolicy
Logging.Enter(this, cancellationToken, nameof(OpenAsync));

// Will throw on error.
await base.OpenAsync(cancellationToken).ConfigureAwait(false);
await base.OpenAsync(operationCts.Token).ConfigureAwait(false);
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
}
catch (Exception ex) when (!ex.IsFatal())
Expand All @@ -1001,6 +1001,7 @@ await _internalRetryPolicy
private async Task OpenInternalAsync(bool withRetry, TimeoutHelper timeoutHelper)
{
using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime());
using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token);

if (withRetry)
{
Expand Down Expand Up @@ -1028,7 +1029,7 @@ await _internalRetryPolicy
Logging.Exit(this, timeoutHelper, nameof(OpenAsync));
}
},
cts.Token)
operationCts.Token)
.ConfigureAwait(false);
}
else
Expand Down
19 changes: 9 additions & 10 deletions iothub/device/tests/Pipeline/RetryDelegatingHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ public async Task RetryDelegatingHandler_OpenAsyncRetries()
// arrange
int callCounter = 0;

var ct = CancellationToken.None;
PipelineContext contextMock = Substitute.For<PipelineContext>();
contextMock.ConnectionStatusChangesHandler = new ConnectionStatusChangesHandler(delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { });
IDelegatingHandler innerHandlerMock = Substitute.For<IDelegatingHandler>();

innerHandlerMock
.OpenAsync(ct)
.OpenAsync(Arg.Any<CancellationToken>())
.Returns(t =>
{
return ++callCounter == 1
Expand All @@ -45,7 +44,7 @@ public async Task RetryDelegatingHandler_OpenAsyncRetries()
var retryDelegatingHandler = new RetryDelegatingHandler(contextMock, innerHandlerMock);

// act
await retryDelegatingHandler.OpenAsync(ct).ConfigureAwait(false);
await retryDelegatingHandler.OpenAsync(CancellationToken.None).ConfigureAwait(false);

// assert
callCounter.Should().Be(2);
Expand Down Expand Up @@ -255,12 +254,12 @@ public async Task DeviceNotFoundExceptionReturnsDeviceDisabledStatus()
public async Task RetryTransientErrorThrownAfterNumberOfRetriesThrows()
{
// arrange
using var cts = new CancellationTokenSource(100);
using var cts = new CancellationTokenSource(1000);
var contextMock = Substitute.For<PipelineContext>();
contextMock.ConnectionStatusChangesHandler = new ConnectionStatusChangesHandler(delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { });
var innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock
.OpenAsync(cts.Token)
.OpenAsync(Arg.Any<CancellationToken>())
.Returns(t => throw new IotHubException(TestExceptionMessage, isTransient: true));

var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
Expand Down Expand Up @@ -352,12 +351,12 @@ public async Task RetrySetRetryPolicyVerifyInternalsSuccess()
delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { });
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);

var retryPolicy = new TestRetryPolicy();
var retryPolicy = new TestRetryPolicyRetryTwice();
sut.SetRetryPolicy(retryPolicy);

int innerHandlerCallCounter = 0;

innerHandlerMock.OpenAsync(CancellationToken.None).Returns(t =>
innerHandlerMock.OpenAsync(Arg.Any<CancellationToken>()).Returns(t =>
{
innerHandlerCallCounter++;
throw new IotHubCommunicationException();
Expand Down Expand Up @@ -397,7 +396,7 @@ public async Task RetryCancellationTokenCanceledAbandon()
{
// arrange
var innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock.AbandonAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask);
innerHandlerMock.AbandonAsync(null, Arg.Any<CancellationToken>()).ReturnsForAnyArgs(TaskHelpers.CompletedTask);

var contextMock = Substitute.For<PipelineContext>();
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
Expand All @@ -416,7 +415,7 @@ public async Task RetryCancellationTokenCanceledReject()
{
// arrange
var innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock.RejectAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask);
innerHandlerMock.RejectAsync(null, Arg.Any<CancellationToken>()).ReturnsForAnyArgs(TaskHelpers.CompletedTask);

var contextMock = Substitute.For<PipelineContext>();
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
Expand All @@ -427,7 +426,7 @@ public async Task RetryCancellationTokenCanceledReject()
await sut.RejectAsync(Arg.Any<string>(), cts.Token).ExpectedAsync<TaskCanceledException>().ConfigureAwait(false);
}

private class TestRetryPolicy : IRetryPolicy
private class TestRetryPolicyRetryTwice : IRetryPolicy
{
public int Counter { get; private set; }

Expand Down