Skip to content

Commit

Permalink
[QUIC] Root listener/connection while waiting on new connection/strea…
Browse files Browse the repository at this point in the history
…m event (#74450)

* Fixed GC collecting listener and/or connection while waiting on new connection/stream event

* Minor fixes and cleanups
  • Loading branch information
ManickaP authored Aug 26, 2022
1 parent 38ba41e commit 37eff15
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 20 deletions.
1 change: 0 additions & 1 deletion src/libraries/System.Net.Http/src/System.Net.Http.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,6 @@
<Reference Include="System.Diagnostics.DiagnosticSource" />
<Reference Include="System.Diagnostics.Tracing" />
<Reference Include="System.IO.Compression" />
<Reference Include="System.Linq" />
<Reference Include="System.Memory" />
<Reference Include="System.Net.NameResolution" />
<Reference Include="System.Net.NetworkInformation" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Runtime.Versioning;
using System.Net.Quic;
using System.IO;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
Expand Down Expand Up @@ -155,7 +154,7 @@ private void CheckForShutdown()

if (_clientControl != null)
{
_clientControl.Dispose();
await _clientControl.DisposeAsync().ConfigureAwait(false);
_clientControl = null;
}

Expand Down Expand Up @@ -245,7 +244,10 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
}
finally
{
requestStream?.Dispose();
if (requestStream is not null)
{
await requestStream.DisposeAsync().ConfigureAwait(false);
}
}
}

Expand Down Expand Up @@ -562,7 +564,6 @@ private async Task ProcessServerStreamAsync(QuicStream stream)
}

stream.Abort(QuicAbortDirection.Read, (long)Http3ErrorCode.StreamCreationError);
stream.Dispose();
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ public async ValueTask<QuicStream> AcceptInboundStreamAsync(CancellationToken ca
throw new InvalidOperationException(SR.net_quic_accept_not_allowed);
}

GCHandle keepObject = GCHandle.Alloc(this);
try
{
return await _acceptQueue.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
Expand All @@ -401,6 +402,10 @@ public async ValueTask<QuicStream> AcceptInboundStreamAsync(CancellationToken ca
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
throw;
}
finally
{
keepObject.Free();
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public async ValueTask<QuicConnection> AcceptConnectionAsync(CancellationToken c
{
ObjectDisposedException.ThrowIf(_disposed == 1, this);

GCHandle keepObject = GCHandle.Alloc(this);
try
{
PendingConnection pendingConnection = await _acceptQueue.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
Expand All @@ -175,6 +176,10 @@ public async ValueTask<QuicConnection> AcceptConnectionAsync(CancellationToken c
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
throw;
}
finally
{
keepObject.Free();
}
}

private unsafe int HandleEventNewConnection(ref NEW_CONNECTION_DATA data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ public void Abort(QuicAbortDirection abortDirection, long errorCode)
QUIC_STREAM_SHUTDOWN_FLAGS flags = QUIC_STREAM_SHUTDOWN_FLAGS.NONE;
if (abortDirection.HasFlag(QuicAbortDirection.Read))
{
flags |= QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE;
if (_receiveTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_reading_aborted), final: true))
{
flags |= QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE;
Expand Down Expand Up @@ -537,12 +536,12 @@ private unsafe int HandleEventShutdownComplete(ref SHUTDOWN_COMPLETE data)
(shutdownByApp: true, closedRemotely: true) => ThrowHelper.GetConnectionAbortedException((long)data.ConnectionErrorCode),
// It's local shutdown by app, this side called QuicConnection.CloseAsync, throw QuicError.OperationAborted.
(shutdownByApp: true, closedRemotely: false) => ThrowHelper.GetOperationAbortedException(),
// It's remote shutdown by transport, we received a CONNECTION_CLOSE frame with a QUIC transport error code
// It's remote shutdown by transport, we received a CONNECTION_CLOSE frame with a QUIC transport error code, throw error based on the status.
// TODO: we should propagate the transport error code
// https://github.com/dotnet/runtime/issues/72666
(shutdownByApp: false, closedRemotely: true) => ThrowHelper.GetExceptionForMsQuicStatus(data.ConnectionCloseStatus, $"Shutdown by transport {data.ConnectionErrorCode}"),
// It's local shutdown by transport, due to some timeout
// TODO: we should propagate transport error code
// It's local shutdown by transport, most likely due to a timeout, throw error based on the status.
// TODO: we should propagate the transport error code
// https://github.com/dotnet/runtime/issues/72666
(shutdownByApp: false, closedRemotely: false) => ThrowHelper.GetExceptionForMsQuicStatus(data.ConnectionCloseStatus),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,5 +336,33 @@ public async Task Connect_PeerCertificateDisposed(bool useGetter)
}
peerCertificate.Dispose();
}

[Fact]
public async Task Connection_AwaitsStream_ConnectionSurvivesGC()
{
const byte data = 0xDC;

TaskCompletionSource<IPEndPoint> listenerEndpointTcs = new TaskCompletionSource<IPEndPoint>();
await Task.WhenAll(
Task.Run(async () =>
{
await using var listener = await CreateQuicListener();
listenerEndpointTcs.SetResult(listener.LocalEndPoint);
await using var connection = await listener.AcceptConnectionAsync();
await using var stream = await connection.AcceptInboundStreamAsync();
var buffer = new byte[1];
Assert.Equal(1, await stream.ReadAsync(buffer));
Assert.Equal(data, buffer[0]);
}).WaitAsync(TimeSpan.FromSeconds(5)),
Task.Run(async () =>
{
var endpoint = await listenerEndpointTcs.Task;
await using var connection = await CreateQuicConnection(endpoint);
await Task.Delay(TimeSpan.FromSeconds(0.5));
GC.Collect();
await using var stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Unidirectional);
await stream.WriteAsync(new byte[1] { data }, completeWrites: true);
}).WaitAsync(TimeSpan.FromSeconds(5)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public async Task TwoListenersOnSamePort_DisjointAlpn_Success()
QuicListenerOptions listenerOptions = CreateQuicListenerOptions();
listenerOptions.ListenEndPoint = listener1.LocalEndPoint;
listenerOptions.ApplicationProtocols[0] = new SslApplicationProtocol("someprotocol");
listenerOptions.ConnectionOptionsCallback = (_, _, _) =>
listenerOptions.ConnectionOptionsCallback = (_, _, _) =>
{
var options = CreateQuicServerOptions();
options.ServerAuthenticationOptions.ApplicationProtocols[0] = listenerOptions.ApplicationProtocols[0];
Expand Down Expand Up @@ -144,5 +144,27 @@ public async Task TwoListenersOnSamePort_SameAlpn_Throws()
//
await AssertThrowsQuicExceptionAsync(QuicError.InternalError, async () => await CreateQuicListener(listener.LocalEndPoint));
}

[Fact]
public async Task Listener_AwaitsConnection_ListenerSurvivesGC()
{
TaskCompletionSource<IPEndPoint> listenerEndpointTcs = new TaskCompletionSource<IPEndPoint>();
await Task.WhenAll(
Task.Run(async () =>
{
await using var listener = await CreateQuicListener();
listenerEndpointTcs.SetResult(listener.LocalEndPoint);
var connection = await listener.AcceptConnectionAsync();
await connection.DisposeAsync();
}).WaitAsync(TimeSpan.FromSeconds(5)),
Task.Run(async () =>
{
var endpoint = await listenerEndpointTcs.Task;
await Task.Delay(TimeSpan.FromSeconds(0.5));
GC.Collect();
var connection = await CreateQuicConnection(endpoint);
await connection.DisposeAsync();
}).WaitAsync(TimeSpan.FromSeconds(5)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ await WhenAllOrAnyFailed(
}
catch (Exception ex)
{
_output?.WriteLine($"Failed to {ex.Message}");
_output?.WriteLine($"Failed to connect: {ex.Message}");
throw;
}
}));
Expand Down Expand Up @@ -153,14 +153,5 @@ public override void Dispose()
}
}
}

[OuterLoop("May take several seconds")]
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[SkipOnPlatform(TestPlatforms.LinuxBionic, "SElinux blocks UNIX sockets in our CI environment")]
[ActiveIssue("https://github.com/dotnet/runtime/issues/73377")]
public override Task Parallel_ReadWriteMultipleStreamsConcurrently()
{
return Task.CompletedTask;
}
}
}

0 comments on commit 37eff15

Please sign in to comment.