Skip to content

Commit

Permalink
Remove FileStream long pinning, and simplify synchronization (#51462)
Browse files Browse the repository at this point in the history
* Remove pinning in BufferedFileStreamStrategy

* Simplify code/synchronization in ValueTaskSource
  • Loading branch information
stephentoub authored Apr 20, 2021
1 parent 291e7c0 commit 508e560
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,49 @@

using System.Buffers;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks.Sources;
using TaskSourceCodes = System.IO.Strategies.FileStreamHelpers.TaskSourceCodes;

namespace System.IO.Strategies
{
internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy
{
/// <summary>
/// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync.
/// </summary>
/// <summary>Reusable IValueTaskSource for FileStream ValueTask-returning async operations.</summary>
private sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
{
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;

internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
private readonly AsyncWindowsFileStreamStrategy _strategy;
private MemoryHandle _handle;
internal MemoryHandle _memoryHandle;
internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private NativeOverlapped* _overlapped;
private CancellationTokenRegistration _cancellationRegistration;
private long _result; // Using long since this needs to be used in Interlocked APIs
#if DEBUG
private bool _cancellationHasBeenRegistered;
#endif
/// <summary>
/// 0 when the operation hasn't been scheduled, non-zero when either the operation has completed,
/// in which case its value is a packed combination of the error code and number of bytes, or when
/// the read/write call has finished scheduling the async operation.
/// </summary>
internal ulong _result;

internal ValueTaskSource(AsyncWindowsFileStreamStrategy strategy)
{
_strategy = strategy;
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);

_source.RunContinuationsAsynchronously = true;
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);
}

internal NativeOverlapped* Configure(ReadOnlyMemory<byte> memory)
internal void Dispose()
{
_result = TaskSourceCodes.NoResult;
ReleaseResources();
_preallocatedOverlapped.Dispose();
}

_handle = memory.Pin();
internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory)
{
_result = 0;
_memoryHandle = memory.Pin();
_overlapped = _strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);

return _overlapped;
}

Expand All @@ -69,140 +71,110 @@ private int GetResultAndRelease(short token)

internal void RegisterForCancellation(CancellationToken cancellationToken)
{
#if DEBUG
Debug.Assert(cancellationToken.CanBeCanceled);
Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
_cancellationHasBeenRegistered = true;
#endif

// Quick check to make sure the IO hasn't completed
if (_overlapped != null)
Debug.Assert(_overlapped != null);
if (cancellationToken.CanBeCanceled)
{
// Register the cancellation only if the IO hasn't completed
long packedResult = Interlocked.CompareExchange(ref _result, TaskSourceCodes.RegisteringCancellation, TaskSourceCodes.NoResult);
if (packedResult == TaskSourceCodes.NoResult)
try
{
_cancellationRegistration = cancellationToken.UnsafeRegister((s, token) => Cancel(token), this);

// Switch the result, just in case IO completed while we were setting the registration
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
}
else if (packedResult != TaskSourceCodes.CompletedCallback)
{
// Failed to set the result, IO is in the process of completing
// Attempt to take the packed result
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
_cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) =>
{
ValueTaskSource vts = (ValueTaskSource)s!;
if (!vts._strategy._fileHandle.IsInvalid)
{
try
{
Interop.Kernel32.CancelIoEx(vts._strategy._fileHandle, vts._overlapped);
// Ignore all failures: no matter whether it succeeds or fails, completion is handled via the IOCallback.
}
catch (ObjectDisposedException) { } // in case the SafeHandle is (erroneously) closed concurrently
}
}, this);
}

// If we have a callback that needs to be completed
if ((packedResult != TaskSourceCodes.NoResult) && (packedResult != TaskSourceCodes.CompletedCallback) && (packedResult != TaskSourceCodes.RegisteringCancellation))
catch (OutOfMemoryException)
{
CompleteCallback((ulong)packedResult);
// Just in case trying to register OOMs, we ignore it in order to
// protect the higher-level calling code that would proceed to unpin
// memory that might be actively used by an in-flight async operation.
}
}
}

internal void ReleaseNativeResource()
internal void ReleaseResources()
{
_handle.Dispose();
// Unpin any pinned buffer.
_memoryHandle.Dispose();

// Ensure that cancellation has been completed and cleaned up.
// Ensure that any cancellation callback has either completed or will never run,
// so that we don't try to access an overlapped for this operation after it's already
// been freed.
_cancellationRegistration.Dispose();

// Free the overlapped.
// NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory
// (this is why we disposed the registration above).
if (_overlapped != null)
{
_strategy._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(_overlapped);
_overlapped = null;
}
}

private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
ValueTaskSource valueTaskSource = (ValueTaskSource)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped)!;
Debug.Assert(valueTaskSource._overlapped == pOverlapped, "Overlaps don't match");

// Handle reading from & writing to closed pipes. While I'm not sure
// this is entirely necessary anymore, maybe it's possible for
// an async read on a pipe to be issued and then the pipe is closed,
// returning this error. This may very well be necessary.
ulong packedResult;
if (errorCode != 0 && errorCode != Interop.Errors.ERROR_BROKEN_PIPE && errorCode != Interop.Errors.ERROR_NO_DATA)
{
packedResult = ((ulong)TaskSourceCodes.ResultError | errorCode);
}
else
{
packedResult = ((ulong)TaskSourceCodes.ResultSuccess | numBytes);
}
// After calling Read/WriteFile to start the asynchronous operation, the caller may configure cancellation,
// and only after that should we allow for completing the operation, as completion needs to factor in work
// done by that cancellation registration, e.g. unregistering. As such, we use _result to both track who's
// responsible for calling Complete and for passing the necessary data between parties.

// Stow the result so that other threads can observe it
// And, if no other thread is registering cancellation, continue
if (Interlocked.Exchange(ref valueTaskSource._result, (long)packedResult) == TaskSourceCodes.NoResult)
/// <summary>Invoked when AsyncWindowsFileStreamStrategy has finished scheduling the async operation.</summary>
internal void FinishedScheduling()
{
// Set the value to 1. If it was already non-0, then the asynchronous operation already completed but
// didn't call Complete, so we call Complete here. The read result value is the data (packed) necessary
// to make the call.
ulong result = Interlocked.Exchange(ref _result, 1);
if (result != 0)
{
// Successfully set the state, attempt to take back the callback
if (Interlocked.Exchange(ref valueTaskSource._result, TaskSourceCodes.CompletedCallback) != TaskSourceCodes.NoResult)
{
// Successfully got the callback, finish the callback
valueTaskSource.CompleteCallback(packedResult);
}
// else: Some other thread stole the result, so now it is responsible to finish the callback
Complete(errorCode: (uint)result, numBytes: (uint)(result >> 32) & 0x7FFFFFFF);
}
// else: Some other thread is registering a cancellation, so it *must* finish the callback
}

private void CompleteCallback(ulong packedResult)
/// <summary>Invoked when the asynchronous operation has completed asynchronously.</summary>
private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
CancellationToken cancellationToken = _cancellationRegistration.Token;

ReleaseNativeResource();

// Unpack the result and send it to the user
long result = (long)(packedResult & TaskSourceCodes.ResultMask);
if (result == TaskSourceCodes.ResultError)
ValueTaskSource? vts = (ValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(vts is not null);
Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match");

// Set the value to a packed combination of the error code and number of bytes (plus a high-bit 1
// to ensure the value we're setting is non-zero). If it was already non-0 (the common case), then
// the call site already finished scheduling the async operation, in which case we're ready to complete.
Debug.Assert(numBytes < int.MaxValue);
if (Interlocked.Exchange(ref vts._result, (1ul << 63) | ((ulong)numBytes << 32) | errorCode) != 0)
{
int errorCode = unchecked((int)(packedResult & uint.MaxValue));
Exception e;
if (errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
{
CancellationToken ct = cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(canceled: true);
e = new OperationCanceledException(ct);
}
else
{
e = Win32Marshal.GetExceptionForWin32Error(errorCode);
}
e.SetCurrentStackTrace();
_source.SetException(e);
}
else
{
Debug.Assert(result == TaskSourceCodes.ResultSuccess, "Unknown result");
_source.SetResult((int)(packedResult & uint.MaxValue));
vts.Complete(errorCode, numBytes);
}
}

private void Cancel(CancellationToken token)
internal void Complete(uint errorCode, uint numBytes)
{
// WARNING: This may potentially be called under a lock (during cancellation registration)
Debug.Assert(_overlapped != null && GetStatus(Version) != ValueTaskSourceStatus.Succeeded, "IO should not have completed yet");
ReleaseResources();

// If the handle is still valid, attempt to cancel the IO
if (!_strategy._fileHandle.IsInvalid &&
!Interop.Kernel32.CancelIoEx(_strategy._fileHandle, _overlapped))
switch (errorCode)
{
int errorCode = Marshal.GetLastWin32Error();

// ERROR_NOT_FOUND is returned if CancelIoEx cannot find the request to cancel.
// This probably means that the IO operation has completed.
if (errorCode != Interop.Errors.ERROR_NOT_FOUND)
{
Exception e = new OperationCanceledException(SR.OperationCanceled, Win32Marshal.GetExceptionForWin32Error(errorCode), token);
e.SetCurrentStackTrace();
_source.SetException(e);
}
case 0:
case Interop.Errors.ERROR_BROKEN_PIPE:
case Interop.Errors.ERROR_NO_DATA:
// Success
_source.SetResult((int)numBytes);
break;

case Interop.Errors.ERROR_OPERATION_ABORTED:
// Cancellation
CancellationToken ct = _cancellationRegistration.Token;
_source.SetException(ct.IsCancellationRequested ? new OperationCanceledException(ct) : new OperationCanceledException());
break;

default:
// Failure
_source.SetException(Win32Marshal.GetExceptionForWin32Error((int)errorCode));
break;
}
}
}
Expand Down
Loading

0 comments on commit 508e560

Please sign in to comment.