Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Override memory-based ReadAsync & WriteAsync for SerialStream #56866

Merged
merged 9 commits into from
Aug 9, 2021
102 changes: 80 additions & 22 deletions src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs
Original file line number Diff line number Diff line change
Expand Up @@ -431,30 +431,64 @@ public override Task<int> ReadAsync(byte[] array, int offset, int count, Cancell
return Task<int>.FromResult(0); // return immediately if no bytes requested; no need for overhead.

Memory<byte> buffer = new Memory<byte>(array, offset, count);
SerialStreamIORequest result = new SerialStreamIORequest(cancellationToken, buffer);
SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer);
_readQueue.Enqueue(result);

EnsureIOLoopRunning();

return result.Task;
}

#if !NETFRAMEWORK && !NETSTANDARD2_0
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
CheckHandle();

if (buffer.IsEmpty)
return new ValueTask<int>(0);

SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer);
_readQueue.Enqueue(result);

EnsureIOLoopRunning();

return new ValueTask<int>(result.Task);
}
#endif

public override Task WriteAsync(byte[] array, int offset, int count, CancellationToken cancellationToken)
{
CheckWriteArguments(array, offset, count);

if (count == 0)
return Task.CompletedTask; // return immediately if no bytes to write; no need for overhead.

Memory<byte> buffer = new Memory<byte>(array, offset, count);
SerialStreamIORequest result = new SerialStreamIORequest(cancellationToken, buffer);
ReadOnlyMemory<byte> buffer = new ReadOnlyMemory<byte>(array, offset, count);
SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer);
_writeQueue.Enqueue(result);

EnsureIOLoopRunning();

return result.Task;
}

#if !NETFRAMEWORK && !NETSTANDARD2_0
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
CheckWriteArguments();

if (buffer.IsEmpty)
return ValueTask.CompletedTask; // return immediately if no bytes to write; no need for overhead.

SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer);
_writeQueue.Enqueue(result);

EnsureIOLoopRunning();

return new ValueTask(result.Task);
}
#endif

public override IAsyncResult BeginRead(byte[] array, int offset, int numBytes, AsyncCallback userCallback, object stateObject)
{
return TaskToApm.Begin(ReadAsync(array, offset, numBytes), userCallback, stateObject);
Expand Down Expand Up @@ -715,7 +749,8 @@ private void RaiseDataReceivedEof()

private unsafe int ProcessRead(SerialStreamIORequest r)
{
Span<byte> buff = r.Buffer.Span;
SerialStreamReadRequest readRequest = (SerialStreamReadRequest)r;
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
Span<byte> buff = readRequest.Buffer.Span;
fixed (byte* bufPtr = buff)
{
// assumes dequeue-ing happens on a single thread
Expand All @@ -728,12 +763,12 @@ private unsafe int ProcessRead(SerialStreamIORequest r)
// ignore EWOULDBLOCK since we handle timeout elsewhere
if (lastError.Error != Interop.Error.EWOULDBLOCK)
{
r.Complete(Interop.GetIOException(lastError));
readRequest.Complete(Interop.GetIOException(lastError));
}
}
else if (numBytes > 0)
{
r.Complete(numBytes);
readRequest.Complete(numBytes);
return numBytes;
}
else // numBytes == 0
Expand All @@ -747,7 +782,8 @@ private unsafe int ProcessRead(SerialStreamIORequest r)

private unsafe int ProcessWrite(SerialStreamIORequest r)
{
ReadOnlySpan<byte> buff = r.Buffer.Span;
SerialStreamWriteRequest writeRequest = (SerialStreamWriteRequest)r;
ReadOnlySpan<byte> buff = writeRequest.Buffer.Span;
fixed (byte* bufPtr = buff)
{
// assumes dequeue-ing happens on a single thread
Expand All @@ -766,11 +802,11 @@ private unsafe int ProcessWrite(SerialStreamIORequest r)
}
else
{
r.ProcessBytes(numBytes);
writeRequest.ProcessBytes(numBytes);

if (r.Buffer.Length == 0)
if (writeRequest.Buffer.Length == 0)
{
r.Complete();
writeRequest.Complete();
}

return numBytes;
Expand Down Expand Up @@ -962,35 +998,57 @@ private static Exception GetLastIOError()
return Interop.GetIOException(Interop.Sys.GetLastErrorInfo());
}

private sealed class SerialStreamIORequest : TaskCompletionSource<int>
private abstract class SerialStreamIORequest : TaskCompletionSource<int>
{
public Memory<byte> Buffer { get; private set; }
public bool IsCompleted => Task.IsCompleted;
private CancellationToken _cancellationToken;
private readonly CancellationTokenRegistration _cancellationTokenRegistration;

public SerialStreamIORequest(CancellationToken ct, Memory<byte> buffer)
protected SerialStreamIORequest(CancellationToken ct)
: base(TaskCreationOptions.RunContinuationsAsynchronously)
{
_cancellationToken = ct;
ct.Register(s => ((TaskCompletionSource<int>)s).TrySetCanceled(), this);

Buffer = buffer;
}

internal void Complete()
{
Debug.Assert(Buffer.Length == 0);
TrySetResult(Buffer.Length);
_cancellationTokenRegistration = ct.Register(s => ((TaskCompletionSource<int>)s).TrySetCanceled(), this);
}

internal void Complete(int numBytes)
{
TrySetResult(numBytes);
_cancellationTokenRegistration.Dispose();
}

internal void Complete(Exception exception)
{
TrySetException(exception);
_cancellationTokenRegistration.Dispose();
}
}

private sealed class SerialStreamReadRequest : SerialStreamIORequest
{
public Memory<byte> Buffer { get; private set; }
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved

public SerialStreamReadRequest(CancellationToken ct, Memory<byte> buffer)
: base(ct)
{
Buffer = buffer;
}
}

private sealed class SerialStreamWriteRequest : SerialStreamIORequest
{
public ReadOnlyMemory<byte> Buffer { get; private set; }

public SerialStreamWriteRequest(CancellationToken ct, ReadOnlyMemory<byte> buffer)
: base(ct)
{
Buffer = buffer;
}

internal void Complete()
{
Debug.Assert(Buffer.Length == 0);
Complete(Buffer.Length);
}

internal void ProcessBytes(int numBytes)
Expand Down
24 changes: 20 additions & 4 deletions src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@

namespace System.IO.Ports
{
// Issue https://github.com/dotnet/runtime/issues/54916
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the Windows part? I know it's very archaic, but it seems that the implementation uses ReadFile and WriteFile methods, which means that we could just create a FileStream (with buffering disabled by passing bufferSize: 1) from the SafeFileHandle we get in

public SafeFileHandle OpenPort(uint portNumber)
{
return Interop.Kernel32.CreateFile(
and delegate all reads and writes to FileStream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a look, but it feels like a separate PR to me..

// 'SerialStream' overrides array-based 'ReadAsync' but does not override memory-based 'ReadAsync'. Consider overriding memory-based 'ReadAsync' to improve performance.
// 'SerialStream' overrides array-based 'WriteAsync' but does not override memory-based 'WriteAsync'. Consider overriding memory-based 'WriteAsync' to improve performance.
#pragma warning disable CA1844
internal sealed partial class SerialStream : Stream
#pragma warning restore CA1844
Expand Down Expand Up @@ -91,7 +88,7 @@ public override void Write(byte[] array, int offset, int count)
Dispose(false);
}

private void CheckReadWriteArguments(byte[] array, int offset, int count)
private void CheckArrayArguments(byte[] array, int offset, int count)
{
if (array == null)
throw new ArgumentNullException(nameof(array));
Expand All @@ -101,10 +98,29 @@ private void CheckReadWriteArguments(byte[] array, int offset, int count)
throw new ArgumentOutOfRangeException(nameof(count), SR.ArgumentOutOfRange_NeedNonNegNumRequired);
if (array.Length - offset < count)
throw new ArgumentException(SR.Argument_InvalidOffLen);
}

private void CheckHandle()
{
if (_handle == null)
InternalResources.FileNotOpen();
}

private void CheckReadWriteArguments(byte[] array, int offset, int count)
{
CheckArrayArguments(array, offset, count);

CheckHandle();
}

private void CheckWriteArguments()
{
if (_inBreak)
throw new InvalidOperationException(SR.In_Break_State);

CheckHandle();
}

private void CheckWriteArguments(byte[] array, int offset, int count)
{
if (_inBreak)
Expand Down