Skip to content

Commit

Permalink
Fix WaitForConnectionAsync when NamedPipeServerStream is disposed (#5…
Browse files Browse the repository at this point in the history
…2825)

* Fix WaitForConnectionAsync when NamedPipeServerStream is disposed

* Align Unix implementation on broken pipe IO exception as on Windows

* Add missing methods to test against ObjectDisposedException

* Apply suggestions from code review

Co-authored-by: Stephen Toub <[email protected]>

* Rebase and fix suggestions

* Cancel Accept on dispose

* Improve test

* Apply suggestions from code review

Co-authored-by: Stephen Toub <[email protected]>
  • Loading branch information
manandre and stephentoub authored Jul 18, 2021
1 parent e8be7f2 commit c5edf0e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public sealed partial class NamedPipeServerStream : PipeStream
private int _inBufferSize;
private int _outBufferSize;
private HandleInheritability _inheritability;
private CancellationTokenSource _internalTokenSource = new CancellationTokenSource();

private void Create(string pipeName, PipeDirection direction, int maxNumberOfServerInstances,
PipeTransmissionMode transmissionMode, PipeOptions options, int inBufferSize, int outBufferSize,
Expand Down Expand Up @@ -77,8 +78,25 @@ public Task WaitForConnectionAsync(CancellationToken cancellationToken)
Task.FromCanceled(cancellationToken) :
WaitForConnectionAsyncCore();

async Task WaitForConnectionAsyncCore() =>
HandleAcceptedSocket(await _instance!.ListeningSocket.AcceptAsync(cancellationToken).ConfigureAwait(false));
async Task WaitForConnectionAsyncCore()
{
Socket acceptedSocket;
CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_internalTokenSource.Token, cancellationToken);
try
{
acceptedSocket = await _instance!.ListeningSocket.AcceptAsync(linkedTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
throw new IOException(SR.IO_PipeBroken);
}
finally
{
linkedTokenSource.Dispose();
}

HandleAcceptedSocket(acceptedSocket);
}
}

private void HandleAcceptedSocket(Socket acceptedSocket)
Expand Down Expand Up @@ -116,9 +134,19 @@ private void HandleAcceptedSocket(Socket acceptedSocket)
State = PipeState.Connected;
}

internal override void DisposeCore(bool disposing) =>
internal override void DisposeCore(bool disposing)
{
Interlocked.Exchange(ref _instance, null)?.Dispose(disposing); // interlocked to avoid shared state problems from erroneous double/concurrent disposes

if (disposing)
{
if (State != PipeState.Closed)
{
_internalTokenSource.Cancel();
}
}
}

public void Disconnect()
{
CheckDisconnectOperations();
Expand Down
87 changes: 65 additions & 22 deletions src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,16 @@ public abstract class NamedPipeStreamConformanceTests : PipeStreamConformanceTes
{
protected override bool BrokenPipePropagatedImmediately => OperatingSystem.IsWindows(); // On Unix, implemented on Sockets, where it won't propagate immediate

protected abstract (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams();
protected abstract NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1);
protected abstract NamedPipeClientStream CreateClientStream(string pipeName);

protected (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams()
{
string pipeName = GetUniquePipeName();
NamedPipeServerStream server = CreateServerStream(pipeName);
NamedPipeClientStream client = CreateClientStream(pipeName);
return (server, client);
}

protected sealed override async Task<StreamPair> CreateConnectedStreamsAsync()
{
Expand All @@ -88,6 +97,15 @@ protected sealed override async Task<StreamPair> CreateConnectedStreamsAsync()
return ((NamedPipeServerStream)streams.Stream2, (NamedPipeClientStream)streams.Stream1);
}

protected async Task ValidateDisposedExceptionsAsync(NamedPipeServerStream server)
{
Assert.Throws<ObjectDisposedException>(() => server.Disconnect());
Assert.Throws<ObjectDisposedException>(() => server.GetImpersonationUserName());
Assert.Throws<ObjectDisposedException>(() => server.WaitForConnection());
await Assert.ThrowsAsync<ObjectDisposedException>(() => server.WaitForConnectionAsync());
await ValidateDisposedExceptionsAsync(server as Stream);
}

/// <summary>
/// Yields every combination of testing options for the OneWayReadWrites test
/// </summary>
Expand Down Expand Up @@ -629,6 +647,37 @@ public async Task CancelTokenOn_Client_ReadWriteCancelledToken_Throws_OperationC
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => clientWriteToken);
}
}

[Fact]
public async Task TwoServerInstances_OnceDisposed_Throws()
{
string pipeName = GetUniquePipeName();
NamedPipeServerStream server1 = CreateServerStream(pipeName, 2);
using NamedPipeServerStream server2 = CreateServerStream(pipeName, 2);

Task wait1 = server1.WaitForConnectionAsync();
Task wait2 = server2.WaitForConnectionAsync();
server1.Dispose();
await ValidateDisposedExceptionsAsync(server1);

using NamedPipeClientStream client = CreateClientStream(pipeName);
await client.ConnectAsync();

await Assert.ThrowsAsync<IOException>(() => wait1);

await wait2;

foreach ((Stream writeable, Stream readable) in GetReadWritePairs((server2, client)))
{
byte[] sent = new byte[] { 123 };
byte[] received = new byte[] { 0 };

Task t = Task.Run(() => writeable.Write(sent, 0, sent.Length));
Assert.Equal(sent.Length, readable.Read(received, 0, sent.Length));
Assert.Equal(sent, received);
await t;
}
}
}

public sealed class AnonymousPipeTest_ServerIn_ClientOut : AnonymousPipeStreamConformanceTests
Expand All @@ -653,34 +702,28 @@ protected override (AnonymousPipeServerStream Server, AnonymousPipeClientStream

public sealed class NamedPipeTest_ServerOut_ClientIn : NamedPipeStreamConformanceTests
{
protected override (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams()
{
string pipeName = PipeStreamConformanceTests.GetUniquePipeName();
var server = new NamedPipeServerStream(pipeName, PipeDirection.Out, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
var client = new NamedPipeClientStream(".", pipeName, PipeDirection.In, PipeOptions.Asynchronous);
return (server, client);
}
protected override NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1) =>
new NamedPipeServerStream(pipeName, PipeDirection.Out, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);

protected override NamedPipeClientStream CreateClientStream(string pipeName) =>
new NamedPipeClientStream(".", pipeName, PipeDirection.In, PipeOptions.Asynchronous);
}

public sealed class NamedPipeTest_ServerIn_ClientOut : NamedPipeStreamConformanceTests
{
protected override (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams()
{
string pipeName = PipeStreamConformanceTests.GetUniquePipeName();
var server = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
var client = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, PipeOptions.Asynchronous);
return (server, client);
}
protected override NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1) =>
new NamedPipeServerStream(pipeName, PipeDirection.In, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);

protected override NamedPipeClientStream CreateClientStream(string pipeName) =>
new NamedPipeClientStream(".", pipeName, PipeDirection.Out, PipeOptions.Asynchronous);
}

public sealed class NamedPipeTest_ServerInOut_ClientInOut : NamedPipeStreamConformanceTests
{
protected override (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams()
{
string pipeName = PipeStreamConformanceTests.GetUniquePipeName();
var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
var client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
return (server, client);
}
protected override NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1) =>
new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);

protected override NamedPipeClientStream CreateClientStream(string pipeName) =>
new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
}
}

0 comments on commit c5edf0e

Please sign in to comment.