From bca4acd57bd2e07c40baa5c2c7a9e010e1819e93 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 27 May 2021 17:43:48 -0400 Subject: [PATCH] Remove forced serialization of async-over-sync in Stream base methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The base implementation of Stream.BeginRead/Write queue a work items that invoke the abstract Read/Write methods. When Stream.BeginRead/Write were introduced long ago, for reasons I’m not privy to, someone decided it would be a good idea to add protection to these methods, such that if you try to call either BeginRead or BeginWrite while a previous BeginRead or BeginWrite operation was still in flight, the synchronous call to BeginXx would synchronously block. Yuck. Back in .NET Framework 4.5 when we added Stream.Read/WriteAsync, we had to add the base implementations as wrappers for the BeginRead/Write methods, since Read/WriteAsync should pick up the overrides of those methods if they existed. The idea of propagating that synchronous blocking behavior to Read/WriteAsync was unstomachable, but for compatibility we made it so that Read/WriteAsync would still serialize, just asynchronously (later we added a fast path optimization that would skip BeginRead/Write entirely if they weren’t overridden by the derived type). That serialization, however, even though it was asynchronous, was also misguided. In addition to adding overhead, both in terms of needing a semaphore and somewhere to store it and in terms of using that semaphore for every operation, it prevents the concurrent use of read and write. In general, streams aren’t meant to be used concurrently at all, but some streams are duplex and support up to a single reader and single writer at a time. This serialization ends up blocking such duplex streams from being used (if they don’t override Read/WriteAsync), but worse, it ends up hiding misuse of streams that shouldn’t be used concurrently by masking the misuse and turning it into behavior that might appear to work but is unlikely to actually be the desired behavior. This PR deletes that serialization and then cleans up all the cruft that was built up around it. This is a breaking change, as it’s possible code could have been relying on this (undocumented) protection; the fix for such an app is to stop doing that erroneous concurrent access, which could include applying its own serialization if necessary. BufferedStream was explicitly using the same serialization mechanism; I left that intact. BufferedFileStreamStrategy was also using it, as FileStream kinda sorta somewhat tries to enable concurrent (not parallel) usage when useAsync == true on Windows. --- .../tests/FileStream/WriteAsync.cs | 51 +-- .../src/System/IO/BufferedStream.cs | 12 + .../AsyncWindowsFileStreamStrategy.cs | 12 + .../Strategies/BufferedFileStreamStrategy.cs | 11 + .../Net5CompatFileStreamStrategy.cs | 8 +- .../SyncWindowsFileStreamStrategy.cs | 8 +- .../src/System/IO/Stream.cs | 344 +++++------------- 7 files changed, 146 insertions(+), 300 deletions(-) diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs index 9e4db63e6fbc3..b801b765012c4 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs @@ -156,24 +156,15 @@ public async Task WriteAsyncInternalBufferOverflow() public static IEnumerable MemberData_FileStreamAsyncWriting() { - foreach (bool useAsync in new[] { true, false }) + foreach (bool preSize in new[] { true, false }) { - if (useAsync && !OperatingSystem.IsWindows()) + foreach (bool cancelable in new[] { true, false }) { - // We don't have a special async I/O implementation in FileStream on Unix. - continue; - } - - foreach (bool preSize in new[] { true, false }) - { - foreach (bool cancelable in new[] { true, false }) - { - yield return new object[] { useAsync, preSize, false, cancelable, 0x1000, 0x100, 100 }; - yield return new object[] { useAsync, preSize, false, cancelable, 0x1, 0x1, 1000 }; - yield return new object[] { useAsync, preSize, true, cancelable, 0x2, 0x100, 100 }; - yield return new object[] { useAsync, preSize, false, cancelable, 0x4000, 0x10, 100 }; - yield return new object[] { useAsync, preSize, true, cancelable, 0x1000, 99999, 10 }; - } + yield return new object[] { preSize, false, cancelable, 0x1000, 0x100, 100 }; + yield return new object[] { preSize, false, cancelable, 0x1, 0x1, 1000 }; + yield return new object[] { preSize, true, cancelable, 0x2, 0x100, 100 }; + yield return new object[] { preSize, false, cancelable, 0x4000, 0x10, 100 }; + yield return new object[] { preSize, true, cancelable, 0x1000, 99999, 10 }; } } } @@ -183,7 +174,6 @@ public Task ManyConcurrentWriteAsyncs() { // For inner loop, just test one case return ManyConcurrentWriteAsyncs_OuterLoop( - useAsync: OperatingSystem.IsWindows(), presize: false, exposeHandle: false, cancelable: true, @@ -196,7 +186,7 @@ public Task ManyConcurrentWriteAsyncs() [MemberData(nameof(MemberData_FileStreamAsyncWriting))] [OuterLoop] // many combinations: we test just one in inner loop and the rest outer public async Task ManyConcurrentWriteAsyncs_OuterLoop( - bool useAsync, bool presize, bool exposeHandle, bool cancelable, int bufferSize, int writeSize, int numWrites) + bool presize, bool exposeHandle, bool cancelable, int bufferSize, int writeSize, int numWrites) { long totalLength = writeSize * numWrites; var expectedData = new byte[totalLength]; @@ -204,7 +194,7 @@ public async Task ManyConcurrentWriteAsyncs_OuterLoop( CancellationToken cancellationToken = cancelable ? new CancellationTokenSource().Token : CancellationToken.None; string path = GetTestFilePath(); - using (FileStream fs = new FileStream(path, FileMode.Create, FileAccess.ReadWrite, FileShare.None, bufferSize, useAsync)) + using (FileStream fs = new FileStream(path, FileMode.Create, FileAccess.ReadWrite, FileShare.None, bufferSize, useAsync: true)) { if (presize) { @@ -220,17 +210,15 @@ public async Task ManyConcurrentWriteAsyncs_OuterLoop( { writes[i] = WriteAsync(fs, expectedData, i * writeSize, writeSize, cancellationToken); Assert.Null(writes[i].Exception); - if (useAsync) + + // To ensure that the buffer of a FileStream opened for async IO is flushed + // by FlushAsync in asynchronous way, we aquire a lock for every buffered WriteAsync. + // The side effect of this is that the Position of FileStream is not updated until + // the lock is released by a previous operation. + // So now all WriteAsync calls should be awaited before starting another async file operation. + if (PlatformDetection.IsNet5CompatFileStreamEnabled) { - // To ensure that the buffer of a FileStream opened for async IO is flushed - // by FlushAsync in asynchronous way, we aquire a lock for every buffered WriteAsync. - // The side effect of this is that the Position of FileStream is not updated until - // the lock is released by a previous operation. - // So now all WriteAsync calls should be awaited before starting another async file operation. - if (PlatformDetection.IsNet5CompatFileStreamEnabled) - { - Assert.Equal((i + 1) * writeSize, fs.Position); - } + Assert.Equal((i + 1) * writeSize, fs.Position); } } @@ -239,10 +227,7 @@ public async Task ManyConcurrentWriteAsyncs_OuterLoop( byte[] actualData = File.ReadAllBytes(path); Assert.Equal(expectedData.Length, actualData.Length); - if (useAsync) - { - Assert.Equal(expectedData, actualData); - } + AssertExtensions.SequenceEqual(expectedData, actualData); } [Theory] diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/BufferedStream.cs b/src/libraries/System.Private.CoreLib/src/System/IO/BufferedStream.cs index f6da2fd101d1c..a4a8fc8eb5fce 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/BufferedStream.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/BufferedStream.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; @@ -58,6 +59,7 @@ public sealed class BufferedStream : Stream // (perf optimization for successive reads of the same size) // Removing a private default constructor is a breaking change for the DataDebugSerializer. // Because this ctor was here previously we need to keep it around. + private SemaphoreSlim? _asyncActiveSemaphore; // To serialize async operations. public BufferedStream(Stream stream) : this(stream, DefaultBufferSize) @@ -136,6 +138,16 @@ private void EnsureBufferAllocated() _buffer = new byte[_bufferSize]; } + [MemberNotNull(nameof(_asyncActiveSemaphore))] + private SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() => + // Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's + // WaitHandle, we don't need to worry about Disposing it in the case of a race condition. + #pragma warning disable CS8774 // We lack a NullIffNull annotation for Volatile.Read + Volatile.Read(ref _asyncActiveSemaphore) ?? + #pragma warning restore CS8774 + Interlocked.CompareExchange(ref _asyncActiveSemaphore, new SemaphoreSlim(1, 1), null) ?? + _asyncActiveSemaphore; + public Stream UnderlyingStream { get diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs index 6719dd9389566..3bcd19019e765 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs @@ -120,6 +120,12 @@ public override int Read(byte[] buffer, int offset, int count) vt.AsTask().GetAwaiter().GetResult(); } + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => + TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); + + public override int EndRead(IAsyncResult asyncResult) => + TaskToApm.End(asyncResult); + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask(); @@ -207,6 +213,12 @@ private unsafe ValueTask ReadAsyncInternal(Memory destination, Cancel public override void Write(byte[] buffer, int offset, int count) => WriteAsyncInternal(new ReadOnlyMemory(buffer, offset, count), CancellationToken.None).AsTask().GetAwaiter().GetResult(); + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => + TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); + + public override void EndWrite(IAsyncResult asyncResult) => + TaskToApm.End(asyncResult); + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => WriteAsyncInternal(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs index 2228993f48e5a..3201a5686bf83 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs @@ -15,6 +15,7 @@ internal sealed class BufferedFileStreamStrategy : FileStreamStrategy { private readonly FileStreamStrategy _strategy; private readonly int _bufferSize; + private SemaphoreSlim? _asyncActiveSemaphore; private byte[]? _buffer; private int _writePos; @@ -46,6 +47,16 @@ internal BufferedFileStreamStrategy(FileStreamStrategy strategy, int bufferSize) } } + [MemberNotNull(nameof(_asyncActiveSemaphore))] + private SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() => + // Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's + // WaitHandle, we don't need to worry about Disposing it in the case of a race condition. + #pragma warning disable CS8774 // We lack a NullIffNull annotation for Volatile.Read + Volatile.Read(ref _asyncActiveSemaphore) ?? + #pragma warning restore CS8774 + Interlocked.CompareExchange(ref _asyncActiveSemaphore, new SemaphoreSlim(1, 1), null) ?? + _asyncActiveSemaphore; + public override bool CanRead => _strategy.CanRead; public override bool CanWrite => _strategy.CanWrite; diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/Net5CompatFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/Net5CompatFileStreamStrategy.cs index d257df97722da..b8870959be46a 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/Net5CompatFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/Net5CompatFileStreamStrategy.cs @@ -163,7 +163,7 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel // Read is invoked asynchronously. But we can do so using the base Stream's internal helper // that bypasses delegating to BeginRead, since we already know this is FileStream rather // than something derived from it and what our BeginRead implementation is going to do. - return (Task)base.BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false); + return BeginReadInternal(buffer, offset, count, null, null); } return ReadAsyncTask(buffer, offset, count, cancellationToken); @@ -178,7 +178,7 @@ public override ValueTask ReadAsync(Memory buffer, CancellationToken // internal helper that bypasses delegating to BeginRead, since we already know this is FileStream // rather than something derived from it and what our BeginRead implementation is going to do. return MemoryMarshal.TryGetArray(buffer, out ArraySegment segment) ? - new ValueTask((Task)base.BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) : + new ValueTask(BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null)) : base.ReadAsync(buffer, cancellationToken); } @@ -245,7 +245,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati // Write is invoked asynchronously. But we can do so using the base Stream's internal helper // that bypasses delegating to BeginWrite, since we already know this is FileStream rather // than something derived from it and what our BeginWrite implementation is going to do. - return (Task)base.BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false); + return BeginWriteInternal(buffer, offset, count, null, null); } return WriteAsyncInternal(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); @@ -260,7 +260,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationTo // internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream // rather than something derived from it and what our BeginWrite implementation is going to do. return MemoryMarshal.TryGetArray(buffer, out ArraySegment segment) ? - new ValueTask((Task)base.BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) : + new ValueTask(BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null)) : base.WriteAsync(buffer, cancellationToken); } diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/SyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/SyncWindowsFileStreamStrategy.cs index 30499b660ddf0..996d53a559967 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/SyncWindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/SyncWindowsFileStreamStrategy.cs @@ -46,7 +46,7 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel // Read is invoked asynchronously. But we can do so using the base Stream's internal helper // that bypasses delegating to BeginRead, since we already know this is FileStream rather // than something derived from it and what our BeginRead implementation is going to do. - return (Task)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false); + return BeginReadInternal(buffer, offset, count, null, null); } public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) @@ -56,7 +56,7 @@ public override ValueTask ReadAsync(Memory buffer, CancellationToken // internal helper that bypasses delegating to BeginRead, since we already know this is FileStream // rather than something derived from it and what our BeginRead implementation is going to do. return MemoryMarshal.TryGetArray(buffer, out ArraySegment segment) ? - new ValueTask((Task)BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) : + new ValueTask(BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null)) : base.ReadAsync(buffer, cancellationToken); } @@ -79,7 +79,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati // Write is invoked asynchronously. But we can do so using the base Stream's internal helper // that bypasses delegating to BeginWrite, since we already know this is FileStream rather // than something derived from it and what our BeginWrite implementation is going to do. - return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false); + return BeginWriteInternal(buffer, offset, count, null, null); } public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) @@ -89,7 +89,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationTo // internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream // rather than something derived from it and what our BeginWrite implementation is going to do. return MemoryMarshal.TryGetArray(buffer, out ArraySegment segment) ? - new ValueTask((Task)BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) : + new ValueTask(BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null)) : base.WriteAsync(buffer, cancellationToken); } diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs index 71ad1dbeddc48..f84918b2c6526 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs @@ -3,7 +3,6 @@ using System.Buffers; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; @@ -15,19 +14,6 @@ public abstract partial class Stream : MarshalByRefObject, IDisposable, IAsyncDi { public static readonly Stream Null = new NullStream(); - /// To serialize async operations on streams that don't implement their own. - private protected SemaphoreSlim? _asyncActiveSemaphore; - - [MemberNotNull(nameof(_asyncActiveSemaphore))] - private protected SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() => - // Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's - // WaitHandle, we don't need to worry about Disposing it in the case of a race condition. -#pragma warning disable CS8774 // We lack a NullIffNull annotation for Volatile.Read - Volatile.Read(ref _asyncActiveSemaphore) ?? -#pragma warning restore CS8774 - Interlocked.CompareExchange(ref _asyncActiveSemaphore, new SemaphoreSlim(1, 1), null) ?? - _asyncActiveSemaphore; - public abstract bool CanRead { get; } public abstract bool CanWrite { get; } public abstract bool CanSeek { get; } @@ -199,11 +185,9 @@ public virtual Task FlushAsync(CancellationToken cancellationToken) => protected virtual WaitHandle CreateWaitHandle() => new ManualResetEvent(false); public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => - BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true); + BeginReadInternal(buffer, offset, count, callback, state); - internal IAsyncResult BeginReadInternal( - byte[] buffer, int offset, int count, AsyncCallback? callback, object? state, - bool serializeAsynchronously, bool apm) + internal Task BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) { ValidateBufferArguments(buffer, offset, count); if (!CanRead) @@ -211,65 +195,26 @@ internal IAsyncResult BeginReadInternal( ThrowHelper.ThrowNotSupportedException_UnreadableStream(); } - // To avoid a race with a stream's position pointer & generating race conditions - // with internal buffer indexes in our own streams that - // don't natively support async IO operations when there are multiple - // async requests outstanding, we will block the application's main - // thread if it does a second IO request until the first one completes. - SemaphoreSlim semaphore = EnsureAsyncActiveSemaphoreInitialized(); - Task? semaphoreTask = null; - if (serializeAsynchronously) - { - semaphoreTask = semaphore.WaitAsync(); - } - else - { -#pragma warning disable CA1416 // Validate platform compatibility, issue: https://github.com/dotnet/runtime/issues/44543 - semaphore.Wait(); -#pragma warning restore CA1416 - } - // Create the task to asynchronously do a Read. This task serves both // as the asynchronous work item and as the IAsyncResult returned to the user. - var asyncResult = new ReadWriteTask(true /*isRead*/, apm, delegate + var task = new ReadWriteTask(isRead: true, delegate { // The ReadWriteTask stores all of the parameters to pass to Read. // As we're currently inside of it, we can get the current task // and grab the parameters from it. - var thisTask = Task.InternalCurrent as ReadWriteTask; - Debug.Assert(thisTask != null && thisTask._stream != null, - "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream should be set"); - - try - { - // Do the Read and return the number of bytes read - return thisTask._stream.Read(thisTask._buffer!, thisTask._offset, thisTask._count); - } - finally - { - // If this implementation is part of Begin/EndXx, then the EndXx method will handle - // finishing the async operation. However, if this is part of XxAsync, then there won't - // be an end method, and this task is responsible for cleaning up. - if (!thisTask._apm) - { - thisTask._stream.FinishTrackingAsyncOperation(thisTask); - } - - thisTask.ClearBeginState(); // just to help alleviate some memory pressure - } + var thisTask = (ReadWriteTask)Task.InternalCurrent!; + Debug.Assert(thisTask._stream != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream should be set"); + + Stream stream = thisTask._stream; + byte[] buffer = thisTask._buffer!; + thisTask._stream = null; // help alleviate some memory pressure + thisTask._buffer = null; + return stream.Read(buffer, thisTask._offset, thisTask._count); }, state, this, buffer, offset, count, callback); - // Schedule it - if (semaphoreTask != null) - { - RunReadWriteTaskWhenReady(semaphoreTask, asyncResult); - } - else - { - RunReadWriteTask(asyncResult); - } + QueueReadWriteTask(task); - return asyncResult; // return it + return task; } public virtual int EndRead(IAsyncResult asyncResult) @@ -290,22 +235,32 @@ public virtual int EndRead(IAsyncResult asyncResult) ThrowHelper.ThrowInvalidOperationException(ExceptionResource.InvalidOperation_WrongAsyncResultOrEndCalledMultiple); } - try + readTask._endCalled = true; + return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception + } + + public Task ReadAsync(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count, CancellationToken.None); + + public virtual Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) { - return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception + return Task.FromCanceled(cancellationToken); } - finally + + if (!HasOverriddenBeginEndRead()) { - FinishTrackingAsyncOperation(readTask); + // If the Stream does not override Begin/EndRead, then we can take an optimized path + // that skips an extra layer of tasks / IAsyncResults. + return BeginReadInternal(buffer, offset, count, null, null); } - } - public Task ReadAsync(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count, CancellationToken.None); - - public virtual Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => - cancellationToken.IsCancellationRequested ? - Task.FromCanceled(cancellationToken) : - BeginEndReadAsync(buffer, offset, count); + // Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality. + return TaskFactory.FromAsyncTrim( + this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count }, + static (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), + static (stream, asyncResult) => stream.EndRead(asyncResult)); + } public virtual ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) { @@ -332,22 +287,6 @@ static async ValueTask FinishReadAsync(Task readTask, byte[] localBuff } } - private Task BeginEndReadAsync(byte[] buffer, int offset, int count) - { - if (!HasOverriddenBeginEndRead()) - { - // If the Stream does not override Begin/EndRead, then we can take an optimized path - // that skips an extra layer of tasks / IAsyncResults. - return (Task)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false); - } - - // Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality. - return TaskFactory.FromAsyncTrim( - this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count }, - (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler - (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler - } - private struct ReadWriteParameters // struct for arguments to Read and Write calls { internal byte[] Buffer; @@ -356,11 +295,9 @@ private Task BeginEndReadAsync(byte[] buffer, int offset, int count) } public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => - BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true); + BeginWriteInternal(buffer, offset, count, callback, state); - internal IAsyncResult BeginWriteInternal( - byte[] buffer, int offset, int count, AsyncCallback? callback, object? state, - bool serializeAsynchronously, bool apm) + internal Task BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) { ValidateBufferArguments(buffer, offset, count); if (!CanWrite) @@ -368,92 +305,30 @@ internal IAsyncResult BeginWriteInternal( ThrowHelper.ThrowNotSupportedException_UnwritableStream(); } - // To avoid a race condition with a stream's position pointer & generating conditions - // with internal buffer indexes in our own streams that - // don't natively support async IO operations when there are multiple - // async requests outstanding, we will block the application's main - // thread if it does a second IO request until the first one completes. - SemaphoreSlim semaphore = EnsureAsyncActiveSemaphoreInitialized(); - Task? semaphoreTask = null; - if (serializeAsynchronously) - { - semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block - } - else - { -#pragma warning disable CA1416 // Validate platform compatibility, issue: https://github.com/dotnet/runtime/issues/44543 - semaphore.Wait(); // synchronously wait here -#pragma warning restore CA1416 - } - // Create the task to asynchronously do a Write. This task serves both // as the asynchronous work item and as the IAsyncResult returned to the user. - var asyncResult = new ReadWriteTask(false /*isRead*/, apm, delegate + var task = new ReadWriteTask(isRead: false, delegate { // The ReadWriteTask stores all of the parameters to pass to Write. // As we're currently inside of it, we can get the current task // and grab the parameters from it. - var thisTask = Task.InternalCurrent as ReadWriteTask; - Debug.Assert(thisTask != null && thisTask._stream != null, - "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream should be set"); - - try - { - // Do the Write - thisTask._stream.Write(thisTask._buffer!, thisTask._offset, thisTask._count); - return 0; // not used, but signature requires a value be returned - } - finally - { - // If this implementation is part of Begin/EndXx, then the EndXx method will handle - // finishing the async operation. However, if this is part of XxAsync, then there won't - // be an end method, and this task is responsible for cleaning up. - if (!thisTask._apm) - { - thisTask._stream.FinishTrackingAsyncOperation(thisTask); - } - - thisTask.ClearBeginState(); // just to help alleviate some memory pressure - } + var thisTask = (ReadWriteTask)Task.InternalCurrent!; + Debug.Assert(thisTask._stream != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream should be set"); + + Stream stream = thisTask._stream; + byte[] buffer = thisTask._buffer!; + thisTask._stream = null; // help alleviate some memory pressure + thisTask._buffer = null; + stream.Write(buffer, thisTask._offset, thisTask._count); + return 0; // not used, but signature requires a value be returned }, state, this, buffer, offset, count, callback); - // Schedule it - if (semaphoreTask != null) - { - RunReadWriteTaskWhenReady(semaphoreTask, asyncResult); - } - else - { - RunReadWriteTask(asyncResult); - } - - return asyncResult; // return it - } + QueueReadWriteTask(task); - private static void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask) - { - Debug.Assert(readWriteTask != null); - Debug.Assert(asyncWaiter != null); - - // If the wait has already completed, run the task. - if (asyncWaiter.IsCompleted) - { - Debug.Assert(asyncWaiter.IsCompletedSuccessfully, "The semaphore wait should always complete successfully."); - RunReadWriteTask(readWriteTask); - } - else // Otherwise, wait for our turn, and then run the task. - { - asyncWaiter.ContinueWith(static (t, state) => - { - Debug.Assert(t.IsCompletedSuccessfully, "The semaphore wait should always complete successfully."); - var rwt = (ReadWriteTask)state!; - Debug.Assert(rwt._stream != null, "Validates that this code isn't run a second time."); - RunReadWriteTask(rwt); // RunReadWriteTask(readWriteTask); - }, readWriteTask, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); - } + return task; } - private static void RunReadWriteTask(ReadWriteTask readWriteTask) + private static void QueueReadWriteTask(ReadWriteTask readWriteTask) { Debug.Assert(readWriteTask != null); @@ -465,13 +340,6 @@ private static void RunReadWriteTask(ReadWriteTask readWriteTask) readWriteTask.ScheduleAndStart(needsProtection: false); } - private void FinishTrackingAsyncOperation(ReadWriteTask task) - { - Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here."); - task._endCalled = true; - _asyncActiveSemaphore.Release(); - } - public virtual void EndWrite(IAsyncResult asyncResult) { if (asyncResult is null) @@ -489,15 +357,9 @@ public virtual void EndWrite(IAsyncResult asyncResult) ThrowHelper.ThrowInvalidOperationException(ExceptionResource.InvalidOperation_WrongAsyncResultOrEndCalledMultiple); } - try - { - writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions - Debug.Assert(writeTask.Status == TaskStatus.RanToCompletion); - } - finally - { - FinishTrackingAsyncOperation(writeTask); - } + writeTask._endCalled = true; + writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions + Debug.Assert(writeTask.Status == TaskStatus.RanToCompletion); } // Task used by BeginRead / BeginWrite to do Read / Write asynchronously. @@ -519,7 +381,6 @@ public virtual void EndWrite(IAsyncResult asyncResult) private sealed class ReadWriteTask : Task, ITaskCompletionAction { internal readonly bool _isRead; - internal readonly bool _apm; // true if this is from Begin/EndXx; false if it's from XxAsync internal bool _endCalled; internal Stream? _stream; internal byte[]? _buffer; @@ -528,15 +389,8 @@ private sealed class ReadWriteTask : Task, ITaskCompletionAction private AsyncCallback? _callback; private ExecutionContext? _context; - internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC - { - _stream = null; - _buffer = null; - } - public ReadWriteTask( bool isRead, - bool apm, Func function, object? state, Stream stream, byte[] buffer, int offset, int count, AsyncCallback? callback) : base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach) @@ -546,7 +400,6 @@ public ReadWriteTask( // Store the arguments _isRead = isRead; - _apm = apm; _stream = stream; _buffer = buffer; _offset = offset; @@ -565,38 +418,22 @@ public ReadWriteTask( } } - private static void InvokeAsyncCallback(object? completedTask) + void ITaskCompletionAction.Invoke(Task _) { - Debug.Assert(completedTask is ReadWriteTask); - var rwc = (ReadWriteTask)completedTask; - AsyncCallback? callback = rwc._callback; - Debug.Assert(callback != null); - rwc._callback = null; - callback(rwc); - } - - private static ContextCallback? s_invokeAsyncCallback; - - void ITaskCompletionAction.Invoke(Task completingTask) - { - // Get the ExecutionContext. If there is none, just run the callback - // directly, passing in the completed task as the IAsyncResult. - // If there is one, process it with ExecutionContext.Run. + // Get the ExecutionContext. If there is none, just run the callback directly, passing in this + // task as the IAsyncResult. If there is one, process it with ExecutionContext.Run. ExecutionContext? context = _context; - if (context is null) + if (context is not null) { AsyncCallback? callback = _callback; - Debug.Assert(callback != null); _callback = null; - callback(completingTask); + Debug.Assert(callback != null); + callback(this); } else { _context = null; - - ContextCallback? invokeAsyncCallback = s_invokeAsyncCallback ??= InvokeAsyncCallback; - - ExecutionContext.RunInternal(context, invokeAsyncCallback, this); + ExecutionContext.RunInternal(context, static state => ((ITaskCompletionAction)state!).Invoke(null!), this); } } @@ -605,12 +442,31 @@ void ITaskCompletionAction.Invoke(Task completingTask) public Task WriteAsync(byte[] buffer, int offset, int count) => WriteAsync(buffer, offset, count, CancellationToken.None); - public virtual Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => + public virtual Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { // If cancellation was requested, bail early with an already completed task. - // Otherwise, return a task that represents the Begin/End methods. - cancellationToken.IsCancellationRequested ? - Task.FromCanceled(cancellationToken) : - BeginEndWriteAsync(buffer, offset, count); + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + if (!HasOverriddenBeginEndWrite()) + { + // If the Stream does not override Begin/EndWrite, then we can take an optimized path + // that skips an extra layer of tasks / IAsyncResults. + return BeginWriteInternal(buffer, offset, count, null, null); + } + + // Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality. + return TaskFactory.FromAsyncTrim( + this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count }, + static (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), + static (stream, asyncResult) => + { + stream.EndWrite(asyncResult); + return default; + }); + } public virtual ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { @@ -636,26 +492,6 @@ private static async Task FinishWriteAsync(Task writeTask, byte[] localBuffer) } } - private Task BeginEndWriteAsync(byte[] buffer, int offset, int count) - { - if (!HasOverriddenBeginEndWrite()) - { - // If the Stream does not override Begin/EndWrite, then we can take an optimized path - // that skips an extra layer of tasks / IAsyncResults. - return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false); - } - - // Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality. - return TaskFactory.FromAsyncTrim( - this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count }, - (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler - (stream, asyncResult) => // cached by compiler - { - stream.EndWrite(asyncResult); - return default; - }); - } - public abstract long Seek(long offset, SeekOrigin origin); public abstract void SetLength(long value); @@ -992,14 +828,9 @@ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, Asy lock (_stream) { // If the Stream does have its own BeginRead implementation, then we must use that override. - // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic - // which ensures only one asynchronous operation does so with an asynchronous wait rather - // than a synchronous wait. A synchronous wait will result in a deadlock condition, because - // the EndXx method for the outstanding async operation won't be able to acquire the lock on - // _stream due to this call blocked while holding the lock. return overridesBeginRead ? _stream.BeginRead(buffer, offset, count, callback, state) : - _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true); + _stream.BeginReadInternal(buffer, offset, count, callback, state); } #endif } @@ -1067,14 +898,9 @@ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, As lock (_stream) { // If the Stream does have its own BeginWrite implementation, then we must use that override. - // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic - // which ensures only one asynchronous operation does so with an asynchronous wait rather - // than a synchronous wait. A synchronous wait will result in a deadlock condition, because - // the EndXx method for the outstanding async operation won't be able to acquire the lock on - // _stream due to this call blocked while holding the lock. return overridesBeginWrite ? _stream.BeginWrite(buffer, offset, count, callback, state) : - _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true); + _stream.BeginWriteInternal(buffer, offset, count, callback, state); } #endif }