From 482f5d145a7f9eb362432d130164e0f08d86e49d Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Tue, 16 Oct 2018 13:42:26 -0400 Subject: [PATCH] Add ManualResetValueTaskSourceLogic --- .../System.Private.CoreLib.Shared.projitems | 1 + .../shared/System/IO/MemoryStream.cs | 14 +- .../System/IO/PinnedBufferMemoryStream.cs | 9 +- .../shared/System/IO/UnmanagedMemoryStream.cs | 12 +- .../System/IO/UnmanagedMemoryStreamWrapper.cs | 5 + .../System/Threading/ExecutionContext.cs | 81 ++++++ .../ManualResetValueTaskSourceLogic.cs | 258 ++++++++++++++++++ .../src/System/Threading/ThreadPool.cs | 16 ++ .../src/System/Threading/Timer.cs | 2 +- 9 files changed, 379 insertions(+), 19 deletions(-) create mode 100644 src/System.Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceLogic.cs diff --git a/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems b/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems index d03e79685e79..cdc6c7d62ece 100644 --- a/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems +++ b/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems @@ -636,6 +636,7 @@ + diff --git a/src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs b/src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs index f836c73c3202..538448f6383d 100644 --- a/src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs +++ b/src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs @@ -143,19 +143,13 @@ protected override void Dispose(bool disposing) public override ValueTask DisposeAsync() { - if (GetType() == typeof(MemoryStream)) - { - // Same as Dispose(true) - _isOpen = false; - _writable = false; - _expandable = false; - _lastReadTask = null; - return default; - } - else + if (GetType() != typeof(MemoryStream)) { return base.DisposeAsync(); } + + Dispose(disposing: true); + return default; } // returns a bool saying whether we allocated a new array. diff --git a/src/System.Private.CoreLib/shared/System/IO/PinnedBufferMemoryStream.cs b/src/System.Private.CoreLib/shared/System/IO/PinnedBufferMemoryStream.cs index 94331a2ef826..28385a6b740e 100644 --- a/src/System.Private.CoreLib/shared/System/IO/PinnedBufferMemoryStream.cs +++ b/src/System.Private.CoreLib/shared/System/IO/PinnedBufferMemoryStream.cs @@ -15,8 +15,9 @@ ===========================================================*/ using System; -using System.Runtime.InteropServices; using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading.Tasks; namespace System.IO { @@ -56,5 +57,11 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } + + public override ValueTask DisposeAsync() + { + Dispose(disposing: true); + return default; + } } } diff --git a/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStream.cs b/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStream.cs index 4fa6fae6240f..73e92ad30927 100644 --- a/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStream.cs +++ b/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStream.cs @@ -233,15 +233,13 @@ protected override void Dispose(bool disposing) public override ValueTask DisposeAsync() { - try + if (GetType() != typeof(UnmanagedMemoryStream)) { - Dispose(disposing: true); - return default; - } - catch (Exception exc) - { - return new ValueTask(Task.FromException(exc)); + return base.DisposeAsync(); } + + Dispose(disposing: true); + return default; } private void EnsureNotClosed() diff --git a/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStreamWrapper.cs b/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStreamWrapper.cs index 9a598951ee80..dbc88488b551 100644 --- a/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStreamWrapper.cs +++ b/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStreamWrapper.cs @@ -59,6 +59,11 @@ protected override void Dispose(bool disposing) } } + public override ValueTask DisposeAsync() + { + return _unmanagedStream.DisposeAsync(); + } + public override void Flush() { _unmanagedStream.Flush(); diff --git a/src/System.Private.CoreLib/shared/System/Threading/ExecutionContext.cs b/src/System.Private.CoreLib/shared/System/Threading/ExecutionContext.cs index 694514ef07e8..983281491778 100644 --- a/src/System.Private.CoreLib/shared/System/Threading/ExecutionContext.cs +++ b/src/System.Private.CoreLib/shared/System/Threading/ExecutionContext.cs @@ -21,6 +21,8 @@ namespace System.Threading { public delegate void ContextCallback(object state); + internal delegate void ContextCallback(ref TState state); + public sealed class ExecutionContext : IDisposable, ISerializable { internal static readonly ExecutionContext Default = new ExecutionContext(isDefault: true); @@ -201,6 +203,85 @@ internal static void RunInternal(ExecutionContext executionContext, ContextCallb edi?.Throw(); } + // Direct copy of the above RunInternal overload, except that it passes the state into the callback strongly-typed and by ref. + internal static void RunInternal(ExecutionContext executionContext, ContextCallback callback, ref TState state) + { + // Note: ExecutionContext.RunInternal is an extremely hot function and used by every await, ThreadPool execution, etc. + // Note: Manual enregistering may be addressed by "Exception Handling Write Through Optimization" + // https://github.com/dotnet/coreclr/blob/master/Documentation/design-docs/eh-writethru.md + + // Enregister variables with 0 post-fix so they can be used in registers without EH forcing them to stack + // Capture references to Thread Contexts + Thread currentThread0 = Thread.CurrentThread; + Thread currentThread = currentThread0; + ExecutionContext previousExecutionCtx0 = currentThread0.ExecutionContext; + + // Store current ExecutionContext and SynchronizationContext as "previousXxx". + // This allows us to restore them and undo any Context changes made in callback.Invoke + // so that they won't "leak" back into caller. + // These variables will cross EH so be forced to stack + ExecutionContext previousExecutionCtx = previousExecutionCtx0; + SynchronizationContext previousSyncCtx = currentThread0.SynchronizationContext; + + if (executionContext != null && executionContext.m_isDefault) + { + // Default is a null ExecutionContext internally + executionContext = null; + } + + if (previousExecutionCtx0 != executionContext) + { + // Restore changed ExecutionContext + currentThread0.ExecutionContext = executionContext; + if ((executionContext != null && executionContext.HasChangeNotifications) || + (previousExecutionCtx0 != null && previousExecutionCtx0.HasChangeNotifications)) + { + // There are change notifications; trigger any affected + OnValuesChanged(previousExecutionCtx0, executionContext); + } + } + + ExceptionDispatchInfo edi = null; + try + { + callback.Invoke(ref state); + } + catch (Exception ex) + { + // Note: we have a "catch" rather than a "finally" because we want + // to stop the first pass of EH here. That way we can restore the previous + // context before any of our callers' EH filters run. + edi = ExceptionDispatchInfo.Capture(ex); + } + + // Re-enregistrer variables post EH with 1 post-fix so they can be used in registers rather than from stack + SynchronizationContext previousSyncCtx1 = previousSyncCtx; + Thread currentThread1 = currentThread; + // The common case is that these have not changed, so avoid the cost of a write barrier if not needed. + if (currentThread1.SynchronizationContext != previousSyncCtx1) + { + // Restore changed SynchronizationContext back to previous + currentThread1.SynchronizationContext = previousSyncCtx1; + } + + ExecutionContext previousExecutionCtx1 = previousExecutionCtx; + ExecutionContext currentExecutionCtx1 = currentThread1.ExecutionContext; + if (currentExecutionCtx1 != previousExecutionCtx1) + { + // Restore changed ExecutionContext back to previous + currentThread1.ExecutionContext = previousExecutionCtx1; + if ((currentExecutionCtx1 != null && currentExecutionCtx1.HasChangeNotifications) || + (previousExecutionCtx1 != null && previousExecutionCtx1.HasChangeNotifications)) + { + // There are change notifications; trigger any affected + OnValuesChanged(currentExecutionCtx1, previousExecutionCtx1); + } + } + + // If exception was thrown by callback, rethrow it now original contexts are restored + edi?.Throw(); + } + internal static void OnValuesChanged(ExecutionContext previousExecutionCtx, ExecutionContext nextExecutionCtx) { Debug.Assert(previousExecutionCtx != nextExecutionCtx); diff --git a/src/System.Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceLogic.cs b/src/System.Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceLogic.cs new file mode 100644 index 000000000000..ba044a9f5637 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceLogic.cs @@ -0,0 +1,258 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.ExceptionServices; +using System.Runtime.InteropServices; + +namespace System.Threading.Tasks.Sources +{ + /// Provides the core logic for implementing a manual-reset or . + /// + [StructLayout(LayoutKind.Auto)] + public struct ManualResetValueTaskSourceLogic + { + /// + /// The callback to invoke when the operation completes if was called before the operation completed, + /// or if the operation completed before a callback was supplied, + /// or null if a callback hasn't yet been provided and the operation hasn't yet completed. + /// + private Action _continuation; + /// State to pass to . + private object _continuationState; + /// to flow to the callback, or null if no flowing is required. + private ExecutionContext _executionContext; + /// + /// A "captured" or with which to invoke the callback, + /// or null if no special context is required. + /// + private object _capturedContext; + /// Whether the current operation has completed. + private bool _completed; + /// The result with which the operation succeeded, or the default value if it hasn't yet completed or failed. + private TResult _result; + /// The exception with which the operation failed, or null if it hasn't yet completed or completed successfully. + private ExceptionDispatchInfo _error; + /// The current version of this value, used to help prevent misuse. + private short _version; + + /// Gets or sets whether to force continuations to run asynchronously. + /// Continuations may run asynchronously if this is false, but they'll never run synchronously if this is true. + public bool RunContinuationsAsynchronously { get; set; } + + /// Resets to prepare for the next operation. + public void Reset() + { + // Reset/update state for the next use/await of this instance. + _version++; + _completed = false; + _result = default; + _error = null; + _executionContext = null; + _capturedContext = null; + _continuation = null; + _continuationState = null; + } + + /// Completes with a successful result. + /// The result. + public void SetResult(TResult result) + { + _result = result; + SignalCompletion(); + } + + /// Complets with an error. + /// + public void SetException(Exception error) + { + _error = ExceptionDispatchInfo.Capture(error); + SignalCompletion(); + } + + /// Gets the operation version. + public short Version => _version; + + /// Gets the status of the operation. + /// Opaque value that was provided to the 's constructor. + public ValueTaskSourceStatus GetStatus(short token) + { + ValidateToken(token); + return + !_completed ? ValueTaskSourceStatus.Pending : + _error == null ? ValueTaskSourceStatus.Succeeded : + _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : + ValueTaskSourceStatus.Faulted; + } + + /// Gets the result of the operation. + /// Opaque value that was provided to the 's constructor. + public TResult GetResult(short token) + { + ValidateToken(token); + if (!_completed) + { + ThrowInvalidOperationException(); + } + + _error?.Throw(); + return _result; + } + + /// Schedules the continuation action for this operation. + /// The continuation to invoke when the operation has completed. + /// The state object to pass to when it's invoked. + /// Opaque value that was provided to the 's constructor. + /// The flags describing the behavior of the continuation. + public void OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) + { + if (continuation == null) + { + throw new ArgumentNullException(nameof(continuation)); + } + ValidateToken(token); + + if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0) + { + _executionContext = ExecutionContext.Capture(); + } + + if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0) + { + SynchronizationContext sc = SynchronizationContext.Current; + if (sc != null && sc.GetType() != typeof(SynchronizationContext)) + { + _capturedContext = sc; + } + else + { + TaskScheduler ts = TaskScheduler.Current; + if (ts != TaskScheduler.Default) + { + _capturedContext = ts; + } + } + } + + _continuationState = state; + if (Interlocked.CompareExchange(ref _continuation, continuation, null) != null) + { + switch (_capturedContext) + { + case null: + if (_executionContext != null) + { + ThreadPool.QueueUserWorkItem(continuation, state, preferLocal: true); + } + else + { + ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true); + } + break; + + case SynchronizationContext sc: + sc.Post(s => + { + var tuple = (Tuple, object>)s; + tuple.Item1(tuple.Item2); + }, Tuple.Create(continuation, state)); + break; + + case TaskScheduler ts: + Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); + break; + } + } + } + + /// Ensures that the specified token matches the current version. + /// The token supplied by . + private void ValidateToken(short token) + { + if (token != _version) + { + ThrowInvalidOperationException(); + } + } + + /// Signals that that the operation has completed. Invoked after the result or error has been set. + private void SignalCompletion() + { + if (_completed) + { + ThrowInvalidOperationException(); + } + _completed = true; + + if (Interlocked.CompareExchange(ref _continuation, ManualResetValueTaskSourceLogicShared.s_sentinel, null) != null) + { + if (_executionContext != null) + { + ExecutionContext.RunInternal( + _executionContext, + (ref ManualResetValueTaskSourceLogic s) => s.InvokeContinuation(), + ref this); + } + else + { + InvokeContinuation(); + } + } + } + + /// + /// Invokes the continuation with the appropriate captured context / scheduler. + /// This assumes that if is not null we're already + /// running within that . + /// + private void InvokeContinuation() + { + switch (_capturedContext) + { + case null: + if (RunContinuationsAsynchronously) + { + if (_executionContext != null) + { + ThreadPool.QueueUserWorkItem(_continuation, _continuationState, preferLocal: true); + } + else + { + ThreadPool.UnsafeQueueUserWorkItem(_continuation, _continuationState, preferLocal: true); + } + } + else + { + _continuation(_continuationState); + } + break; + + case SynchronizationContext sc: + sc.Post(s => + { + var state = (Tuple, object>)s; + state.Item1(state.Item2); + }, Tuple.Create(_continuation, _continuationState)); + break; + + case TaskScheduler ts: + Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); + break; + } + } + + private static void ThrowInvalidOperationException() => + throw new InvalidOperationException(); + } + + internal static class ManualResetValueTaskSourceLogicShared + { + internal static readonly Action s_sentinel = new Action(s => + { + Debug.Fail("The sentinel delegate should never be invoked."); + throw null; + }); + } +} diff --git a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs index d9b378a2ab91..b74c8e34446c 100644 --- a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs +++ b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs @@ -1333,6 +1333,22 @@ public static bool QueueUserWorkItem(Action callBack, TState sta return true; } + // TODO: https://github.com/dotnet/corefx/issues/32547. Make public. + internal static bool UnsafeQueueUserWorkItem(Action callBack, TState state, bool preferLocal) + { + if (callBack == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack); + } + + EnsureVMInitialized(); + + ThreadPoolGlobals.workQueue.Enqueue( + new QueueUserWorkItemCallback(callBack, state, null), forceGlobal: !preferLocal); + + return true; + } + public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object state) { if (callBack == null) diff --git a/src/System.Private.CoreLib/src/System/Threading/Timer.cs b/src/System.Private.CoreLib/src/System/Threading/Timer.cs index a66563403f1a..796c9ecf0ccf 100644 --- a/src/System.Private.CoreLib/src/System/Threading/Timer.cs +++ b/src/System.Private.CoreLib/src/System/Threading/Timer.cs @@ -664,7 +664,7 @@ public ValueTask CloseAsync() // there wasn't a previous CloseAsync call that did. if (notifyWhenNoCallbacksRunning == null) { - var t = new Task(null, TaskCreationOptions.RunContinuationsAsynchronously); + var t = new Task((object)null, TaskCreationOptions.RunContinuationsAsynchronously); m_notifyWhenNoCallbacksRunning = t; return new ValueTask(t); }