This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8936b2a
commit 5b55363
Showing
4 changed files
with
371 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
273 changes: 273 additions & 0 deletions
273
....Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceLogic.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,273 @@ | ||
// Licensed to the .NET Foundation under one or more agreements. | ||
// The .NET Foundation licenses this file to you under the MIT license. | ||
// See the LICENSE file in the project root for more information. | ||
|
||
using System.Diagnostics; | ||
using System.Runtime.CompilerServices; | ||
using System.Runtime.ExceptionServices; | ||
using System.Runtime.InteropServices; | ||
|
||
namespace System.Threading.Tasks.Sources | ||
{ | ||
/// <summary>Provides the core logic for implementing a manual-reset <see cref="IValueTaskSource"/> or <see cref="IValueTaskSource{TResult}"/>.</summary> | ||
/// <typeparam name="TResult"></typeparam> | ||
[StructLayout(LayoutKind.Auto)] | ||
public struct ManualResetValueTaskSourceLogic<TResult> | ||
{ | ||
/// <summary> | ||
/// The callback to invoke when the operation completes if <see cref="OnCompleted"/> was called before the operation completed, | ||
/// or <see cref="ManualResetValueTaskSourceLogicShared.s_sentinel"/> if the operation completed before a callback was supplied, | ||
/// or null if a callback hasn't yet been provided and the operation hasn't yet completed. | ||
/// </summary> | ||
private Action<object> _continuation; | ||
/// <summary>State to pass to <see cref="_continuation"/>.</summary> | ||
private object _continuationState; | ||
/// <summary><see cref="ExecutionContext"/> to flow to the callback, or null if no flowing is required.</summary> | ||
private ExecutionContext _executionContext; | ||
/// <summary> | ||
/// A "captured" <see cref="SynchronizationContext"/> or <see cref="TaskScheduler"/> with which to invoke the callback, | ||
/// or null if no special context is required. | ||
/// </summary> | ||
private object _capturedContext; | ||
/// <summary>Whether the current operation has completed.</summary> | ||
private bool _completed; | ||
/// <summary>The result with which the operation succeeded, or the default value if it hasn't yet completed or failed.</summary> | ||
private TResult _result; | ||
/// <summary>The exception with which the operation failed, or null if it hasn't yet completed or completed successfully.</summary> | ||
private ExceptionDispatchInfo _error; | ||
/// <summary>The current version of this value, used to help prevent misuse.</summary> | ||
private short _version; | ||
|
||
/// <summary>Gets or sets whether to force continuations to run asynchronously.</summary> | ||
/// <remarks>Continuations may run asynchronously if this is false, but they'll never run synchronously if this is true.</remarks> | ||
public bool RunContinuationsAsynchronously { get; set; } | ||
|
||
/// <summary>Resets to prepare for the next operation.</summary> | ||
public void Reset() | ||
{ | ||
// Reset/update state for the next use/await of this instance. | ||
_version++; | ||
_completed = false; | ||
_result = default; | ||
_error = null; | ||
_executionContext = null; | ||
_capturedContext = null; | ||
_continuation = null; | ||
_continuationState = null; | ||
} | ||
|
||
/// <summary>Completes with a successful result.</summary> | ||
/// <param name="result">The result.</param> | ||
public void SetResult(TResult result) | ||
{ | ||
_result = result; | ||
SignalCompletion(); | ||
} | ||
|
||
/// <summary>Complets with an error.</summary> | ||
/// <param name="error"></param> | ||
public void SetException(Exception error) | ||
{ | ||
_error = ExceptionDispatchInfo.Capture(error); | ||
SignalCompletion(); | ||
} | ||
|
||
/// <summary>Gets the operation version.</summary> | ||
public short Version => _version; | ||
|
||
/// <summary>Gets the status of the operation.</summary> | ||
/// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param> | ||
public ValueTaskSourceStatus GetStatus(short token) | ||
{ | ||
ValidateToken(token); | ||
return | ||
!_completed ? ValueTaskSourceStatus.Pending : | ||
_error == null ? ValueTaskSourceStatus.Succeeded : | ||
_error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : | ||
ValueTaskSourceStatus.Faulted; | ||
} | ||
|
||
/// <summary>Gets the result of the operation.</summary> | ||
/// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param> | ||
public TResult GetResult(short token) | ||
{ | ||
ValidateToken(token); | ||
if (!_completed) | ||
{ | ||
ManualResetValueTaskSourceLogicShared.ThrowInvalidOperationException(); | ||
} | ||
|
||
_error?.Throw(); | ||
return _result; | ||
} | ||
|
||
/// <summary>Schedules the continuation action for this operation.</summary> | ||
/// <param name="continuation">The continuation to invoke when the operation has completed.</param> | ||
/// <param name="state">The state object to pass to <paramref name="continuation"/> when it's invoked.</param> | ||
/// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param> | ||
/// <param name="flags">The flags describing the behavior of the continuation.</param> | ||
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) | ||
{ | ||
if (continuation == null) | ||
{ | ||
throw new ArgumentNullException(nameof(continuation)); | ||
} | ||
ValidateToken(token); | ||
|
||
if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0) | ||
{ | ||
_executionContext = ExecutionContext.Capture(); | ||
} | ||
|
||
if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0) | ||
{ | ||
SynchronizationContext sc = SynchronizationContext.Current; | ||
if (sc != null && sc.GetType() != typeof(SynchronizationContext)) | ||
{ | ||
_capturedContext = sc; | ||
} | ||
else | ||
{ | ||
TaskScheduler ts = TaskScheduler.Current; | ||
if (ts != TaskScheduler.Default) | ||
{ | ||
_capturedContext = ts; | ||
} | ||
} | ||
} | ||
|
||
// We need to set the continuation state before we swap in the delegate, so that | ||
// if there's a race between this and SetResult/Exception and SetResult/Exception | ||
// sees the _continuation as non-null, it'll be able to invoke it with the state | ||
// stored here. However, this also means that if this is used incorrectly (e.g. | ||
// awaited twice concurrently), _continuationState might get erroneously overwritten. | ||
// To minimize the chances of that, we check preemptively whether _continuation | ||
// is already set to something other than the completion sentinel. | ||
object currentContinuation = _continuation; | ||
if (currentContinuation != null && | ||
currentContinuation != ManualResetValueTaskSourceLogicShared.s_sentinel) | ||
{ | ||
ManualResetValueTaskSourceLogicShared.ThrowInvalidOperationException(); | ||
} | ||
_continuationState = state; | ||
|
||
Action<object> oldContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null); | ||
if (oldContinuation != null) | ||
{ | ||
switch (_capturedContext) | ||
{ | ||
case null: | ||
if (_executionContext != null) | ||
{ | ||
ThreadPool.QueueUserWorkItem(continuation, state, preferLocal: true); | ||
} | ||
else | ||
{ | ||
ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true); | ||
} | ||
break; | ||
|
||
case SynchronizationContext sc: | ||
sc.Post(s => | ||
{ | ||
var tuple = (Tuple<Action<object>, object>)s; | ||
tuple.Item1(tuple.Item2); | ||
}, Tuple.Create(continuation, state)); | ||
break; | ||
|
||
case TaskScheduler ts: | ||
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
/// <summary>Ensures that the specified token matches the current version.</summary> | ||
/// <param name="token">The token supplied by <see cref="ValueTask"/>.</param> | ||
private void ValidateToken(short token) | ||
{ | ||
if (token != _version) | ||
{ | ||
ManualResetValueTaskSourceLogicShared.ThrowInvalidOperationException(); | ||
} | ||
} | ||
|
||
/// <summary>Signals that that the operation has completed. Invoked after the result or error has been set.</summary> | ||
private void SignalCompletion() | ||
{ | ||
if (_completed) | ||
{ | ||
ManualResetValueTaskSourceLogicShared.ThrowInvalidOperationException(); | ||
} | ||
_completed = true; | ||
|
||
if (Interlocked.CompareExchange(ref _continuation, ManualResetValueTaskSourceLogicShared.s_sentinel, null) != null) | ||
{ | ||
if (_executionContext != null) | ||
{ | ||
ExecutionContext.RunInternal( | ||
_executionContext, | ||
(ref ManualResetValueTaskSourceLogic<TResult> s) => s.InvokeContinuation(), | ||
ref this); | ||
} | ||
else | ||
{ | ||
InvokeContinuation(); | ||
} | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Invokes the continuation with the appropriate captured context / scheduler. | ||
/// This assumes that if <see cref="_executionContext"/> is not null we're already | ||
/// running within that <see cref="ExecutionContext"/>. | ||
/// </summary> | ||
private void InvokeContinuation() | ||
{ | ||
switch (_capturedContext) | ||
{ | ||
case null: | ||
if (RunContinuationsAsynchronously) | ||
{ | ||
if (_executionContext != null) | ||
{ | ||
ThreadPool.QueueUserWorkItem(_continuation, _continuationState, preferLocal: true); | ||
} | ||
else | ||
{ | ||
ThreadPool.UnsafeQueueUserWorkItem(_continuation, _continuationState, preferLocal: true); | ||
} | ||
} | ||
else | ||
{ | ||
_continuation(_continuationState); | ||
} | ||
break; | ||
|
||
case SynchronizationContext sc: | ||
sc.Post(s => | ||
{ | ||
var state = (Tuple<Action<object>, object>)s; | ||
state.Item1(state.Item2); | ||
}, Tuple.Create(_continuation, _continuationState)); | ||
break; | ||
|
||
case TaskScheduler ts: | ||
Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
internal static class ManualResetValueTaskSourceLogicShared | ||
{ | ||
internal static void ThrowInvalidOperationException() => | ||
throw new InvalidOperationException(); | ||
|
||
internal static readonly Action<object> s_sentinel = new Action<object>(s => | ||
{ | ||
Debug.Fail("The sentinel delegate should never be invoked."); | ||
ThrowInvalidOperationException(); | ||
}); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters