Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove forced serialization of async-over-sync in Stream base methods #53389

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 18 additions & 33 deletions src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,15 @@ public async Task WriteAsyncInternalBufferOverflow()

public static IEnumerable<object[]> MemberData_FileStreamAsyncWriting()
{
foreach (bool useAsync in new[] { true, false })
foreach (bool preSize in new[] { true, false })
{
if (useAsync && !OperatingSystem.IsWindows())
foreach (bool cancelable in new[] { true, false })
{
// We don't have a special async I/O implementation in FileStream on Unix.
continue;
}

foreach (bool preSize in new[] { true, false })
{
foreach (bool cancelable in new[] { true, false })
{
yield return new object[] { useAsync, preSize, false, cancelable, 0x1000, 0x100, 100 };
yield return new object[] { useAsync, preSize, false, cancelable, 0x1, 0x1, 1000 };
yield return new object[] { useAsync, preSize, true, cancelable, 0x2, 0x100, 100 };
yield return new object[] { useAsync, preSize, false, cancelable, 0x4000, 0x10, 100 };
yield return new object[] { useAsync, preSize, true, cancelable, 0x1000, 99999, 10 };
}
yield return new object[] { preSize, false, cancelable, 0x1000, 0x100, 100 };
yield return new object[] { preSize, false, cancelable, 0x1, 0x1, 1000 };
yield return new object[] { preSize, true, cancelable, 0x2, 0x100, 100 };
yield return new object[] { preSize, false, cancelable, 0x4000, 0x10, 100 };
yield return new object[] { preSize, true, cancelable, 0x1000, 99999, 10 };
}
}
}
Expand All @@ -183,7 +174,6 @@ public Task ManyConcurrentWriteAsyncs()
{
// For inner loop, just test one case
return ManyConcurrentWriteAsyncs_OuterLoop(
useAsync: OperatingSystem.IsWindows(),
presize: false,
exposeHandle: false,
cancelable: true,
Expand All @@ -196,15 +186,15 @@ public Task ManyConcurrentWriteAsyncs()
[MemberData(nameof(MemberData_FileStreamAsyncWriting))]
[OuterLoop] // many combinations: we test just one in inner loop and the rest outer
public async Task ManyConcurrentWriteAsyncs_OuterLoop(
bool useAsync, bool presize, bool exposeHandle, bool cancelable, int bufferSize, int writeSize, int numWrites)
bool presize, bool exposeHandle, bool cancelable, int bufferSize, int writeSize, int numWrites)
{
long totalLength = writeSize * numWrites;
var expectedData = new byte[totalLength];
new Random(42).NextBytes(expectedData);
CancellationToken cancellationToken = cancelable ? new CancellationTokenSource().Token : CancellationToken.None;

string path = GetTestFilePath();
using (FileStream fs = new FileStream(path, FileMode.Create, FileAccess.ReadWrite, FileShare.None, bufferSize, useAsync))
using (FileStream fs = new FileStream(path, FileMode.Create, FileAccess.ReadWrite, FileShare.None, bufferSize, useAsync: true))
{
if (presize)
{
Expand All @@ -220,17 +210,15 @@ public async Task ManyConcurrentWriteAsyncs_OuterLoop(
{
writes[i] = WriteAsync(fs, expectedData, i * writeSize, writeSize, cancellationToken);
Assert.Null(writes[i].Exception);
if (useAsync)

// To ensure that the buffer of a FileStream opened for async IO is flushed
// by FlushAsync in asynchronous way, we aquire a lock for every buffered WriteAsync.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "aquire" => "acquire"

// The side effect of this is that the Position of FileStream is not updated until
// the lock is released by a previous operation.
// So now all WriteAsync calls should be awaited before starting another async file operation.
if (PlatformDetection.IsNet5CompatFileStreamEnabled)
{
// To ensure that the buffer of a FileStream opened for async IO is flushed
// by FlushAsync in asynchronous way, we aquire a lock for every buffered WriteAsync.
// The side effect of this is that the Position of FileStream is not updated until
// the lock is released by a previous operation.
// So now all WriteAsync calls should be awaited before starting another async file operation.
if (PlatformDetection.IsNet5CompatFileStreamEnabled)
{
Assert.Equal((i + 1) * writeSize, fs.Position);
}
Assert.Equal((i + 1) * writeSize, fs.Position);
}
}

Expand All @@ -239,10 +227,7 @@ public async Task ManyConcurrentWriteAsyncs_OuterLoop(

byte[] actualData = File.ReadAllBytes(path);
Assert.Equal(expectedData.Length, actualData.Length);
if (useAsync)
{
Assert.Equal<byte>(expectedData, actualData);
}
AssertExtensions.SequenceEqual(expectedData, actualData);
}

[Theory]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -58,6 +59,7 @@ public sealed class BufferedStream : Stream
// (perf optimization for successive reads of the same size)
// Removing a private default constructor is a breaking change for the DataDebugSerializer.
// Because this ctor was here previously we need to keep it around.
private SemaphoreSlim? _asyncActiveSemaphore; // To serialize async operations.

public BufferedStream(Stream stream)
: this(stream, DefaultBufferSize)
Expand Down Expand Up @@ -136,6 +138,16 @@ private void EnsureBufferAllocated()
_buffer = new byte[_bufferSize];
}

[MemberNotNull(nameof(_asyncActiveSemaphore))]
private SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() =>
// Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's
// WaitHandle, we don't need to worry about Disposing it in the case of a race condition.
#pragma warning disable CS8774 // We lack a NullIffNull annotation for Volatile.Read
Volatile.Read(ref _asyncActiveSemaphore) ??
#pragma warning restore CS8774
Interlocked.CompareExchange(ref _asyncActiveSemaphore, new SemaphoreSlim(1, 1), null) ??
_asyncActiveSemaphore;

public Stream UnderlyingStream
{
get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ public override int Read(byte[] buffer, int offset, int count)
vt.AsTask().GetAwaiter().GetResult();
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);

public override int EndRead(IAsyncResult asyncResult) =>
TaskToApm.End<int>(asyncResult);

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();

Expand Down Expand Up @@ -207,6 +213,12 @@ private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, Cancel
public override void Write(byte[] buffer, int offset, int count)
=> WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), CancellationToken.None).AsTask().GetAwaiter().GetResult();

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);

public override void EndWrite(IAsyncResult asyncResult) =>
TaskToApm.End(asyncResult);

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal sealed class BufferedFileStreamStrategy : FileStreamStrategy
{
private readonly FileStreamStrategy _strategy;
private readonly int _bufferSize;
private SemaphoreSlim? _asyncActiveSemaphore;

private byte[]? _buffer;
private int _writePos;
Expand Down Expand Up @@ -46,6 +47,16 @@ internal BufferedFileStreamStrategy(FileStreamStrategy strategy, int bufferSize)
}
}

[MemberNotNull(nameof(_asyncActiveSemaphore))]
private SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() =>
// Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's
// WaitHandle, we don't need to worry about Disposing it in the case of a race condition.
#pragma warning disable CS8774 // We lack a NullIffNull annotation for Volatile.Read
Volatile.Read(ref _asyncActiveSemaphore) ??
#pragma warning restore CS8774
Interlocked.CompareExchange(ref _asyncActiveSemaphore, new SemaphoreSlim(1, 1), null) ??
_asyncActiveSemaphore;

public override bool CanRead => _strategy.CanRead;

public override bool CanWrite => _strategy.CanWrite;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
// Read is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginRead, since we already know this is FileStream rather
// than something derived from it and what our BeginRead implementation is going to do.
return (Task<int>)base.BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
return BeginReadInternal(buffer, offset, count, null, null);
}

return ReadAsyncTask(buffer, offset, count, cancellationToken);
Expand All @@ -178,7 +178,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
// internal helper that bypasses delegating to BeginRead, since we already know this is FileStream
// rather than something derived from it and what our BeginRead implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask<int>((Task<int>)base.BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
new ValueTask<int>(BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null)) :
base.ReadAsync(buffer, cancellationToken);
}

Expand Down Expand Up @@ -245,7 +245,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
// Write is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginWrite, since we already know this is FileStream rather
// than something derived from it and what our BeginWrite implementation is going to do.
return (Task)base.BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
return BeginWriteInternal(buffer, offset, count, null, null);
}

return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
Expand All @@ -260,7 +260,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
// internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream
// rather than something derived from it and what our BeginWrite implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask((Task)base.BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
new ValueTask(BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null)) :
base.WriteAsync(buffer, cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
// Read is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginRead, since we already know this is FileStream rather
// than something derived from it and what our BeginRead implementation is going to do.
return (Task<int>)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
return BeginReadInternal(buffer, offset, count, null, null);
}

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
Expand All @@ -56,7 +56,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
// internal helper that bypasses delegating to BeginRead, since we already know this is FileStream
// rather than something derived from it and what our BeginRead implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask<int>((Task<int>)BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
new ValueTask<int>(BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null)) :
base.ReadAsync(buffer, cancellationToken);
}

Expand All @@ -79,7 +79,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
// Write is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginWrite, since we already know this is FileStream rather
// than something derived from it and what our BeginWrite implementation is going to do.
return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
return BeginWriteInternal(buffer, offset, count, null, null);
}

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
Expand All @@ -89,7 +89,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
// internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream
// rather than something derived from it and what our BeginWrite implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask((Task)BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
new ValueTask(BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null)) :
base.WriteAsync(buffer, cancellationToken);
}

Expand Down
Loading