From 5a4483eb1c0918542298d1864b7233f962952880 Mon Sep 17 00:00:00 2001 From: Anton Firszov Date: Mon, 26 Oct 2020 13:18:31 +0100 Subject: [PATCH] Fix RHEL7 socket dispose hang, and extend coverage (#43409) Fix #42686 by doing a graceful close in case if the abortive connect(AF_UNSPEC) call fails on Linux, and improve couple of related tests: - Extend RetryHelper with an exception filter - Connect, Accept, Send, Receive, SendFile cancellation tests: make sure cancellation failures are not masked by RetryHelper (use exception filter) - Connect, Accept, Send, Receive cancellation tests: also test IPV6 and DualMode sockets --- .../Net/Sockets/SocketTestExtensions.cs | 16 +++-- .../tests/TestUtilities/System/RetryHelper.cs | 15 ++-- .../Unix/System.Native/pal_networking.c | 5 ++ .../tests/FunctionalTests/Accept.cs | 26 ++++--- .../tests/FunctionalTests/Connect.cs | 26 ++++--- .../FunctionalTests/InlineCompletions.Unix.cs | 4 +- .../tests/FunctionalTests/SendFile.cs | 9 +-- .../tests/FunctionalTests/SendReceive.cs | 69 +++++++++++++------ .../tests/FunctionalTests/TelemetryTest.cs | 6 -- 9 files changed, 111 insertions(+), 65 deletions(-) diff --git a/src/libraries/Common/tests/System/Net/Sockets/SocketTestExtensions.cs b/src/libraries/Common/tests/System/Net/Sockets/SocketTestExtensions.cs index 5d2ef3ee1f0980..b316d8e6e271f5 100644 --- a/src/libraries/Common/tests/System/Net/Sockets/SocketTestExtensions.cs +++ b/src/libraries/Common/tests/System/Net/Sockets/SocketTestExtensions.cs @@ -38,14 +38,20 @@ public static void ForceNonBlocking(this Socket socket, bool force) } } - public static (Socket, Socket) CreateConnectedSocketPair() + public static (Socket client, Socket server) CreateConnectedSocketPair(bool ipv6 = false, bool dualModeClient = false) { - using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + IPAddress serverAddress = ipv6 ? IPAddress.IPv6Loopback : IPAddress.Loopback; + + using Socket listener = new Socket(serverAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + listener.Bind(new IPEndPoint(serverAddress, 0)); listener.Listen(1); - Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - client.Connect(listener.LocalEndPoint); + IPEndPoint connectTo = (IPEndPoint)listener.LocalEndPoint; + if (dualModeClient) connectTo = new IPEndPoint(connectTo.Address.MapToIPv6(), connectTo.Port); + + Socket client = new Socket(connectTo.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + if (dualModeClient) client.DualMode = true; + client.Connect(connectTo); Socket server = listener.Accept(); return (client, server); diff --git a/src/libraries/Common/tests/TestUtilities/System/RetryHelper.cs b/src/libraries/Common/tests/TestUtilities/System/RetryHelper.cs index cd5221e2b41ed2..8378d59a8b1a30 100644 --- a/src/libraries/Common/tests/TestUtilities/System/RetryHelper.cs +++ b/src/libraries/Common/tests/TestUtilities/System/RetryHelper.cs @@ -10,12 +10,14 @@ namespace System public static partial class RetryHelper { private static readonly Func s_defaultBackoffFunc = i => Math.Min(i * 100, 60_000); + private static readonly Predicate s_defaultRetryWhenFunc = _ => true; /// Executes the action up to a maximum of times. /// The maximum number of times to invoke . /// The test to invoke. /// After a failure, invoked to determine how many milliseconds to wait before the next attempt. It's passed the number of iterations attempted. - public static void Execute(Action test, int maxAttempts = 5, Func backoffFunc = null) + /// Invoked to select the exceptions to retry on. If not set, any exception will trigger a retry. + public static void Execute(Action test, int maxAttempts = 5, Func backoffFunc = null, Predicate retryWhen = null) { // Validate arguments if (maxAttempts < 1) @@ -27,6 +29,8 @@ public static void Execute(Action test, int maxAttempts = 5, Func back throw new ArgumentNullException(nameof(test)); } + retryWhen ??= s_defaultRetryWhenFunc; + // Execute the test until it either passes or we run it maxAttempts times var exceptions = new List(); for (int i = 1; i <= maxAttempts; i++) @@ -36,7 +40,7 @@ public static void Execute(Action test, int maxAttempts = 5, Func back test(); return; } - catch (Exception e) + catch (Exception e) when (retryWhen(e)) { exceptions.Add(e); if (i == maxAttempts) @@ -53,7 +57,8 @@ public static void Execute(Action test, int maxAttempts = 5, Func back /// The maximum number of times to invoke . /// The test to invoke. /// After a failure, invoked to determine how many milliseconds to wait before the next attempt. It's passed the number of iterations attempted. - public static async Task ExecuteAsync(Func test, int maxAttempts = 5, Func backoffFunc = null) + /// Invoked to select the exceptions to retry on. If not set, any exception will trigger a retry. + public static async Task ExecuteAsync(Func test, int maxAttempts = 5, Func backoffFunc = null, Predicate retryWhen = null) { // Validate arguments if (maxAttempts < 1) @@ -65,6 +70,8 @@ public static async Task ExecuteAsync(Func test, int maxAttempts = 5, Func throw new ArgumentNullException(nameof(test)); } + retryWhen ??= s_defaultRetryWhenFunc; + // Execute the test until it either passes or we run it maxAttempts times var exceptions = new List(); for (int i = 1; i <= maxAttempts; i++) @@ -74,7 +81,7 @@ public static async Task ExecuteAsync(Func test, int maxAttempts = 5, Func await test().ConfigureAwait(false); return; } - catch (Exception e) + catch (Exception e) when (retryWhen(e)) { exceptions.Add(e); if (i == maxAttempts) diff --git a/src/libraries/Native/Unix/System.Native/pal_networking.c b/src/libraries/Native/Unix/System.Native/pal_networking.c index 09ec260149f497..cf080d439ac478 100644 --- a/src/libraries/Native/Unix/System.Native/pal_networking.c +++ b/src/libraries/Native/Unix/System.Native/pal_networking.c @@ -3085,6 +3085,11 @@ int32_t SystemNative_Disconnect(intptr_t socket) addr.sa_family = AF_UNSPEC; err = connect(fd, &addr, sizeof(addr)); + if (err != 0) + { + // On some older kernels connect(AF_UNSPEC) may fail. Fall back to shutdown in these cases: + err = shutdown(fd, SHUT_RDWR); + } #elif HAVE_DISCONNECTX // disconnectx causes a FIN close on OSX. It's the best we can do. err = disconnectx(fd, SAE_ASSOCID_ANY, SAE_CONNID_ANY); diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/Accept.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/Accept.cs index 7f0dc3adc14a41..5eb3fc8c617f3d 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/Accept.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/Accept.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Xunit; using Xunit.Abstractions; +using Xunit.Sdk; namespace System.Net.Sockets.Tests { @@ -289,16 +290,25 @@ public async Task AcceptAsync_MultipleAcceptsThenDispose_AcceptsThrowAfterDispos } } - [Fact] - public async Task AcceptGetsCanceledByDispose() + public static readonly TheoryData AcceptGetsCanceledByDispose_Data = new TheoryData + { + { IPAddress.Loopback }, + { IPAddress.IPv6Loopback }, + { IPAddress.Loopback.MapToIPv6() } + }; + + [Theory] + [MemberData(nameof(AcceptGetsCanceledByDispose_Data))] + public async Task AcceptGetsCanceledByDispose(IPAddress loopback) { // We try this a couple of times to deal with a timing race: if the Dispose happens // before the operation is started, we won't see a SocketException. int msDelay = 100; await RetryHelper.ExecuteAsync(async () => { - var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + var listener = new Socket(loopback.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + if (loopback.IsIPv4MappedToIPv6) listener.DualMode = true; + listener.Bind(new IPEndPoint(loopback, 0)); listener.Listen(1); Task acceptTask = AcceptAsync(listener); @@ -308,11 +318,7 @@ await RetryHelper.ExecuteAsync(async () => msDelay *= 2; Task disposeTask = Task.Run(() => listener.Dispose()); - var cts = new CancellationTokenSource(); - Task timeoutTask = Task.Delay(30000, cts.Token); - Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, acceptTask, timeoutTask)); - cts.Cancel(); - + await Task.WhenAny(disposeTask, acceptTask).TimeoutAfter(30000); await disposeTask; SocketError? localSocketError = null; @@ -343,7 +349,7 @@ await RetryHelper.ExecuteAsync(async () => { Assert.Equal(SocketError.OperationAborted, localSocketError); } - }, maxAttempts: 10); + }, maxAttempts: 10, retryWhen: e => e is XunitException); } [Fact] diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/Connect.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/Connect.cs index 08ecf15f3b925b..40a15f7185dfe9 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/Connect.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/Connect.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Xunit; using Xunit.Abstractions; +using Xunit.Sdk; namespace System.Net.Sockets.Tests { @@ -120,29 +121,34 @@ public async Task Connect_AfterDisconnect_Fails() } } - [Fact] + public static readonly TheoryData ConnectGetsCanceledByDispose_Data = new TheoryData + { + { IPAddress.Parse("1.1.1.1") }, + { IPAddress.Parse("1.1.1.1").MapToIPv6() }, + }; + + [OuterLoop("Connects to external server")] + [Theory] + [MemberData(nameof(ConnectGetsCanceledByDispose_Data))] [PlatformSpecific(~(TestPlatforms.OSX | TestPlatforms.FreeBSD))] // Not supported on BSD like OSes. - public async Task ConnectGetsCanceledByDispose() + public async Task ConnectGetsCanceledByDispose(IPAddress address) { // We try this a couple of times to deal with a timing race: if the Dispose happens // before the operation is started, we won't see a SocketException. int msDelay = 100; await RetryHelper.ExecuteAsync(async () => { - var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + var client = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + if (address.IsIPv4MappedToIPv6) client.DualMode = true; - Task connectTask = ConnectAsync(client, new IPEndPoint(IPAddress.Parse("1.1.1.1"), 23)); + Task connectTask = ConnectAsync(client, new IPEndPoint(address, 23)); // Wait a little so the operation is started. await Task.Delay(msDelay); msDelay *= 2; Task disposeTask = Task.Run(() => client.Dispose()); - var cts = new CancellationTokenSource(); - Task timeoutTask = Task.Delay(30000, cts.Token); - Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, connectTask, timeoutTask)); - cts.Cancel(); - + await Task.WhenAny(disposeTask, connectTask).TimeoutAfter(30000); await disposeTask; SocketError? localSocketError = null; @@ -176,7 +182,7 @@ await RetryHelper.ExecuteAsync(async () => { Assert.Equal(SocketError.OperationAborted, localSocketError); } - }, maxAttempts: 10); + }, maxAttempts: 10, retryWhen: e => e is XunitException); } } diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/InlineCompletions.Unix.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/InlineCompletions.Unix.cs index 3823e511ba59b4..a8c9d93d960fca 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/InlineCompletions.Unix.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/InlineCompletions.Unix.cs @@ -31,8 +31,8 @@ public void InlineSocketContinuations() await new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, useMultipleBuffers: false); await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentReceives(IPAddress.Loopback, useMultipleBuffers: false); await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentSends(IPAddress.Loopback, useMultipleBuffers: false); - await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: true); - await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: false); + await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: true, ipv6Server: false, dualModeClient: false); + await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: false, ipv6Server: false, dualModeClient: false); }, options).Dispose(); } } diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendFile.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendFile.cs index 3a8db0f161ee6e..539ed40e867af5 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendFile.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendFile.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Xunit; +using Xunit.Sdk; namespace System.Net.Sockets.Tests { @@ -310,11 +311,7 @@ await RetryHelper.ExecuteAsync(async () => msDelay *= 2; Task disposeTask = Task.Run(() => socket1.Dispose()); - var cts = new CancellationTokenSource(); - Task timeoutTask = Task.Delay(30000, cts.Token); - Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, socketOperation, timeoutTask)); - cts.Cancel(); - + await Task.WhenAny(disposeTask, socketOperation).TimeoutAfter(30000); await disposeTask; SocketError? localSocketError = null; @@ -355,7 +352,7 @@ await RetryHelper.ExecuteAsync(async () => Assert.Equal(SocketError.ConnectionReset, peerSocketError); } } - }, maxAttempts: 10); + }, maxAttempts: 10, retryWhen: e => e is XunitException); } [OuterLoop] diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs index 50a329e044c2b0..b6d5eea3d38108 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs @@ -7,8 +7,10 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.RemoteExecutor; +using Microsoft.DotNet.XUnitExtensions; using Xunit; using Xunit.Abstractions; +using Xunit.Sdk; namespace System.Net.Sockets.Tests { @@ -937,17 +939,26 @@ error is SocketException || } } - [Fact] + public static readonly TheoryData UdpReceiveGetsCanceledByDispose_Data = new TheoryData + { + { IPAddress.Loopback }, + { IPAddress.IPv6Loopback }, + { IPAddress.Loopback.MapToIPv6() } + }; + + [Theory] + [MemberData(nameof(UdpReceiveGetsCanceledByDispose_Data))] [PlatformSpecific(~TestPlatforms.OSX)] // Not supported on OSX. - public async Task UdpReceiveGetsCanceledByDispose() + public async Task UdpReceiveGetsCanceledByDispose(IPAddress address) { // We try this a couple of times to deal with a timing race: if the Dispose happens // before the operation is started, we won't see a SocketException. int msDelay = 100; await RetryHelper.ExecuteAsync(async () => { - var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); - socket.BindToAnonymousPort(IPAddress.Loopback); + var socket = new Socket(address.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + if (address.IsIPv4MappedToIPv6) socket.DualMode = true; + socket.BindToAnonymousPort(address); Task receiveTask = ReceiveAsync(socket, new ArraySegment(new byte[1])); @@ -956,11 +967,7 @@ await RetryHelper.ExecuteAsync(async () => msDelay *= 2; Task disposeTask = Task.Run(() => socket.Dispose()); - var cts = new CancellationTokenSource(); - Task timeoutTask = Task.Delay(30000, cts.Token); - Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, receiveTask, timeoutTask)); - cts.Cancel(); - + await Task.WhenAny(disposeTask, receiveTask).TimeoutAfter(30000); await disposeTask; SocketError? localSocketError = null; @@ -991,21 +998,35 @@ await RetryHelper.ExecuteAsync(async () => { Assert.Equal(SocketError.OperationAborted, localSocketError); } - }, maxAttempts: 10); + }, maxAttempts: 10, retryWhen: e => e is XunitException); } - [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task TcpReceiveSendGetsCanceledByDispose(bool receiveOrSend) + public static readonly TheoryData TcpReceiveSendGetsCanceledByDispose_Data = new TheoryData + { + { true, false, false }, + { true, false, true }, + { true, true, false }, + { false, false, false }, + { false, false, true }, + { false, true, false }, + }; + + [Theory(Timeout = 40000)] + [MemberData(nameof(TcpReceiveSendGetsCanceledByDispose_Data))] + public async Task TcpReceiveSendGetsCanceledByDispose(bool receiveOrSend, bool ipv6Server, bool dualModeClient) { + // RHEL7 kernel has a bug preventing close(AF_UNKNOWN) to succeed with IPv6 sockets. + // In this case Dispose will trigger a graceful shutdown, which means that receive will succeed on socket2. + // TODO: Remove this, once CI machines are updated to a newer kernel. + bool expectGracefulShutdown = UsesSync && PlatformDetection.IsRedHatFamily7 && receiveOrSend && (ipv6Server || dualModeClient); + // We try this a couple of times to deal with a timing race: if the Dispose happens // before the operation is started, the peer won't see a ConnectionReset SocketException and we won't // see a SocketException either. int msDelay = 100; await RetryHelper.ExecuteAsync(async () => { - (Socket socket1, Socket socket2) = SocketTestExtensions.CreateConnectedSocketPair(); + (Socket socket1, Socket socket2) = SocketTestExtensions.CreateConnectedSocketPair(ipv6Server, dualModeClient); using (socket2) { Task socketOperation; @@ -1030,11 +1051,7 @@ await RetryHelper.ExecuteAsync(async () => msDelay *= 2; Task disposeTask = Task.Run(() => socket1.Dispose()); - var cts = new CancellationTokenSource(); - Task timeoutTask = Task.Delay(30000, cts.Token); - Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, socketOperation, timeoutTask)); - cts.Cancel(); - + await Task.WhenAny(disposeTask, socketOperation).TimeoutAfter(30000); await disposeTask; SocketError? localSocketError = null; @@ -1088,10 +1105,18 @@ await RetryHelper.ExecuteAsync(async () => break; } } - Assert.Equal(SocketError.ConnectionReset, peerSocketError); + + if (!expectGracefulShutdown) + { + Assert.Equal(SocketError.ConnectionReset, peerSocketError); + } + else + { + Assert.Null(peerSocketError); + } } } - }, maxAttempts: 10); + }, maxAttempts: 10, retryWhen: e => e is XunitException); } [Fact] diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs index 8f834a5c112c3d..bb6a8bd2adf622 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs @@ -168,12 +168,6 @@ await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), asyn [MemberData(nameof(SocketMethods_WithBools_MemberData))] public void EventSource_SocketConnectFailure_LogsConnectFailed(string connectMethod, bool useDnsEndPoint) { - if (connectMethod == "Sync" && PlatformDetection.IsRedHatFamily7) - { - // [ActiveIssue("https://github.com/dotnet/runtime/issues/42686")] - throw new SkipTestException("Disposing a Socket performing a sync operation can hang on RedHat7 systems"); - } - RemoteExecutor.Invoke(async (connectMethod, useDnsEndPointString) => { EndPoint endPoint = await GetRemoteEndPointAsync(useDnsEndPointString, port: 12345);