diff --git a/src/Benchmarks/MicroBenchmarks/InFlightRequestBenchmark.cs b/src/Benchmarks/MicroBenchmarks/InFlightRequestBenchmark.cs new file mode 100644 index 000000000..282d4dd13 --- /dev/null +++ b/src/Benchmarks/MicroBenchmarks/InFlightRequestBenchmark.cs @@ -0,0 +1,56 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Threading; +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Jobs; +using NATS.Client.Internals; + +namespace MicroBenchmarks +{ + [MemoryDiagnoser] + [MarkdownExporterAttribute.GitHub] + [SimpleJob(RuntimeMoniker.Net462)] + [SimpleJob(RuntimeMoniker.NetCoreApp31)] + [GroupBenchmarksBy(BenchmarkLogicalGroupRule.ByCategory)] + [CategoriesColumn] + public class InFlightRequestBenchmark + { + private static void OnCompleted(string _) { } + + private static CancellationToken _ct = new CancellationTokenSource().Token; + + [Params(0, 999_999)] + public int Timeout { get; set; } + + [BenchmarkCategory("DefaultToken")] + [Benchmark] + public string InFlightRequest() + { + var request = new InFlightRequest("a", default, Timeout, OnCompleted); + var id = request.Id; + request.Dispose(); + return id; + } + [BenchmarkCategory("ClientToken")] + [Benchmark] + public string InFlightRequestClientToken() + { + var request = new InFlightRequest("a", _ct, Timeout, OnCompleted); + var id = request.Id; + request.Dispose(); + return id; + } + } +} diff --git a/src/Benchmarks/MicroBenchmarks/MicroBenchmarks.csproj b/src/Benchmarks/MicroBenchmarks/MicroBenchmarks.csproj index 7fe690cf8..4276fc4a8 100644 --- a/src/Benchmarks/MicroBenchmarks/MicroBenchmarks.csproj +++ b/src/Benchmarks/MicroBenchmarks/MicroBenchmarks.csproj @@ -7,6 +7,7 @@ AnyCPU pdbonly true + false diff --git a/src/Benchmarks/MicroBenchmarks/Program.cs b/src/Benchmarks/MicroBenchmarks/Program.cs index 7ff6d2d6c..3eb52cd85 100644 --- a/src/Benchmarks/MicroBenchmarks/Program.cs +++ b/src/Benchmarks/MicroBenchmarks/Program.cs @@ -19,7 +19,7 @@ public class Program { public static void Main(string[] args) { - var summary = BenchmarkRunner.Run(); + BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args); } } } diff --git a/src/NATS.Client/Conn.cs b/src/NATS.Client/Conn.cs index 207169dd0..7dad3ac8f 100644 --- a/src/NATS.Client/Conn.cs +++ b/src/NATS.Client/Conn.cs @@ -158,46 +158,6 @@ public Options Opts private readonly ConcurrentDictionary waitingRequests = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - // Handles in-flight requests when using the new-style request/reply behavior - private sealed class InFlightRequest : IDisposable - { - internal InFlightRequest(string id, CancellationToken token, int timeout, Action onCompleted) - { - this.Id = id; - this.Waiter = new TaskCompletionSource(); - this.onCompleted = onCompleted; - this.tokenSource = token == CancellationToken.None - ? new CancellationTokenSource() - : CancellationTokenSource.CreateLinkedTokenSource(token); - - this.tokenRegistration = this.tokenSource.Token.Register(() => - { - if (timeout > 0) - this.Waiter.TrySetException(new NATSTimeoutException()); - - this.Waiter.TrySetCanceled(); - }); - - if(timeout > 0) - this.tokenSource.CancelAfter(timeout); - } - - public string Id { get; } - public TaskCompletionSource Waiter { get; } - public CancellationToken Token => tokenSource.Token; - - private readonly Action onCompleted; - private readonly CancellationTokenSource tokenSource; - private CancellationTokenRegistration tokenRegistration; - - public void Dispose() - { - this.tokenRegistration.Dispose(); - this.tokenSource?.Dispose(); - this.onCompleted?.Invoke(this.Id); - } - } - // Prepare protocol messages for efficiency private byte[] PING_P_BYTES = null; private int PING_P_BYTES_LEN; diff --git a/src/NATS.Client/Internals/InFlightRequest.cs b/src/NATS.Client/Internals/InFlightRequest.cs new file mode 100644 index 000000000..fe791dd60 --- /dev/null +++ b/src/NATS.Client/Internals/InFlightRequest.cs @@ -0,0 +1,95 @@ +// Copyright 2017-2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace NATS.Client.Internals +{ + /// + /// Represents an in-flight request/reply operation. + /// + /// + /// This class is not used when using the legacy request/reply + /// pattern (see ). + /// + internal sealed class InFlightRequest : IDisposable + { + private readonly Action _onCompleted; + private readonly CancellationTokenSource _tokenSource; + private readonly CancellationTokenRegistration _tokenRegistration; + private readonly CancellationToken _clientProvidedToken; + + public readonly string Id; + public readonly CancellationToken Token; + public readonly TaskCompletionSource Waiter = new TaskCompletionSource(); + + /// + /// Initializes a new instance of class. + /// + /// The id associated with the request. + /// The cancellation token used to cancel the request. + /// A timeout (ms) after which the request is canceled. + /// The delegate that will be executed after the request ended. + /// Thrown if the request is cancelled by before receiving a response. + /// Thrown if the request is cancelled because period has elapsed before receiving a response. + internal InFlightRequest(string id, CancellationToken token, int timeout, Action onCompleted) + { + _onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted)); + _clientProvidedToken = token; + Id = id; + + if (timeout > 0 && token == default) + { + _tokenSource = new CancellationTokenSource(); + Token = _tokenSource.Token; + } + else if (timeout > 0) + { + _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); + Token = _tokenSource.Token; + } + else + { + Token = token; + } + + _tokenRegistration = Token.Register(CancellationCallback, this); + + if (timeout > 0) + _tokenSource.CancelAfter(timeout); + } + + private static void CancellationCallback(object req) + { + var request = req as InFlightRequest; + + if (request._clientProvidedToken.IsCancellationRequested) + request.Waiter.TrySetCanceled(); + + request.Waiter.TrySetException(new NATSTimeoutException()); + } + + /// + /// Releases all resources used by the current instance of the + /// class and invokes the onCompleted delegate. + /// + public void Dispose() + { + _tokenRegistration.Dispose(); + _tokenSource?.Dispose(); + _onCompleted.Invoke(Id); + } + } +} diff --git a/src/NATS.Client/Properties/AssemblyInfo.cs b/src/NATS.Client/Properties/AssemblyInfo.cs index 346e5cb05..7226e4387 100644 --- a/src/NATS.Client/Properties/AssemblyInfo.cs +++ b/src/NATS.Client/Properties/AssemblyInfo.cs @@ -6,7 +6,6 @@ #if DEBUG [assembly: InternalsVisibleTo("UnitTests")] [assembly: InternalsVisibleTo("MicroBenchmarks")] - #else [assembly: InternalsVisibleTo("UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd")] [assembly: InternalsVisibleTo("MicroBenchmarks, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd")] diff --git a/src/Tests/UnitTests/Internals/InFlightRequestTests.cs b/src/Tests/UnitTests/Internals/InFlightRequestTests.cs new file mode 100644 index 000000000..7a9fd98f5 --- /dev/null +++ b/src/Tests/UnitTests/Internals/InFlightRequestTests.cs @@ -0,0 +1,94 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Threading; +using System.Threading.Tasks; +using NATS.Client; +using NATS.Client.Internals; +using Xunit; + +namespace UnitTests.Internals +{ + public class InFlightRequestTests + { + [Fact] + public async Task Timeout_ThrowsNatsTimeoutException() + { + // Arrange + var sut = new InFlightRequest("Foo", default, 1, _ => { }); + + // Assert + await Assert.ThrowsAsync(() => sut.Waiter.Task); + } + + [Fact] + public async Task TimeoutWithToken_ThrowsTaskCanceledExcpetion() + { + // Arrange + var cts = new CancellationTokenSource(); + var sut = new InFlightRequest("Foo", cts.Token, 1, _ => { }); + + // Assert + await Assert.ThrowsAsync(() => sut.Waiter.Task); + } + + [Fact] + public async Task Canceled_ThrowsTaskCanceledExcpetion() + { + // Arrange + var cts = new CancellationTokenSource(); + var sut = new InFlightRequest("Foo", cts.Token, 0, _ => { }); + + // Act + cts.Cancel(); + + // Assert + await Assert.ThrowsAsync(() => sut.Waiter.Task); + } + + [Fact] + public async Task CanceledWithTimeout_ThrowsTaskCanceledException() + { + // Arrange + var cts = new CancellationTokenSource(); + var sut = new InFlightRequest("Foo", cts.Token, int.MaxValue, _ => { }); + + // Act + cts.Cancel(); + + // Assert + await Assert.ThrowsAsync(() => sut.Waiter.Task); + } + + [Fact] + public void Dispose_InvokesOnCompletedDelegate() + { + // Arrange + var onCompletedArg = ""; + var sut = new InFlightRequest("Foo", default, 0, id => { onCompletedArg = id; }); + + // Act + sut.Dispose(); + + // Assert + Assert.Equal("Foo", onCompletedArg); + } + + [Fact] + public void Ctor_ThrowsForNullArg() + { + Assert.Throws("onCompleted", () => new InFlightRequest("Foo", default, 0, null)); + } + } +}