Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Azure Service Bus] Replace Stopwatch with ValueStopwatch #11266

Merged
merged 7 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions sdk/core/Azure.Core/src/Shared/ValueStopwatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma warning disable SA1636
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
#pragma warning restore SA1636

using System;
using System.Diagnostics;

namespace Azure.Core.Diagnostics
{
// https://github.com/dotnet/aspnetcore/blob/master/src/Shared/ValueStopwatch/ValueStopwatch.cs
internal struct ValueStopwatch
{
private static readonly double TimestampToTicks = TimeSpan.TicksPerSecond / (double)Stopwatch.Frequency;

private long _startTimestamp;

public bool IsActive => _startTimestamp != 0;

private ValueStopwatch(long startTimestamp)
{
_startTimestamp = startTimestamp;
}

public static ValueStopwatch StartNew() => new ValueStopwatch(Stopwatch.GetTimestamp());

public TimeSpan GetElapsedTime()
{
// Start timestamp can't be zero in an initialized ValueStopwatch. It would have to be literally the first thing executed when the machine boots to be 0.
// So it being 0 is a clear indication of default(ValueStopwatch)
if (!IsActive)
{
throw new InvalidOperationException("An uninitialized, or 'default', ValueStopwatch cannot be used to get elapsed time.");
}

var end = Stopwatch.GetTimestamp();
var timestampDelta = end - _startTimestamp;
var ticks = (long)(TimestampToTicks * timestampDelta);
return new TimeSpan(ticks);
}
}
}
1 change: 1 addition & 0 deletions sdk/core/Azure.Core/tests/Azure.Core.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<Compile Include="..\src\Shared\NoBodyResponse{T}.cs" />
<Compile Include="..\src\Shared\OperationHelpers.cs" />
<Compile Include="..\src\Shared\AzureResourceProviderNamespaceAttribute.cs" />
<Compile Include="..\src\Shared\ValueStopwatch.cs" />
</ItemGroup>

</Project>
42 changes: 42 additions & 0 deletions sdk/core/Azure.Core/tests/ValueStopwatchTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma warning disable SA1636
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
#pragma warning restore SA1636

using System;
using System.Threading.Tasks;
using Azure.Core.Diagnostics;
using NUnit.Framework;

namespace Azure.Core.Tests.Diagnostics
{
public class ValueStopwatchTests
{
[Test]
public void IsActiveIsFalseForDefaultValueStopwatch()
{
Assert.False(default(ValueStopwatch).IsActive);
}

[Test]
public void IsActiveIsTrueWhenValueStopwatchStartedWithStartNew()
{
Assert.True(ValueStopwatch.StartNew().IsActive);
}

[Test]
public void GetElapsedTimeThrowsIfValueStopwatchIsDefaultValue()
{
var stopwatch = default(ValueStopwatch);
Assert.Throws<InvalidOperationException>(() => stopwatch.GetElapsedTime());
}

[Test]
public async Task GetElapsedTimeReturnsTimeElapsedSinceStart()
{
var stopwatch = ValueStopwatch.StartNew();
await Task.Delay(200);
Assert.True(stopwatch.GetElapsedTime().TotalMilliseconds > 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Diagnostics;
using Azure.Messaging.ServiceBus.Authorization;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
Expand Down Expand Up @@ -235,20 +235,19 @@ public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var stopWatch = Stopwatch.StartNew();
var stopWatch = ValueStopwatch.StartNew();
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var link = await CreateManagementLinkAsync(
entityPath,
connection,
timeout.CalculateRemaining(stopWatch.Elapsed), cancellationToken).ConfigureAwait(false);
timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

stopWatch.Stop();
return link;
}

Expand Down Expand Up @@ -278,7 +277,7 @@ public virtual async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var stopWatch = Stopwatch.StartNew();
var stopWatch = ValueStopwatch.StartNew();
var receiverEndpoint = new Uri(ServiceEndpoint, entityPath);

var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
Expand All @@ -288,7 +287,7 @@ public virtual async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
entityPath,
connection,
receiverEndpoint,
timeout.CalculateRemaining(stopWatch.Elapsed),
timeout.CalculateRemaining(stopWatch.GetElapsedTime()),
prefetchCount,
receiveMode,
sessionId,
Expand All @@ -298,10 +297,9 @@ public virtual async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

stopWatch.Stop();
return link;
}

Expand All @@ -323,7 +321,7 @@ public virtual async Task<SendingAmqpLink> OpenSenderLinkAsync(
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var stopWatch = Stopwatch.StartNew();
var stopWatch = ValueStopwatch.StartNew();

AmqpConnection connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Expand All @@ -332,14 +330,13 @@ public virtual async Task<SendingAmqpLink> OpenSenderLinkAsync(
entityPath,
viaEntityPath,
connection,
timeout.CalculateRemaining(stopWatch.Elapsed), cancellationToken).ConfigureAwait(false);
timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

stopWatch.Stop();
return link;
}

Expand Down Expand Up @@ -394,15 +391,13 @@ protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(
// Create and open the connection, respecting the timeout constraint
// that was received.

var stopWatch = Stopwatch.StartNew();
var stopWatch = ValueStopwatch.StartNew();

var initiator = new AmqpTransportInitiator(amqpSettings, transportSettings);
TransportBase transport = await initiator.ConnectTaskAsync(timeout).ConfigureAwait(false);

var connection = new AmqpConnection(transport, amqpSettings, connectionSetings);
await OpenAmqpObjectAsync(connection, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);

stopWatch.Stop();
await OpenAmqpObjectAsync(connection, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);

// Create the CBS link that will be used for authorization. The act of creating the link will associate
// it with the connection.
Expand Down Expand Up @@ -448,7 +443,7 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var session = default(AmqpSession);
var stopWatch = Stopwatch.StartNew();
var stopWatch = ValueStopwatch.StartNew();

try
{
Expand All @@ -463,7 +458,7 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
// Create and open the link.

var linkSettings = new AmqpLinkSettings();
linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.Elapsed).TotalMilliseconds);
linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime()).TotalMilliseconds);
linkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);
entityPath += '/' + AmqpClientConstants.ManagementAddress;

Expand All @@ -478,7 +473,7 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
ServiceEndpoint,
audience,
claims,
timeout.CalculateRemaining(stopWatch.Elapsed))
timeout.CalculateRemaining(stopWatch.GetElapsedTime()))
.ConfigureAwait(false);

var link = new RequestResponseAmqpLink(
Expand All @@ -487,7 +482,6 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
entityPath,
linkSettings.Properties);
linkSettings.LinkName = $"{connection.Settings.ContainerId};{connection.Identifier}:{session.Identifier}:{link.Identifier}";
stopWatch.Stop();

// Track the link before returning it, so that it can be managed with the scope.
var refreshTimer = default(Timer);
Expand Down Expand Up @@ -557,7 +551,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var session = default(AmqpSession);
var stopWatch = Stopwatch.StartNew();
var stopWatch = ValueStopwatch.StartNew();

try
{
Expand All @@ -571,7 +565,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(
endpoint,
audience,
authClaims,
timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// Create and open the AMQP session associated with the link.
Expand Down Expand Up @@ -605,8 +599,6 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(

link.AttachTo(session);

stopWatch.Stop();

// Configure refresh for authorization of the link.

var refreshTimer = default(Timer);
Expand Down Expand Up @@ -667,7 +659,7 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var session = default(AmqpSession);
var stopWatch = Stopwatch.StartNew();
var stopWatch = ValueStopwatch.StartNew();

try
{
Expand Down Expand Up @@ -698,7 +690,7 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(
destinationEndpoint,
audience,
authClaims,
timeout.CalculateRemaining(stopWatch.Elapsed))
timeout.CalculateRemaining(stopWatch.GetElapsedTime()))
.ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Expand Down Expand Up @@ -726,14 +718,12 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(
linkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, entityPath);
}

linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.Elapsed).TotalMilliseconds);
linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime()).TotalMilliseconds);

var link = new SendingAmqpLink(linkSettings);
linkSettings.LinkName = $"{ Id };{ connection.Identifier }:{ session.Identifier }:{ link.Identifier }";
link.AttachTo(session);

stopWatch.Stop();

// Configure refresh for authorization of the link.

var refreshTimer = default(Timer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Azure.Core;
using Azure.Core.Diagnostics;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
using Azure.Messaging.ServiceBus.Primitives;
Expand Down Expand Up @@ -214,8 +214,6 @@ private async Task<IList<ServiceBusReceivedMessage>> ReceiveBatchAsyncInternal(
var link = default(ReceivingAmqpLink);
var amqpMessages = default(IEnumerable<AmqpMessage>);
var receivedMessages = new List<ServiceBusReceivedMessage>();
var stopWatch = Stopwatch.StartNew();

try
{
link = await _receiveLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false);
Expand Down Expand Up @@ -250,7 +248,6 @@ private async Task<IList<ServiceBusReceivedMessage>> ReceiveBatchAsyncInternal(
}
}

stopWatch.Stop();
return receivedMessages;
}
catch (Exception exception)
Expand Down Expand Up @@ -803,8 +800,7 @@ private async Task<IList<ServiceBusReceivedMessage>> PeekBatchAtInternalAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
var stopWatch = new Stopwatch();
stopWatch.Start();
var stopWatch = ValueStopwatch.StartNew();

AmqpRequestMessage amqpRequestMessage = AmqpRequestMessage.CreateRequest(
ManagementConstants.Operations.PeekMessageOperation,
Expand All @@ -826,13 +822,13 @@ private async Task<IList<ServiceBusReceivedMessage>> PeekBatchAtInternalAsync(

RequestResponseAmqpLink link = await _managementLink.GetOrCreateAsync(
UseMinimum(_connectionScope.SessionTimeout,
timeout.CalculateRemaining(stopWatch.Elapsed)))
timeout.CalculateRemaining(stopWatch.GetElapsedTime())))
.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

using AmqpMessage responseAmqpMessage = await link.RequestAsync(
amqpRequestMessage.AmqpMessage,
timeout.CalculateRemaining(stopWatch.Elapsed))
timeout.CalculateRemaining(stopWatch.GetElapsedTime()))
.ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Expand Down
Loading