From 5605d8303f3e901b297d9dface7efce51e90d27b Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 1 Dec 2022 20:00:32 +0700 Subject: [PATCH] Add ReceiveAsync feature to Akka.TestKit TestActorRef (#6281) Co-authored-by: Aaron Stannard --- .../TestActorRefTests/ExceptionHandling.cs | 61 +++++++++++++++ .../Internal/InternalTestActorRef.cs | 74 +++++++++++++++++-- src/core/Akka.TestKit/TestActorRefBase.cs | 17 +++++ src/core/Akka/Dispatch/ActorTaskScheduler.cs | 15 +++- 4 files changed, 157 insertions(+), 10 deletions(-) create mode 100644 src/core/Akka.TestKit.Tests/TestActorRefTests/ExceptionHandling.cs diff --git a/src/core/Akka.TestKit.Tests/TestActorRefTests/ExceptionHandling.cs b/src/core/Akka.TestKit.Tests/TestActorRefTests/ExceptionHandling.cs new file mode 100644 index 00000000000..fe1b576264d --- /dev/null +++ b/src/core/Akka.TestKit.Tests/TestActorRefTests/ExceptionHandling.cs @@ -0,0 +1,61 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; +using static FluentAssertions.FluentActions; + +namespace Akka.TestKit.Tests.TestActorRefTests +{ + public class ExceptionHandling: TestKit.Xunit2.TestKit + { + private class GiveError + { } + + private class GiveErrorAsync + { } + + private class ExceptionActor : ReceiveActor + { + public ExceptionActor() + { + Receive((b) => throw new Exception("WAT")); + + ReceiveAsync(async (b) => + { + await Task.Delay(TimeSpan.FromSeconds(0.1)); + throw new Exception("WATASYNC"); + }); + } + } + + public ExceptionHandling(ITestOutputHelper helper) : base("akka.loglevel = debug", helper) + { + } + + [Fact] + public void GetException() + { + var props = Props.Create(); + var subject = new TestActorRef(Sys, props, null, "testA"); + Invoking(() => subject.Receive(new GiveError())) + .Should().Throw().WithMessage("WAT"); + } + + [Fact] + public async Task GetExceptionAsync() + { + var props = Props.Create(); + var subject = new TestActorRef(Sys, props, null, "testB"); + await Awaiting(() => subject.ReceiveAsync(new GiveErrorAsync())) + .Should().ThrowAsync().WithMessage("WATASYNC"); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.TestKit/Internal/InternalTestActorRef.cs b/src/core/Akka.TestKit/Internal/InternalTestActorRef.cs index 28fce9b2a77..7db216aa67e 100644 --- a/src/core/Akka.TestKit/Internal/InternalTestActorRef.cs +++ b/src/core/Akka.TestKit/Internal/InternalTestActorRef.cs @@ -6,11 +6,12 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.Actor.Internal; using Akka.Dispatch; -using Akka.Dispatch.SysMsg; using Akka.Pattern; using Akka.Util; using Akka.Util.Internal; @@ -89,6 +90,14 @@ public void Receive(object message, IActorRef sender = null) cell.UseThreadContext(() => cell.ReceiveMessageForTest(envelope)); } + public Task ReceiveAsync(object message, IActorRef sender = null) + { + var cell = (TestActorCell)Cell; + sender = sender.IsNobody() ? cell.System.DeadLetters : sender; + var envelope = new Envelope(message, sender); + return cell.UseThreadContextAsync(() => cell.ReceiveMessageForTestAsync(envelope)); + } + /// /// TBD /// @@ -245,25 +254,73 @@ public override ActorTaskScheduler TaskScheduler if (taskScheduler != null) return taskScheduler; - taskScheduler = new TestActorTaskScheduler(this); + taskScheduler = new TestActorTaskScheduler(this, TaskFailureHook); return Interlocked.CompareExchange(ref _taskScheduler, taskScheduler, null) ?? taskScheduler; } } + + private readonly Dictionary> _testActorTasks = + new Dictionary>(); + + /// + /// This is only intended to be called from TestKit's TestActorRef + /// + /// TBD + public Task ReceiveMessageForTestAsync(Envelope envelope) + { + var tcs = new TaskCompletionSource(); + _testActorTasks[envelope.Message] = tcs; + ReceiveMessageForTest(envelope); + return tcs.Task; + } + + /// + /// TBD + /// + /// TBD + public Task UseThreadContextAsync(Func actionAsync) + { + var tmp = InternalCurrentActorCellKeeper.Current; + InternalCurrentActorCellKeeper.Current = this; + try + { + return actionAsync(); + } + finally + { + //ensure we set back the old context + InternalCurrentActorCellKeeper.Current = tmp; + } + } + + private void TaskFailureHook(object message, Exception exception) + { + if (!_testActorTasks.TryGetValue(message, out var tcs)) + return; + if (exception is { }) + tcs.TrySetException(exception); + else + tcs.TrySetResult(Done.Instance); + _testActorTasks.Remove(message); + } + /// /// TBD /// public new object Actor { get { return base.Actor; } } } - internal class TestActorTaskScheduler : ActorTaskScheduler + internal class TestActorTaskScheduler : ActorTaskScheduler, IAsyncResultInterceptor { - private readonly ActorCell _testActorCell; + private readonly TestActorCell _testActorCell; + private readonly Action _taskCallback; /// - internal TestActorTaskScheduler(ActorCell testActorCell) : base(testActorCell) + internal TestActorTaskScheduler(ActorCell testActorCell, Action taskCallback) : base(testActorCell) { - _testActorCell = testActorCell; + _taskCallback = taskCallback; + _testActorCell = (TestActorCell) testActorCell; } /// @@ -277,6 +334,11 @@ protected override void OnAfterTaskCompleted() { ActorCellKeepingSynchronizationContext.AsyncCache = null; } + + public void OnTaskCompleted(object message, Exception exception) + { + _taskCallback(message, exception); + } } /// diff --git a/src/core/Akka.TestKit/TestActorRefBase.cs b/src/core/Akka.TestKit/TestActorRefBase.cs index f6e8c83f16f..c98e58161b4 100644 --- a/src/core/Akka.TestKit/TestActorRefBase.cs +++ b/src/core/Akka.TestKit/TestActorRefBase.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using Akka.Actor; using Akka.Dispatch; using Akka.Dispatch.SysMsg; @@ -51,6 +52,22 @@ public void Receive(object message, IActorRef sender = null) _internalRef.Receive(message, sender); } + /// + /// Directly inject messages into actor ReceiveAsync behavior. Any exceptions + /// thrown will be available to you, while still being able to use + /// become/unbecome. + /// Note: This method violates the actor model and could cause unpredictable + /// behavior. For example, a Receive call to an actor could run simultaneously + /// (2 simultaneous threads running inside the actor) with the actor's handling + /// of a previous Tell call. + /// + /// The message. + /// The sender. + public Task ReceiveAsync(object message, IActorRef sender = null) + { + return _internalRef.ReceiveAsync(message, sender); + } + /// /// TBD /// diff --git a/src/core/Akka/Dispatch/ActorTaskScheduler.cs b/src/core/Akka/Dispatch/ActorTaskScheduler.cs index c78c7fccb03..609bf8994ec 100644 --- a/src/core/Akka/Dispatch/ActorTaskScheduler.cs +++ b/src/core/Akka/Dispatch/ActorTaskScheduler.cs @@ -149,7 +149,7 @@ public static void RunTask(Func asyncAction) //suspend the mailbox dispatcher.Suspend(context); - ActorTaskScheduler actorScheduler = context.TaskScheduler; + var actorScheduler = context.TaskScheduler; actorScheduler.CurrentMessage = context.CurrentMessage; actorScheduler.OnBeforeTaskStarted(); @@ -158,18 +158,21 @@ public static void RunTask(Func asyncAction) .Unwrap() .ContinueWith(parent => { - Exception exception = GetTaskException(parent); - + var exception = GetTaskException(parent); if (exception == null) { dispatcher.Resume(context); - context.CheckReceiveTimeout(); } else { context.Self.AsInstanceOf().SendSystemMessage(new ActorTaskSchedulerMessage(exception, actorScheduler.CurrentMessage)); } + + // Used by TestActorRef to intercept async execution result + if(actorScheduler is IAsyncResultInterceptor interceptor) + interceptor.OnTaskCompleted(actorScheduler.CurrentMessage, exception); + //clear the current message field of the scheduler actorScheduler.CurrentMessage = null; actorScheduler.OnAfterTaskCompleted(); @@ -203,3 +206,7 @@ private static Exception TryUnwrapAggregateException(AggregateException aggregat } } +internal interface IAsyncResultInterceptor +{ + void OnTaskCompleted(object message, Exception exception); +} \ No newline at end of file