diff --git a/src/System.IO.FileSystem/src/Resources/Strings.resx b/src/System.IO.FileSystem/src/Resources/Strings.resx index af7851b22388..c4b83570868f 100644 --- a/src/System.IO.FileSystem/src/Resources/Strings.resx +++ b/src/System.IO.FileSystem/src/Resources/Strings.resx @@ -291,4 +291,7 @@ Unknown error '{0}'. + + Cannot access a closed Stream. + diff --git a/src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs b/src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs index 82312d4df3c2..884e39923988 100644 --- a/src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs +++ b/src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs @@ -8,6 +8,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Win32.SafeHandles; +using System.Runtime.CompilerServices; /* * Win32FileStream supports different modes of accessing the disk - async mode @@ -1684,6 +1685,299 @@ private int GetLastWin32ErrorAndDisposeHandleIfInvalid(bool throwIfInvalidHandle return errorCode; } + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + // Validate arguments as would the base implementation + if (destination == null) throw new ArgumentNullException(nameof(destination)); + if (bufferSize <= 0) throw new ArgumentOutOfRangeException(nameof(bufferSize), SR.ArgumentOutOfRange_NeedPosNum); + bool parentCanRead = _parent.CanRead; + if (!parentCanRead && !_parent.CanWrite) throw new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed); + bool destinationCanWrite = destination.CanWrite; + if (!destination.CanRead && !destinationCanWrite) throw new ObjectDisposedException(nameof(destination), SR.ObjectDisposed_StreamClosed); + if (!parentCanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream); + if (!destinationCanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream); + if (_handle.IsClosed) throw Error.GetFileNotOpen(); + + // Bail early for cancellation if cancellation has been requested + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + // Do the async copy, with differing implementations based on whether the FileStream was opened as async or sync + Debug.Assert((_readPos == 0 && _readLen == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLen), "We're either reading or writing, but not both."); + return _isAsync ? + AsyncModeCopyToAsync(destination, bufferSize, cancellationToken) : + base.CopyToAsync(destination, bufferSize, cancellationToken); + } + + private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + Debug.Assert(_isAsync, "This implementation is for async mode only"); + Debug.Assert(!_handle.IsClosed, "!_handle.IsClosed"); + Debug.Assert(_parent.CanRead, "_parent.CanRead"); + + // Make sure any pending writes have been flushed before we do a read. + if (_writePos > 0) + { + await FlushWriteAsync(cancellationToken).ConfigureAwait(false); + } + + // Typically CopyToAsync would be invoked as the only "read" on the stream, but it's possible some reading is + // done and then the CopyToAsync is issued. For that case, see if we have any data available in the buffer. + if (_buffer != null) + { + int bufferedBytes = _readLen - _readPos; + if (bufferedBytes > 0) + { + await destination.WriteAsync(_buffer, _readPos, bufferedBytes, cancellationToken).ConfigureAwait(false); + _readPos = _readLen = 0; + } + } + + // For efficiency, we avoid creating a new task and associated state for each asynchronous read. + // Instead, we create a single reusable awaitable object that will be triggered when an await completes + // and reset before going again. + var readAwaitable = new AsyncCopyToAwaitable(this); + + // Make sure we are reading from the position that we think we are. + // Only set the position in the awaitable if we can seek (e.g. not for pipes). + bool canSeek = _parent.CanSeek; + if (canSeek) + { + if (_exposedHandle) VerifyOSHandlePosition(); + readAwaitable._position = _pos; + } + + // Create the buffer to use for the copy operation, as the base CopyToAsync does. We don't try to use + // _buffer here, even if it's not null, as concurrent operations are allowed, and another operation may + // actually be using the buffer already. Plus, it'll be rare for _buffer to be non-null, as typically + // CopyToAsync is used as the only operation performed on the stream, and the buffer is lazily initialized. + // Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that + // we'd likely be unable to use it anyway. A better option than using _buffer would be a future pooling solution. + byte[] copyBuffer = new byte[bufferSize]; + + // Allocate an Overlapped we can use repeatedly for all operations + var awaitableOverlapped = new PreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback, readAwaitable, copyBuffer); + var cancellationReg = default(CancellationTokenRegistration); + try + { + // Register for cancellation. We do this once for the whole copy operation, and just try to cancel + // whatever read operation may currently be in progress, if there is one. It's possible the cancellation + // request could come in between operations, in which case we flag that with explicit calls to ThrowIfCancellationRequested + // in the read/write copy loop. + if (cancellationToken.CanBeCanceled) + { + cancellationReg = cancellationToken.Register(s => + { + var innerAwaitable = (AsyncCopyToAwaitable)s; + unsafe + { + lock (innerAwaitable.CancellationLock) // synchronize with cleanup of the overlapped + { + if (innerAwaitable._nativeOverlapped != null) + { + // Try to cancel the I/O. We ignore the return value, as cancellation is opportunistic and we + // don't want to fail the operation because we couldn't cancel it. + Interop.mincore.CancelIoEx(innerAwaitable._fileStream._handle, innerAwaitable._nativeOverlapped); + } + } + } + }, readAwaitable); + } + + // Repeatedly read from this FileStream and write the results to the destination stream. + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + readAwaitable.ResetForNextOperation(); + + try + { + bool synchronousSuccess; + int errorCode; + unsafe + { + // Allocate a native overlapped for our reusable overlapped, and set position to read based on the next + // desired address stored in the awaitable. (This position may be 0, if either we're at the beginning or + // if the stream isn't seekable.) + readAwaitable._nativeOverlapped = _handle.ThreadPoolBinding.AllocateNativeOverlapped(awaitableOverlapped); + if (canSeek) + { + readAwaitable._nativeOverlapped->OffsetLow = unchecked((int)readAwaitable._position); + readAwaitable._nativeOverlapped->OffsetHigh = (int)(readAwaitable._position >> 32); + } + + // Kick off the read. + synchronousSuccess = ReadFileNative(_handle, copyBuffer, 0, copyBuffer.Length, readAwaitable._nativeOverlapped, out errorCode) >= 0; + } + + // If the operation did not synchronously succeed, it either failed or initiated the asynchronous operation. + if (!synchronousSuccess) + { + switch (errorCode) + { + case ERROR_IO_PENDING: + // Async operation in progress. + break; + case ERROR_BROKEN_PIPE: + case ERROR_HANDLE_EOF: + // We're at or past the end of the file, and the overlapped callback + // won't be raised in these cases. Mark it as completed so that the await + // below will see it as such. + readAwaitable.MarkCompleted(); + break; + default: + // Everything else is an error (and there won't be a callback). + throw Win32Marshal.GetExceptionForWin32Error(errorCode); + } + } + + // Wait for the async operation (which may or may not have already completed), then throw if it failed. + await readAwaitable; + switch (readAwaitable._errorCode) + { + case 0: // success + Debug.Assert(readAwaitable._numBytes >= 0, $"Expected non-negative numBytes, got {readAwaitable._numBytes}"); + break; + case ERROR_BROKEN_PIPE: // logically success with 0 bytes read (write end of pipe closed) + case ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file) + Debug.Assert(readAwaitable._numBytes == 0, $"Expected 0 bytes read, got {readAwaitable._numBytes}"); + break; + case Interop.mincore.Errors.ERROR_OPERATION_ABORTED: // canceled + throw new OperationCanceledException(cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(true)); + default: // error + throw Win32Marshal.GetExceptionForWin32Error((int)readAwaitable._errorCode); + } + + // Successful operation. If we got zero bytes, we're done: exit the read/write loop. + // Otherwise, update the read position for next time accordingly. + if (readAwaitable._numBytes == 0) + { + break; + } + else if (canSeek) + { + readAwaitable._position += (int)readAwaitable._numBytes; + } + } + finally + { + // Free the resources for this read operation + unsafe + { + NativeOverlapped* overlapped; + lock (readAwaitable.CancellationLock) // just an Exchange, but we need this to be synchronized with cancellation, so using the same lock + { + overlapped = readAwaitable._nativeOverlapped; + readAwaitable._nativeOverlapped = null; + } + if (overlapped != null) + { + _handle.ThreadPoolBinding.FreeNativeOverlapped(overlapped); + } + } + } + + // Write out the read data. + await destination.WriteAsync(copyBuffer, 0, (int)readAwaitable._numBytes, cancellationToken).ConfigureAwait(false); + } + } + finally + { + // Cleanup from the whole copy operation + cancellationReg.Dispose(); + awaitableOverlapped.Dispose(); + + // Make sure the stream's current position reflects where we ended up + if (!_handle.IsClosed && _parent.CanSeek) + { + SeekCore(0, SeekOrigin.End); + } + } + } + + /// Used by CopyToAsync to enable awaiting the result of an overlapped I/O operation with minimal overhead. + private sealed unsafe class AsyncCopyToAwaitable : ICriticalNotifyCompletion + { + /// Sentinel object used to indicate that the I/O operation has completed before being awaited. + private readonly static Action s_sentinel = () => { }; + /// Cached delegate to IOCallback. + internal static readonly IOCompletionCallback s_callback = IOCallback; + + /// The FileStream that owns this instance. + internal readonly Win32FileStream _fileStream; + + /// Tracked position representing the next location from which to read. + internal long _position; + /// The current native overlapped pointer. This changes for each operation. + internal NativeOverlapped* _nativeOverlapped; + /// + /// null if the operation is still in progress, + /// s_sentinel if the I/O operation completed before the await, + /// s_callback if it completed after the await yielded. + /// + internal Action _continuation; + /// Last error code from completed operation. + internal uint _errorCode; + /// Last number of read bytes from completed operation. + internal uint _numBytes; + + /// Lock object used to protect cancellation-related access to _nativeOverlapped. + internal object CancellationLock => this; + + /// Initialize the awaitable. + internal unsafe AsyncCopyToAwaitable(Win32FileStream fileStream) + { + _fileStream = fileStream; + } + + /// Reset state to prepare for the next read operation. + internal void ResetForNextOperation() + { + Debug.Assert(_position >= 0, $"Expected non-negative position, got {_position}"); + _continuation = null; + _errorCode = 0; + _numBytes = 0; + } + + /// Overlapped callback: store the results, then invoke the continuation delegate. + internal unsafe static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP) + { + var awaitable = (AsyncCopyToAwaitable)ThreadPoolBoundHandle.GetNativeOverlappedState(pOVERLAP); + + Debug.Assert(awaitable._continuation != s_sentinel, "Sentinel must not have already been set as the continuation"); + awaitable._errorCode = errorCode; + awaitable._numBytes = numBytes; + + (awaitable._continuation ?? Interlocked.CompareExchange(ref awaitable._continuation, s_sentinel, null))?.Invoke(); + } + + /// + /// Called when it's known that the I/O callback for an operation will not be invoked but we'll + /// still be awaiting the awaitable. + /// + internal void MarkCompleted() + { + Debug.Assert(_continuation == null, "Expected null continuation"); + _continuation = s_sentinel; + } + + public AsyncCopyToAwaitable GetAwaiter() => this; + public bool IsCompleted => _continuation == s_sentinel; + public void GetResult() { } + public void OnCompleted(Action continuation) => UnsafeOnCompleted(continuation); + public void UnsafeOnCompleted(Action continuation) + { + if (_continuation == s_sentinel || + Interlocked.CompareExchange(ref _continuation, continuation, null) != null) + { + Debug.Assert(_continuation == s_sentinel, $"Expected continuation set to s_sentinel, got ${_continuation}"); + Task.Run(continuation); + } + } + } + [System.Security.SecuritySafeCritical] public override Task ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken) {