From 851f09c96aaef293b802eb98fd06948c7719f6a3 Mon Sep 17 00:00:00 2001 From: Bar Arnon Date: Thu, 5 Aug 2021 00:01:11 +0300 Subject: [PATCH 1/8] Override memory-based ReadAsync & WriteAsync for SerialStream Fix #54575 --- .../src/System/IO/Ports/SerialStream.Unix.cs | 92 ++++++++++++++----- .../src/System/IO/Ports/SerialStream.cs | 24 ++++- 2 files changed, 88 insertions(+), 28 deletions(-) diff --git a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs index 7ee385c717373..0e7fc9913dab1 100644 --- a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs +++ b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs @@ -431,12 +431,22 @@ public override Task ReadAsync(byte[] array, int offset, int count, Cancell return Task.FromResult(0); // return immediately if no bytes requested; no need for overhead. Memory buffer = new Memory(array, offset, count); - SerialStreamIORequest result = new SerialStreamIORequest(cancellationToken, buffer); + return ReadAsync(buffer, cancellationToken).AsTask(); + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + CheckReadWriteArguments(); + + if (buffer.IsEmpty) + return ValueTask.FromResult(0); + + SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer); _readQueue.Enqueue(result); EnsureIOLoopRunning(); - return result.Task; + return new ValueTask(result.Task); } public override Task WriteAsync(byte[] array, int offset, int count, CancellationToken cancellationToken) @@ -446,13 +456,23 @@ public override Task WriteAsync(byte[] array, int offset, int count, Cancellatio if (count == 0) return Task.CompletedTask; // return immediately if no bytes to write; no need for overhead. - Memory buffer = new Memory(array, offset, count); - SerialStreamIORequest result = new SerialStreamIORequest(cancellationToken, buffer); + ReadOnlyMemory buffer = new ReadOnlyMemory(array, offset, count); + return WriteAsync(buffer, cancellationToken).AsTask(); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + CheckWriteArguments(); + + if (buffer.IsEmpty) + return ValueTask.CompletedTask; // return immediately if no bytes to write; no need for overhead. + + SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer); _writeQueue.Enqueue(result); EnsureIOLoopRunning(); - return result.Task; + return new ValueTask(result.Task); } public override IAsyncResult BeginRead(byte[] array, int offset, int numBytes, AsyncCallback userCallback, object stateObject) @@ -715,7 +735,8 @@ private void RaiseDataReceivedEof() private unsafe int ProcessRead(SerialStreamIORequest r) { - Span buff = r.Buffer.Span; + SerialStreamReadRequest readRequest = (SerialStreamReadRequest)r; + Span buff = readRequest.Buffer.Span; fixed (byte* bufPtr = buff) { // assumes dequeue-ing happens on a single thread @@ -728,12 +749,12 @@ private unsafe int ProcessRead(SerialStreamIORequest r) // ignore EWOULDBLOCK since we handle timeout elsewhere if (lastError.Error != Interop.Error.EWOULDBLOCK) { - r.Complete(Interop.GetIOException(lastError)); + readRequest.Complete(Interop.GetIOException(lastError)); } } else if (numBytes > 0) { - r.Complete(numBytes); + readRequest.Complete(numBytes); return numBytes; } else // numBytes == 0 @@ -747,7 +768,8 @@ private unsafe int ProcessRead(SerialStreamIORequest r) private unsafe int ProcessWrite(SerialStreamIORequest r) { - ReadOnlySpan buff = r.Buffer.Span; + SerialStreamWriteRequest writeRequest = (SerialStreamWriteRequest)r; + ReadOnlySpan buff = writeRequest.Buffer.Span; fixed (byte* bufPtr = buff) { // assumes dequeue-ing happens on a single thread @@ -766,11 +788,11 @@ private unsafe int ProcessWrite(SerialStreamIORequest r) } else { - r.ProcessBytes(numBytes); + writeRequest.ProcessBytes(numBytes); - if (r.Buffer.Length == 0) + if (writeRequest.Buffer.Length == 0) { - r.Complete(); + writeRequest.Complete(); } return numBytes; @@ -962,35 +984,57 @@ private static Exception GetLastIOError() return Interop.GetIOException(Interop.Sys.GetLastErrorInfo()); } - private sealed class SerialStreamIORequest : TaskCompletionSource + private abstract class SerialStreamIORequest : TaskCompletionSource { - public Memory Buffer { get; private set; } public bool IsCompleted => Task.IsCompleted; private CancellationToken _cancellationToken; + private readonly CancellationTokenRegistration _cancellationTokenRegistration; - public SerialStreamIORequest(CancellationToken ct, Memory buffer) + protected SerialStreamIORequest(CancellationToken ct) : base(TaskCreationOptions.RunContinuationsAsynchronously) { _cancellationToken = ct; - ct.Register(s => ((TaskCompletionSource)s).TrySetCanceled(), this); - - Buffer = buffer; - } - - internal void Complete() - { - Debug.Assert(Buffer.Length == 0); - TrySetResult(Buffer.Length); + _cancellationTokenRegistration = ct.Register(s => ((TaskCompletionSource)s).TrySetCanceled(), this); } internal void Complete(int numBytes) { TrySetResult(numBytes); + _cancellationTokenRegistration.Dispose(); } internal void Complete(Exception exception) { TrySetException(exception); + _cancellationTokenRegistration.Dispose(); + } + } + + private sealed class SerialStreamReadRequest : SerialStreamIORequest + { + public Memory Buffer { get; private set; } + + public SerialStreamReadRequest(CancellationToken ct, Memory buffer) + : base(ct) + { + Buffer = buffer; + } + } + + private sealed class SerialStreamWriteRequest : SerialStreamIORequest + { + public ReadOnlyMemory Buffer { get; private set; } + + public SerialStreamWriteRequest(CancellationToken ct, ReadOnlyMemory buffer) + : base(ct) + { + Buffer = buffer; + } + + internal void Complete() + { + Debug.Assert(Buffer.Length == 0); + Complete(Buffer.Length); } internal void ProcessBytes(int numBytes) diff --git a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.cs b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.cs index 6a8505c7ba1ae..cc29f01c7ff55 100644 --- a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.cs +++ b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.cs @@ -10,9 +10,6 @@ namespace System.IO.Ports { -// Issue https://github.com/dotnet/runtime/issues/54916 -// 'SerialStream' overrides array-based 'ReadAsync' but does not override memory-based 'ReadAsync'. Consider overriding memory-based 'ReadAsync' to improve performance. -// 'SerialStream' overrides array-based 'WriteAsync' but does not override memory-based 'WriteAsync'. Consider overriding memory-based 'WriteAsync' to improve performance. #pragma warning disable CA1844 internal sealed partial class SerialStream : Stream #pragma warning restore CA1844 @@ -91,7 +88,7 @@ public override void Write(byte[] array, int offset, int count) Dispose(false); } - private void CheckReadWriteArguments(byte[] array, int offset, int count) + private void CheckArrayArguments(byte[] array, int offset, int count) { if (array == null) throw new ArgumentNullException(nameof(array)); @@ -101,10 +98,29 @@ private void CheckReadWriteArguments(byte[] array, int offset, int count) throw new ArgumentOutOfRangeException(nameof(count), SR.ArgumentOutOfRange_NeedNonNegNumRequired); if (array.Length - offset < count) throw new ArgumentException(SR.Argument_InvalidOffLen); + } + + private void CheckReadWriteArguments() + { if (_handle == null) InternalResources.FileNotOpen(); } + private void CheckReadWriteArguments(byte[] array, int offset, int count) + { + CheckArrayArguments(array, offset, count); + + CheckReadWriteArguments(); + } + + private void CheckWriteArguments() + { + if (_inBreak) + throw new InvalidOperationException(SR.In_Break_State); + + CheckReadWriteArguments(); + } + private void CheckWriteArguments(byte[] array, int offset, int count) { if (_inBreak) From 479d9ac8a7e45f6ab064a4c4e374903636d858c6 Mon Sep 17 00:00:00 2001 From: Bar Arnon Date: Thu, 5 Aug 2021 10:19:41 +0300 Subject: [PATCH 2/8] Reference System.Threading.Tasks.Extensions --- src/libraries/System.IO.Ports/src/System.IO.Ports.csproj | 1 + 1 file changed, 1 insertion(+) diff --git a/src/libraries/System.IO.Ports/src/System.IO.Ports.csproj b/src/libraries/System.IO.Ports/src/System.IO.Ports.csproj index c3691c1b93381..2892796338ddc 100644 --- a/src/libraries/System.IO.Ports/src/System.IO.Ports.csproj +++ b/src/libraries/System.IO.Ports/src/System.IO.Ports.csproj @@ -151,5 +151,6 @@ + From e1df703566b9f98974fecf494172919eb64b746f Mon Sep 17 00:00:00 2001 From: Bar Arnon Date: Thu, 5 Aug 2021 10:21:11 +0300 Subject: [PATCH 3/8] CheckReadWriteArguments => CheckHandle --- .../src/System/IO/Ports/SerialStream.Unix.cs | 2 +- .../System.IO.Ports/src/System/IO/Ports/SerialStream.cs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs index 0e7fc9913dab1..4d5f271d67f31 100644 --- a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs +++ b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs @@ -436,7 +436,7 @@ public override Task ReadAsync(byte[] array, int offset, int count, Cancell public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) { - CheckReadWriteArguments(); + CheckHandle(); if (buffer.IsEmpty) return ValueTask.FromResult(0); diff --git a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.cs b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.cs index cc29f01c7ff55..2de7ab2d15b35 100644 --- a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.cs +++ b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.cs @@ -100,7 +100,7 @@ private void CheckArrayArguments(byte[] array, int offset, int count) throw new ArgumentException(SR.Argument_InvalidOffLen); } - private void CheckReadWriteArguments() + private void CheckHandle() { if (_handle == null) InternalResources.FileNotOpen(); @@ -110,7 +110,7 @@ private void CheckReadWriteArguments(byte[] array, int offset, int count) { CheckArrayArguments(array, offset, count); - CheckReadWriteArguments(); + CheckHandle(); } private void CheckWriteArguments() @@ -118,7 +118,7 @@ private void CheckWriteArguments() if (_inBreak) throw new InvalidOperationException(SR.In_Break_State); - CheckReadWriteArguments(); + CheckHandle(); } private void CheckWriteArguments(byte[] array, int offset, int count) From 05118de8c0af048aecf2cca544a465e2741e7348 Mon Sep 17 00:00:00 2001 From: Bar Arnon Date: Thu, 5 Aug 2021 11:01:59 +0300 Subject: [PATCH 4/8] ifdef the accessibility of the memory-based overloads --- .../src/System/IO/Ports/SerialStream.Unix.cs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs index 4d5f271d67f31..ca4cec05401f4 100644 --- a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs +++ b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs @@ -434,7 +434,12 @@ public override Task ReadAsync(byte[] array, int offset, int count, Cancell return ReadAsync(buffer, cancellationToken).AsTask(); } - public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) +#if !NETFRAMEWORK && !NETSTANDARD2_0 + public override +#else + private +#endif + ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) { CheckHandle(); @@ -460,7 +465,12 @@ public override Task WriteAsync(byte[] array, int offset, int count, Cancellatio return WriteAsync(buffer, cancellationToken).AsTask(); } - public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) +#if !NETFRAMEWORK && !NETSTANDARD2_0 + public override +#else + private +#endif + ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { CheckWriteArguments(); From 1fa6e547ef70475370165a2080a6bf7d1935c5c4 Mon Sep 17 00:00:00 2001 From: Bar Arnon Date: Thu, 5 Aug 2021 15:00:41 +0300 Subject: [PATCH 5/8] Use basic ValueTask API --- .../src/System/IO/Ports/SerialStream.Unix.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs index ca4cec05401f4..cd969bfab2478 100644 --- a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs +++ b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs @@ -437,14 +437,14 @@ public override Task ReadAsync(byte[] array, int offset, int count, Cancell #if !NETFRAMEWORK && !NETSTANDARD2_0 public override #else - private + private #endif ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) { CheckHandle(); if (buffer.IsEmpty) - return ValueTask.FromResult(0); + return new ValueTask(0); SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer); _readQueue.Enqueue(result); @@ -475,8 +475,8 @@ ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellation CheckWriteArguments(); if (buffer.IsEmpty) - return ValueTask.CompletedTask; // return immediately if no bytes to write; no need for overhead. - + return new ValueTask(); // return immediately if no bytes to write; no need for overhead. + SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer); _writeQueue.Enqueue(result); From 1c62d4f864dab66d20a4c78aa5c112b6c4cc9dd6 Mon Sep 17 00:00:00 2001 From: Bar Arnon Date: Thu, 5 Aug 2021 15:35:36 +0300 Subject: [PATCH 6/8] Don't share implemntation to not take a dependency on ValueTask --- .../src/System.IO.Ports.csproj | 1 - .../src/System/IO/Ports/SerialStream.Unix.cs | 34 +++++++++++-------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/libraries/System.IO.Ports/src/System.IO.Ports.csproj b/src/libraries/System.IO.Ports/src/System.IO.Ports.csproj index 7fb4049e7496c..5dff581314589 100644 --- a/src/libraries/System.IO.Ports/src/System.IO.Ports.csproj +++ b/src/libraries/System.IO.Ports/src/System.IO.Ports.csproj @@ -164,7 +164,6 @@ System.IO.Ports.SerialPort - diff --git a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs index cd969bfab2478..9dd7b39a164bd 100644 --- a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs +++ b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs @@ -431,20 +431,21 @@ public override Task ReadAsync(byte[] array, int offset, int count, Cancell return Task.FromResult(0); // return immediately if no bytes requested; no need for overhead. Memory buffer = new Memory(array, offset, count); - return ReadAsync(buffer, cancellationToken).AsTask(); + SerialStreamIORequest result = new SerialStreamIORequest(cancellationToken, buffer); + _readQueue.Enqueue(result); + + EnsureIOLoopRunning(); + + return result.Task; } #if !NETFRAMEWORK && !NETSTANDARD2_0 - public override -#else - private -#endif - ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) { CheckHandle(); if (buffer.IsEmpty) - return new ValueTask(0); + return new ValueTask.FromResult(0); SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer); _readQueue.Enqueue(result); @@ -453,6 +454,7 @@ ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToke return new ValueTask(result.Task); } +#endif public override Task WriteAsync(byte[] array, int offset, int count, CancellationToken cancellationToken) { @@ -461,21 +463,22 @@ public override Task WriteAsync(byte[] array, int offset, int count, Cancellatio if (count == 0) return Task.CompletedTask; // return immediately if no bytes to write; no need for overhead. - ReadOnlyMemory buffer = new ReadOnlyMemory(array, offset, count); - return WriteAsync(buffer, cancellationToken).AsTask(); + Memory buffer = new Memory(array, offset, count); + SerialStreamIORequest result = new SerialStreamIORequest(cancellationToken, buffer); + _writeQueue.Enqueue(result); + + EnsureIOLoopRunning(); + + return result.Task; } #if !NETFRAMEWORK && !NETSTANDARD2_0 - public override -#else - private -#endif - ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { CheckWriteArguments(); if (buffer.IsEmpty) - return new ValueTask(); // return immediately if no bytes to write; no need for overhead. + return ValueTask.CompletedTask; // return immediately if no bytes to write; no need for overhead. SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer); _writeQueue.Enqueue(result); @@ -484,6 +487,7 @@ ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellation return new ValueTask(result.Task); } +#endif public override IAsyncResult BeginRead(byte[] array, int offset, int numBytes, AsyncCallback userCallback, object stateObject) { From bf2be50c5d337619de8e4fed03d55dc7cf29cdd1 Mon Sep 17 00:00:00 2001 From: Bar Arnon Date: Thu, 5 Aug 2021 15:37:43 +0300 Subject: [PATCH 7/8] . --- .../src/System/IO/Ports/SerialStream.Unix.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs index 9dd7b39a164bd..e643c8dbbe9be 100644 --- a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs +++ b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs @@ -431,7 +431,7 @@ public override Task ReadAsync(byte[] array, int offset, int count, Cancell return Task.FromResult(0); // return immediately if no bytes requested; no need for overhead. Memory buffer = new Memory(array, offset, count); - SerialStreamIORequest result = new SerialStreamIORequest(cancellationToken, buffer); + SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer); _readQueue.Enqueue(result); EnsureIOLoopRunning(); @@ -463,8 +463,8 @@ public override Task WriteAsync(byte[] array, int offset, int count, Cancellatio if (count == 0) return Task.CompletedTask; // return immediately if no bytes to write; no need for overhead. - Memory buffer = new Memory(array, offset, count); - SerialStreamIORequest result = new SerialStreamIORequest(cancellationToken, buffer); + ReadOnlyMemory buffer = new ReadOnlyMemory(array, offset, count); + SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer); _writeQueue.Enqueue(result); EnsureIOLoopRunning(); From 082f24dfee0f7436809a40352b0ec6cce4c20408 Mon Sep 17 00:00:00 2001 From: Bar Arnon Date: Thu, 5 Aug 2021 16:34:12 +0300 Subject: [PATCH 8/8] Avoid ValueTask.FromResult --- .../System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs index e643c8dbbe9be..a30e4abe46ad6 100644 --- a/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs +++ b/src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs @@ -445,7 +445,7 @@ public override ValueTask ReadAsync(Memory buffer, CancellationToken CheckHandle(); if (buffer.IsEmpty) - return new ValueTask.FromResult(0); + return new ValueTask(0); SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer); _readQueue.Enqueue(result); @@ -479,7 +479,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationTo if (buffer.IsEmpty) return ValueTask.CompletedTask; // return immediately if no bytes to write; no need for overhead. - + SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer); _writeQueue.Enqueue(result);