From 0f2fec645c47246f6d311840a55643ca6d8cd43d Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 22 Feb 2022 18:35:05 +0100 Subject: [PATCH 01/10] Fix the remaining `FishForMessage` `Sync` over `Async` methods --- src/core/Akka.TestKit/TestKitBase_Receive.cs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 8cfbf5162ec..5f4394eb83a 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -32,6 +32,12 @@ public abstract partial class TestKitBase public object FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "") { return FishForMessage(isMessage, max, hint); + } + + /// + public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpan? max = null, string hint = "") + { + return await FishForMessageAsync(isMessage, max, hint); } /// @@ -48,6 +54,12 @@ public T FishForMessage(Predicate isMessage, TimeSpan? max = null, string return FishForMessage(isMessage: isMessage, max: max, hint: hint, allMessages: null); } + /// + public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpan? max = null, string hint = "") + { + return await FishForMessageAsync(isMessage: isMessage, max: max, hint: hint, allMessages: null); + } + /// /// Receives messages until returns true. /// Use it to ignore certain messages while waiting for a specific message. From 525bd90b1be93d5ad37a7ed4708d65faac38d2b1 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 22 Feb 2022 18:58:38 +0100 Subject: [PATCH 02/10] * Changed `ReceiveWhile` to `Sync` over `Async` * Created `ReceiveWhileAsync()` --- src/core/Akka.TestKit/TestKitBase_Receive.cs | 93 +++++++++++++------- 1 file changed, 62 insertions(+), 31 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 5f4394eb83a..88e1256585c 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -127,24 +127,21 @@ await Task.Run(() => /// Returns all the messages encountered before 'radio-silence' was reached. public async Task WaitForRadioSilenceAsync(TimeSpan? max = null, uint? maxMessages = null) { - return await Task.Run(() => - { - var messages = new ArrayList(); - - for (uint i = 0; ; i++) - { - _assertions.AssertFalse(maxMessages.HasValue && i > maxMessages.Value, $"{nameof(maxMessages)} violated (current iteration: {i})."); + var messages = new ArrayList(); - var message = ReceiveOne(max: max); + for (uint i = 0; ; i++) + { + _assertions.AssertFalse(maxMessages.HasValue && i > maxMessages.Value, $"{nameof(maxMessages)} violated (current iteration: {i})."); - if (message == null) - { - return ArrayList.ReadOnly(messages); - } + var message = await ReceiveOneAsync(max: max); - messages.Add(message); + if (message == null) + { + return ArrayList.ReadOnly(messages); } - }); + + messages.Add(message); + } } /// /// Receive one message from the internal queue of the TestActor. @@ -487,6 +484,11 @@ public IReadOnlyList ReceiveWhile(TimeSpan? max, Func filter, i return ReceiveWhile(filter, max, Timeout.InfiniteTimeSpan, msgs); } + /// + public async ValueTask> ReceiveWhileAsync(TimeSpan? max, Func filter, int msgs = int.MaxValue) where T : class + { + return await ReceiveWhileAsync(filter, max, Timeout.InfiniteTimeSpan, msgs); + } /// /// Receive a series of messages until the function returns null or the idle /// timeout is met or the overall maximum duration is elapsed or @@ -497,7 +499,7 @@ public IReadOnlyList ReceiveWhile(TimeSpan? max, Func filter, i /// The max duration is scaled by /// /// TBD - /// TBD + /// /// TBD /// TBD /// TBD @@ -507,6 +509,12 @@ public IReadOnlyList ReceiveWhile(TimeSpan? max, TimeSpan? idle, Func + public async ValueTask> ReceiveWhileAsync(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue) + { + return await ReceiveWhileAsync(filter, max, idle, msgs); + } + /// /// Receive a series of messages until the function returns null or the idle /// timeout is met (disabled by default) or the overall @@ -523,6 +531,15 @@ public IReadOnlyList ReceiveWhile(TimeSpan? max, TimeSpan? idle, FuncTBD /// TBD public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue) + { + var task = ReceiveWhileAsync(filter, max, idle, msgs).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; + } + + + /// + public async ValueTask> ReceiveWhileAsync(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue) { var maxValue = RemainingOrDilated(max); var start = Now; @@ -535,26 +552,30 @@ public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = while (count < msgs) { // Peek the message on the front of the queue - if (!TryPeekOne(out var envelope, (stop - Now).Min(idleValue))) + var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue)) + .ConfigureAwait(false); + if (!peeked.success) { _testState.LastMessage = msg; break; } - var message = envelope.Message; + var message = peeked.envelope.Message; var result = filter(message); - + // If the message is accepted by the filter, remove it from the queue if (result != null) { // This should happen immediately (zero timespan). Something is wrong if this fails. - if (!InternalTryReceiveOne(out var removed, TimeSpan.Zero, CancellationToken.None, true)) + var received = await InternalTryReceiveOneAsync(TimeSpan.Zero, CancellationToken.None, true) + .ConfigureAwait(false); + if (!received.success) throw new InvalidOperationException("[RACY] Could not dequeue an item from test queue."); - + // The removed item should be equal to the one peeked previously - if(!ReferenceEquals(envelope, removed)) + if (!ReferenceEquals(peeked.envelope, received.envelope)) throw new InvalidOperationException("[RACY] Dequeued item does not match earlier peeked item"); - - msg = envelope; + + msg = peeked.envelope; } // If the message is rejected by the filter, stop the loop else @@ -562,7 +583,7 @@ public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = _testState.LastMessage = msg; break; } - + // Store the accepted message and continue. acc.Add(result); count++; @@ -573,7 +594,6 @@ public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = return acc; } - /// /// Receive a series of messages. /// It will continue to receive messages until the predicate returns false or the idle @@ -594,6 +614,14 @@ public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = /// TBD /// TBD public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true) where T : class + { + var task = ReceiveWhileAsync(shouldContinue, max, idle, msgs, shouldIgnoreOtherMessageTypes).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; + } + + /// + public async ValueTask> ReceiveWhileAsync(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true) where T : class { var start = Now; var maxValue = RemainingOrDilated(max); @@ -606,12 +634,13 @@ public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? m MessageEnvelope msg = NullMessageEnvelope.Instance; while (count < msgs) { - if (!TryPeekOne(out var envelope, (stop - Now).Min(idleValue))) + var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue)).ConfigureAwait(false); + if (!peeked.success) { _testState.LastMessage = msg; break; } - var message = envelope.Message; + var message = peeked.envelope.Message; var typedMessage = message as T; var shouldStop = false; if (typedMessage != null) @@ -635,11 +664,13 @@ public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? m if (!shouldStop) { // This should happen immediately (zero timespan). Something is wrong if this fails. - if (!InternalTryReceiveOne(out var removed, TimeSpan.Zero, CancellationToken.None, true)) + var received = await InternalTryReceiveOneAsync(TimeSpan.Zero, CancellationToken.None, true) + .ConfigureAwait(false); + if (!received.success) throw new InvalidOperationException("[RACY] Could not dequeue an item from test queue."); - + // The removed item should be equal to the one peeked previously - if(!ReferenceEquals(envelope, removed)) + if (!ReferenceEquals(peeked.envelope, received.envelope)) throw new InvalidOperationException("[RACY] Dequeued item does not match earlier peeked item"); } // If the message is rejected by the filter, stop the loop @@ -648,7 +679,7 @@ public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? m _testState.LastMessage = msg; break; } - msg = envelope; + msg = peeked.envelope; } ConditionalLog("Received {0} messages with filter during {1}", count, Now - start); From c155a18827b94cf146791851ef693202102d2500 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 22 Feb 2022 19:45:17 +0100 Subject: [PATCH 03/10] Add missing TBD --- src/core/Akka.TestKit/TestKitBase_Receive.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 88e1256585c..612325e5737 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -499,7 +499,7 @@ public async ValueTask> ReceiveWhileAsync(TimeSpan? max, Fun /// The max duration is scaled by /// /// TBD - /// + /// TBD /// TBD /// TBD /// TBD From b862e735049431d547e02e480dcf9e5ddabe1648 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 22 Feb 2022 20:17:25 +0100 Subject: [PATCH 04/10] Create `ReceiveNAsync()` --- src/core/Akka.TestKit/Akka.TestKit.csproj | 2 ++ src/core/Akka.TestKit/TestKitBase_Receive.cs | 24 +++++++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.TestKit/Akka.TestKit.csproj b/src/core/Akka.TestKit/Akka.TestKit.csproj index a09cb7fbba6..f56c969e28c 100644 --- a/src/core/Akka.TestKit/Akka.TestKit.csproj +++ b/src/core/Akka.TestKit/Akka.TestKit.csproj @@ -28,8 +28,10 @@ + + diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 612325e5737..6d090386124 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -698,6 +698,13 @@ public IReadOnlyCollection ReceiveN(int numberOfMessages) return result; } + /// + public async ValueTask> ReceiveNAsync(int numberOfMessages) + { + var result = await InternalReceiveNAsync(numberOfMessages, RemainingOrDefault, true).ToListAsync(); + return result; + } + /// /// Receive the specified number of messages in a row before the given deadline. /// The deadline is scaled by "akka.test.timefactor" using . @@ -713,7 +720,22 @@ public IReadOnlyCollection ReceiveN(int numberOfMessages, TimeSpan max) return result; } + /// + public async ValueTask> ReceiveNAsync(int numberOfMessages, TimeSpan max) + { + max.EnsureIsPositiveFinite("max"); + var dilated = Dilated(max); + var result = await InternalReceiveNAsync(numberOfMessages, dilated, true).ToListAsync(); + return result; + } + private IEnumerable InternalReceiveN(int numberOfMessages, TimeSpan max, bool shouldLog) + { + foreach(var msg in InternalReceiveNAsync(numberOfMessages, max, shouldLog).ToEnumerable()) + yield return msg; + } + + private async IAsyncEnumerable InternalReceiveNAsync(int numberOfMessages, TimeSpan max, bool shouldLog) { var start = Now; var stop = max + start; @@ -721,7 +743,7 @@ private IEnumerable InternalReceiveN(int numberOfMessages, TimeSpan max, for (int i = 0; i < numberOfMessages; i++) { var timeout = stop - Now; - var o = ReceiveOne(timeout); + var o = await ReceiveOneAsync(timeout); var condition = o != null; if (!condition) { From 579292d16b91df036ae2080b53f131fde5cd90bd Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 22 Feb 2022 20:21:09 +0100 Subject: [PATCH 05/10] Potential fix for DocFx `StackOverflow` exception --- src/core/Akka.TestKit/TestKitBase_Receive.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 6d090386124..0e3c9f94042 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -54,7 +54,7 @@ public T FishForMessage(Predicate isMessage, TimeSpan? max = null, string return FishForMessage(isMessage: isMessage, max: max, hint: hint, allMessages: null); } - /// + /// public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpan? max = null, string hint = "") { return await FishForMessageAsync(isMessage: isMessage, max: max, hint: hint, allMessages: null); From f8ab3e31dc044fc415334d502b61dc16887d7a86 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Wed, 23 Feb 2022 19:13:18 +0100 Subject: [PATCH 06/10] * Changed `FishForMessage` to directly call its `async` version * Fix possible cause of `Stackoverflow` exception - methods inheriting docs from itself. --- src/core/Akka.TestKit/TestKitBase_Receive.cs | 27 ++++++++++---------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 0e3c9f94042..b99ad75f543 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -31,7 +31,9 @@ public abstract partial class TestKitBase /// Returns the message that matched public object FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "") { - return FishForMessage(isMessage, max, hint); + var task = FishForMessageAsync(isMessage, max, hint).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; } /// @@ -51,7 +53,9 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, /// Returns the message that matched public T FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "") { - return FishForMessage(isMessage: isMessage, max: max, hint: hint, allMessages: null); + var task = FishForMessageAsync(isMessage, max, hint).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; } /// @@ -72,7 +76,7 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpa /// Returns the message that matched public T FishForMessage(Predicate isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "") { - var task = FishForMessageAsync(isMessage, allMessages, max, hint).AsTask(); + var task = FishForMessageAsync(isMessage, allMessages, max, hint).AsTask(); task.WaitAndUnwrapException(); return task.Result; } @@ -104,15 +108,12 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayLi /// /// The type that the message is not supposed to be. /// Optional. The maximum wait duration. Defaults to when unset. - public async Task FishUntilMessageAsync(TimeSpan? max = null) + public async ValueTask FishUntilMessageAsync(TimeSpan? max = null) { - await Task.Run(() => + await ReceiveWhileAsync(max: max, shouldContinue: x => { - ReceiveWhile(max: max, shouldContinue: x => - { - _assertions.AssertFalse(x is T, "did not expect a message of type {0}", typeof(T)); - return true; // please continue receiving, don't stop - }); + _assertions.AssertFalse(x is T, "did not expect a message of type {0}", typeof(T)); + return true; // please continue receiving, don't stop }); } @@ -484,7 +485,7 @@ public IReadOnlyList ReceiveWhile(TimeSpan? max, Func filter, i return ReceiveWhile(filter, max, Timeout.InfiniteTimeSpan, msgs); } - /// + /// public async ValueTask> ReceiveWhileAsync(TimeSpan? max, Func filter, int msgs = int.MaxValue) where T : class { return await ReceiveWhileAsync(filter, max, Timeout.InfiniteTimeSpan, msgs); @@ -538,7 +539,7 @@ public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = } - /// + /// public async ValueTask> ReceiveWhileAsync(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue) { var maxValue = RemainingOrDilated(max); @@ -620,7 +621,7 @@ public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? m return task.Result; } - /// + /// public async ValueTask> ReceiveWhileAsync(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true) where T : class { var start = Now; From c3342cf2db282b9bf18cb2548164b9721808e232 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Wed, 23 Feb 2022 19:55:26 +0100 Subject: [PATCH 07/10] Fix build error --- src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs index 6963f5c7e90..6a29a14638f 100644 --- a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs +++ b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs @@ -115,7 +115,7 @@ public async Task FishUntilMessageAsync_should_fail_with_bad_input() { var probe = CreateTestProbe("probe"); probe.Ref.Tell(3, TestActor); - Func func = () => probe.FishUntilMessageAsync(max: TimeSpan.FromMilliseconds(10)); + Func func = () => probe.FishUntilMessageAsync(max: TimeSpan.FromMilliseconds(10)).AsTask(); await func.Should().ThrowAsync(); } From e76060bb4316ab30d7148545d522853f9fbd8617 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Thu, 24 Feb 2022 13:19:15 +0100 Subject: [PATCH 08/10] Added `CancellationToken` support --- src/core/Akka.TestKit/TestKitBase_Receive.cs | 152 +++++++++---------- 1 file changed, 70 insertions(+), 82 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index b99ad75f543..ee1e930aedc 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -9,6 +9,7 @@ using System.Collections; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Akka.TestKit.Internal; @@ -28,18 +29,19 @@ public abstract partial class TestKitBase /// The is message. /// The maximum. /// The hint. + /// /// Returns the message that matched - public object FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "") + public object FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { - var task = FishForMessageAsync(isMessage, max, hint).AsTask(); + var task = FishForMessageAsync(isMessage, max, hint, cancellationToken).AsTask(); task.WaitAndUnwrapException(); return task.Result; } - /// - public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpan? max = null, string hint = "") + /// + public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { - return await FishForMessageAsync(isMessage, max, hint); + return await FishForMessageAsync(isMessage, max, hint, cancellationToken); } /// @@ -50,18 +52,19 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, /// The is message. /// The maximum. /// The hint. + /// /// Returns the message that matched - public T FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "") + public T FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { - var task = FishForMessageAsync(isMessage, max, hint).AsTask(); + var task = FishForMessageAsync(isMessage, max, hint, cancellationToken).AsTask(); task.WaitAndUnwrapException(); return task.Result; } - /// - public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpan? max = null, string hint = "") + /// + public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { - return await FishForMessageAsync(isMessage: isMessage, max: max, hint: hint, allMessages: null); + return await FishForMessageAsync(isMessage: isMessage, max: max, hint: hint, allMessages: null, cancellationToken: cancellationToken); } /// @@ -72,17 +75,18 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpa /// The is message. /// The maximum. /// The hint. + /// /// If null then will be ignored. If not null then will be initially cleared, then filled with all the messages until returns true /// Returns the message that matched - public T FishForMessage(Predicate isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "") + public T FishForMessage(Predicate isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { - var task = FishForMessageAsync(isMessage, allMessages, max, hint).AsTask(); + var task = FishForMessageAsync(isMessage, allMessages, max, hint, cancellationToken).AsTask(); task.WaitAndUnwrapException(); return task.Result; } - /// - public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "") + /// + public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { var maxValue = RemainingOrDilated(max); var end = Now + maxValue; @@ -90,7 +94,7 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayLi while (true) { var left = end - Now; - var msg = await ReceiveOneAsync(left).ConfigureAwait(false); + var msg = await ReceiveOneAsync(left, cancellationToken).ConfigureAwait(false); _assertions.AssertTrue(msg != null, "Timeout ({0}) during fishForMessage{1}", maxValue, string.IsNullOrEmpty(hint) ? "" : ", hint: " + hint); if (msg is T msg1 && isMessage(msg1)) { @@ -108,13 +112,14 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayLi /// /// The type that the message is not supposed to be. /// Optional. The maximum wait duration. Defaults to when unset. - public async ValueTask FishUntilMessageAsync(TimeSpan? max = null) + /// + public async ValueTask FishUntilMessageAsync(TimeSpan? max = null, CancellationToken cancellationToken = default) { await ReceiveWhileAsync(max: max, shouldContinue: x => { _assertions.AssertFalse(x is T, "did not expect a message of type {0}", typeof(T)); return true; // please continue receiving, don't stop - }); + },cancellationToken: cancellationToken); } /// @@ -123,10 +128,11 @@ await ReceiveWhileAsync(max: max, shouldContinue: x => /// /// A temporary period of 'radio-silence'. /// The method asserts that is never reached. + /// /// If set to null then this method will loop for an infinite number of periods. /// NOTE: If set to null and radio-silence is never reached then this method will never return. /// Returns all the messages encountered before 'radio-silence' was reached. - public async Task WaitForRadioSilenceAsync(TimeSpan? max = null, uint? maxMessages = null) + public async Task WaitForRadioSilenceAsync(TimeSpan? max = null, uint? maxMessages = null, CancellationToken cancellationToken = default) { var messages = new ArrayList(); @@ -134,7 +140,7 @@ public async Task WaitForRadioSilenceAsync(TimeSpan? max = null, uint { _assertions.AssertFalse(maxMessages.HasValue && i > maxMessages.Value, $"{nameof(maxMessages)} violated (current iteration: {i})."); - var message = await ReceiveOneAsync(max: max); + var message = await ReceiveOneAsync(max: max, cancellationToken); if (message == null) { @@ -154,19 +160,20 @@ public async Task WaitForRadioSilenceAsync(TimeSpan? max = null, uint /// If null the config value "akka.test.single-expect-default" is used as timeout. /// If set to a negative value or , blocks forever. /// This method does NOT automatically scale its Duration parameter using ! + /// /// The message if one was received; null otherwise - public object ReceiveOne(TimeSpan? max = null) + public object ReceiveOne(TimeSpan? max = null,CancellationToken cancellationToken = default) { - var task = ReceiveOneAsync(max).AsTask(); + var task = ReceiveOneAsync(max, cancellationToken).AsTask(); task.WaitAndUnwrapException(); var received = task.Result; return received; } - /// - public async ValueTask ReceiveOneAsync(TimeSpan? max = null) + /// + public async ValueTask ReceiveOneAsync(TimeSpan? max = null, CancellationToken cancellationToken = default) { - var received = await TryReceiveOneAsync(max, CancellationToken.None); + var received = await TryReceiveOneAsync(max, cancellationToken); if (received.success) return received.envelope.Message; @@ -358,30 +365,6 @@ public async ValueTask PeekOneAsync(CancellationToken cancellationToken) return null; } - /// - /// Peek one message from the head of the internal queue of the TestActor within - /// the specified duration. The method blocks the specified duration. - /// Note! that the returned - /// is a containing the sender and the message. - /// This method does NOT automatically scale its Duration parameter using ! - /// - /// The received envelope. - /// Optional: The maximum duration to wait. - /// If null the config value "akka.test.single-expect-default" is used as timeout. - /// If set to a negative value or , blocks forever. - /// This method does NOT automatically scale its Duration parameter using ! - /// True if a message was received within the specified duration; false otherwise. - public bool TryPeekOne(out MessageEnvelope envelope, TimeSpan? max = null) - { - return InternalTryPeekOne(out envelope, max, CancellationToken.None, true); - } - - /// - public async ValueTask<(bool success, MessageEnvelope envelope)> TryPeekOneAsync(TimeSpan? max = null) - { - return await InternalTryPeekOneAsync(max, CancellationToken.None, true); - } - /// /// Peek one message from the head of the internal queue of the TestActor within /// the specified duration. @@ -479,16 +462,17 @@ private bool InternalTryPeekOne(out MessageEnvelope envelope, TimeSpan? max, Can /// TBD /// TBD /// TBD + /// /// TBD - public IReadOnlyList ReceiveWhile(TimeSpan? max, Func filter, int msgs = int.MaxValue) where T : class + public IReadOnlyList ReceiveWhile(TimeSpan? max, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) where T : class { - return ReceiveWhile(filter, max, Timeout.InfiniteTimeSpan, msgs); + return ReceiveWhile(filter, max, Timeout.InfiniteTimeSpan, msgs, cancellationToken); } - /// - public async ValueTask> ReceiveWhileAsync(TimeSpan? max, Func filter, int msgs = int.MaxValue) where T : class + /// + public async ValueTask> ReceiveWhileAsync(TimeSpan? max, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) where T : class { - return await ReceiveWhileAsync(filter, max, Timeout.InfiniteTimeSpan, msgs); + return await ReceiveWhileAsync(filter, max, Timeout.InfiniteTimeSpan, msgs, cancellationToken); } /// /// Receive a series of messages until the function returns null or the idle @@ -505,12 +489,12 @@ public async ValueTask> ReceiveWhileAsync(TimeSpan? max, Fun /// TBD /// TBD /// TBD - public IReadOnlyList ReceiveWhile(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue) + public IReadOnlyList ReceiveWhile(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) { - return ReceiveWhile(filter, max, idle, msgs); + return ReceiveWhile(filter, max, idle, msgs, cancellationToken); } - /// + /// public async ValueTask> ReceiveWhileAsync(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue) { return await ReceiveWhileAsync(filter, max, idle, msgs); @@ -530,17 +514,18 @@ public async ValueTask> ReceiveWhileAsync(TimeSpan? max, Tim /// TBD /// TBD /// TBD + /// /// TBD - public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue) + public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, CancellationToken cancellationToken = default) { - var task = ReceiveWhileAsync(filter, max, idle, msgs).AsTask(); + var task = ReceiveWhileAsync(filter, max, idle, msgs, cancellationToken).AsTask(); task.WaitAndUnwrapException(); return task.Result; } - /// - public async ValueTask> ReceiveWhileAsync(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue) + /// + public async ValueTask> ReceiveWhileAsync(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, CancellationToken cancellationToken = default) { var maxValue = RemainingOrDilated(max); var start = Now; @@ -553,7 +538,7 @@ public async ValueTask> ReceiveWhileAsync(Func fi while (count < msgs) { // Peek the message on the front of the queue - var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue)) + var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue), cancellationToken) .ConfigureAwait(false); if (!peeked.success) { @@ -567,7 +552,7 @@ public async ValueTask> ReceiveWhileAsync(Func fi if (result != null) { // This should happen immediately (zero timespan). Something is wrong if this fails. - var received = await InternalTryReceiveOneAsync(TimeSpan.Zero, CancellationToken.None, true) + var received = await InternalTryReceiveOneAsync(TimeSpan.Zero, cancellationToken, true) .ConfigureAwait(false); if (!received.success) throw new InvalidOperationException("[RACY] Could not dequeue an item from test queue."); @@ -613,16 +598,17 @@ public async ValueTask> ReceiveWhileAsync(Func fi /// TBD /// TBD /// TBD + /// /// TBD - public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true) where T : class + public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true, CancellationToken cancellationToken = default) where T : class { - var task = ReceiveWhileAsync(shouldContinue, max, idle, msgs, shouldIgnoreOtherMessageTypes).AsTask(); + var task = ReceiveWhileAsync(shouldContinue, max, idle, msgs, shouldIgnoreOtherMessageTypes, cancellationToken).AsTask(); task.WaitAndUnwrapException(); return task.Result; } - /// - public async ValueTask> ReceiveWhileAsync(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true) where T : class + /// + public async ValueTask> ReceiveWhileAsync(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true, CancellationToken cancellationToken = default) where T : class { var start = Now; var maxValue = RemainingOrDilated(max); @@ -635,7 +621,7 @@ public async ValueTask> ReceiveWhileAsync(Predicate shoul MessageEnvelope msg = NullMessageEnvelope.Instance; while (count < msgs) { - var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue)).ConfigureAwait(false); + var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue), cancellationToken).ConfigureAwait(false); if (!peeked.success) { _testState.LastMessage = msg; @@ -665,7 +651,7 @@ public async ValueTask> ReceiveWhileAsync(Predicate shoul if (!shouldStop) { // This should happen immediately (zero timespan). Something is wrong if this fails. - var received = await InternalTryReceiveOneAsync(TimeSpan.Zero, CancellationToken.None, true) + var received = await InternalTryReceiveOneAsync(TimeSpan.Zero, cancellationToken, true) .ConfigureAwait(false); if (!received.success) throw new InvalidOperationException("[RACY] Could not dequeue an item from test queue."); @@ -692,17 +678,18 @@ public async ValueTask> ReceiveWhileAsync(Predicate shoul /// Receive the specified number of messages using as timeout. /// /// The number of messages. + /// /// The received messages - public IReadOnlyCollection ReceiveN(int numberOfMessages) + public IReadOnlyCollection ReceiveN(int numberOfMessages, CancellationToken cancellationToken = default) { - var result = InternalReceiveN(numberOfMessages, RemainingOrDefault, true).ToList(); + var result = InternalReceiveN(numberOfMessages, RemainingOrDefault, true, cancellationToken).ToList(); return result; } - /// - public async ValueTask> ReceiveNAsync(int numberOfMessages) + /// + public async ValueTask> ReceiveNAsync(int numberOfMessages, CancellationToken cancellationToken) { - var result = await InternalReceiveNAsync(numberOfMessages, RemainingOrDefault, true).ToListAsync(); + var result = await InternalReceiveNAsync(numberOfMessages, RemainingOrDefault, true, cancellationToken).ToListAsync(); return result; } @@ -712,31 +699,32 @@ public async ValueTask> ReceiveNAsync(int numberOfMe /// /// The number of messages. /// The timeout scaled by "akka.test.timefactor" using . + /// /// The received messages - public IReadOnlyCollection ReceiveN(int numberOfMessages, TimeSpan max) + public IReadOnlyCollection ReceiveN(int numberOfMessages, TimeSpan max, CancellationToken cancellationToken = default) { max.EnsureIsPositiveFinite("max"); var dilated = Dilated(max); - var result = InternalReceiveN(numberOfMessages, dilated, true).ToList(); + var result = InternalReceiveN(numberOfMessages, dilated, true, cancellationToken).ToList(); return result; } - /// - public async ValueTask> ReceiveNAsync(int numberOfMessages, TimeSpan max) + /// + public async ValueTask> ReceiveNAsync(int numberOfMessages, TimeSpan max, CancellationToken cancellationToken = default) { max.EnsureIsPositiveFinite("max"); var dilated = Dilated(max); - var result = await InternalReceiveNAsync(numberOfMessages, dilated, true).ToListAsync(); + var result = await InternalReceiveNAsync(numberOfMessages, dilated, true, cancellationToken).ToListAsync(); return result; } - private IEnumerable InternalReceiveN(int numberOfMessages, TimeSpan max, bool shouldLog) + private IEnumerable InternalReceiveN(int numberOfMessages, TimeSpan max, bool shouldLog, CancellationToken cancellationToken = default) { - foreach(var msg in InternalReceiveNAsync(numberOfMessages, max, shouldLog).ToEnumerable()) + foreach(var msg in InternalReceiveNAsync(numberOfMessages, max, shouldLog, cancellationToken).ToEnumerable()) yield return msg; } - private async IAsyncEnumerable InternalReceiveNAsync(int numberOfMessages, TimeSpan max, bool shouldLog) + private async IAsyncEnumerable InternalReceiveNAsync(int numberOfMessages, TimeSpan max, bool shouldLog, [EnumeratorCancellation] CancellationToken cancellationToken) { var start = Now; var stop = max + start; @@ -744,7 +732,7 @@ private async IAsyncEnumerable InternalReceiveNAsync(int numberOfMessage for (int i = 0; i < numberOfMessages; i++) { var timeout = stop - Now; - var o = await ReceiveOneAsync(timeout); + var o = await ReceiveOneAsync(timeout, cancellationToken); var condition = o != null; if (!condition) { From 41df4136e080321d1a45fbd39e4768c6b62effc4 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Thu, 24 Feb 2022 18:56:27 +0100 Subject: [PATCH 09/10] Changed Receive methods to sync-over-async --- .../TestKitBaseTests/ReceiveTests.cs | 2 +- src/core/Akka.TestKit/TestKitBase_Receive.cs | 28 +++++++++++-------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs index 6a29a14638f..6963f5c7e90 100644 --- a/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs +++ b/src/core/Akka.TestKit.Tests/TestKitBaseTests/ReceiveTests.cs @@ -115,7 +115,7 @@ public async Task FishUntilMessageAsync_should_fail_with_bad_input() { var probe = CreateTestProbe("probe"); probe.Ref.Tell(3, TestActor); - Func func = () => probe.FishUntilMessageAsync(max: TimeSpan.FromMilliseconds(10)).AsTask(); + Func func = () => probe.FishUntilMessageAsync(max: TimeSpan.FromMilliseconds(10)); await func.Should().ThrowAsync(); } diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index ee1e930aedc..42dedc9ff9d 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -113,7 +113,7 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayLi /// The type that the message is not supposed to be. /// Optional. The maximum wait duration. Defaults to when unset. /// - public async ValueTask FishUntilMessageAsync(TimeSpan? max = null, CancellationToken cancellationToken = default) + public async Task FishUntilMessageAsync(TimeSpan? max = null, CancellationToken cancellationToken = default) { await ReceiveWhileAsync(max: max, shouldContinue: x => { @@ -466,7 +466,9 @@ private bool InternalTryPeekOne(out MessageEnvelope envelope, TimeSpan? max, Can /// TBD public IReadOnlyList ReceiveWhile(TimeSpan? max, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) where T : class { - return ReceiveWhile(filter, max, Timeout.InfiniteTimeSpan, msgs, cancellationToken); + var task = ReceiveWhileAsync(max, filter, msgs, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; } /// @@ -488,16 +490,19 @@ public async ValueTask> ReceiveWhileAsync(TimeSpan? max, Fun /// TBD /// TBD /// TBD + /// /// TBD public IReadOnlyList ReceiveWhile(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) { - return ReceiveWhile(filter, max, idle, msgs, cancellationToken); + var task = ReceiveWhileAsync(max, idle, filter, msgs, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; } /// - public async ValueTask> ReceiveWhileAsync(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue) + public async ValueTask> ReceiveWhileAsync(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) { - return await ReceiveWhileAsync(filter, max, idle, msgs); + return await ReceiveWhileAsync(filter, max, idle, msgs, cancellationToken); } /// @@ -537,6 +542,7 @@ public async ValueTask> ReceiveWhileAsync(Func fi MessageEnvelope msg = NullMessageEnvelope.Instance; while (count < msgs) { + cancellationToken.ThrowIfCancellationRequested(); // Peek the message on the front of the queue var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue), cancellationToken) .ConfigureAwait(false); @@ -682,8 +688,9 @@ public async ValueTask> ReceiveWhileAsync(Predicate shoul /// The received messages public IReadOnlyCollection ReceiveN(int numberOfMessages, CancellationToken cancellationToken = default) { - var result = InternalReceiveN(numberOfMessages, RemainingOrDefault, true, cancellationToken).ToList(); - return result; + var task = ReceiveNAsync(numberOfMessages, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; } /// @@ -703,10 +710,9 @@ public async ValueTask> ReceiveNAsync(int numberOfMe /// The received messages public IReadOnlyCollection ReceiveN(int numberOfMessages, TimeSpan max, CancellationToken cancellationToken = default) { - max.EnsureIsPositiveFinite("max"); - var dilated = Dilated(max); - var result = InternalReceiveN(numberOfMessages, dilated, true, cancellationToken).ToList(); - return result; + var task = ReceiveNAsync(numberOfMessages, max, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; } /// From 014a41736b70613c229d829b1b136b52622138f4 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 25 Feb 2022 01:59:33 +0700 Subject: [PATCH 10/10] Add CancellationToken support to InternalReceiveNAsync, remove non-async private InternalReceiveN --- src/core/Akka.TestKit/TestKitBase_Expect.cs | 22 +++++++++++++++----- src/core/Akka.TestKit/TestKitBase_Receive.cs | 13 +++++------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase_Expect.cs b/src/core/Akka.TestKit/TestKitBase_Expect.cs index 54581346cce..7d25b21b3fc 100644 --- a/src/core/Akka.TestKit/TestKitBase_Expect.cs +++ b/src/core/Akka.TestKit/TestKitBase_Expect.cs @@ -9,9 +9,11 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.TestKit.Internal; using Akka.Util; +using Nito.AsyncEx.Synchronous; namespace Akka.TestKit { @@ -369,19 +371,29 @@ public IReadOnlyCollection ExpectMsgAllOf(TimeSpan max, params T[] message return InternalExpectMsgAllOf(dilated, messages); } - private IReadOnlyCollection InternalExpectMsgAllOf(TimeSpan max, IReadOnlyCollection messages, Func areEqual = null, bool shouldLog=false) + private IReadOnlyCollection InternalExpectMsgAllOf(TimeSpan max, IReadOnlyCollection messages, Func areEqual = null, bool shouldLog=false, CancellationToken cancellationToken = default) + { + var task = InternalExpectMsgAllOfAsync(max, messages, areEqual, shouldLog, cancellationToken); + task.WaitAndUnwrapException(cancellationToken); + return task.Result; + } + + private async Task> InternalExpectMsgAllOfAsync(TimeSpan max, + IReadOnlyCollection messages, Func areEqual = null, bool shouldLog = false, + CancellationToken cancellationToken = default) { ConditionalLog(shouldLog, "Expecting {0} messages during {1}", messages.Count, max); areEqual = areEqual ?? ((x, y) => Equals(x, y)); var start = Now; - var receivedMessages = InternalReceiveN(messages.Count, max, shouldLog).ToList(); - var missing = messages.Where(m => !receivedMessages.Any(r => r is T && areEqual((T)r, m))).ToList(); - var unexpected = receivedMessages.Where(r => !messages.Any(m => r is T && areEqual((T)r, m))).ToList(); + + var receivedMessages = await InternalReceiveNAsync(messages.Count, max, shouldLog, cancellationToken).ToListAsync(cancellationToken); + + var missing = messages.Where(m => !receivedMessages.Any(r => r is T obj && areEqual(obj, m))).ToList(); + var unexpected = receivedMessages.Where(r => !messages.Any(m => r is T obj && areEqual(obj, m))).ToList(); CheckMissingAndUnexpected(missing, unexpected, "not found", "found unexpected", shouldLog, string.Format("Expected {0} messages during {1}. Failed after {2}. ", messages.Count, max, Now-start)); return receivedMessages.Cast().ToList(); } - private void CheckMissingAndUnexpected(IReadOnlyCollection missing, IReadOnlyCollection unexpected, string missingMessage, string unexpectedMessage, bool shouldLog, string hint) { var missingIsEmpty = missing.Count == 0; diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 42dedc9ff9d..0d4c0c7ab5b 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -627,6 +627,7 @@ public async ValueTask> ReceiveWhileAsync(Predicate shoul MessageEnvelope msg = NullMessageEnvelope.Instance; while (count < msgs) { + cancellationToken.ThrowIfCancellationRequested(); var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue), cancellationToken).ConfigureAwait(false); if (!peeked.success) { @@ -720,23 +721,19 @@ public async ValueTask> ReceiveNAsync(int numberOfMe { max.EnsureIsPositiveFinite("max"); var dilated = Dilated(max); - var result = await InternalReceiveNAsync(numberOfMessages, dilated, true, cancellationToken).ToListAsync(); + var result = await InternalReceiveNAsync(numberOfMessages, dilated, true, cancellationToken).ToListAsync(cancellationToken); return result; } - private IEnumerable InternalReceiveN(int numberOfMessages, TimeSpan max, bool shouldLog, CancellationToken cancellationToken = default) - { - foreach(var msg in InternalReceiveNAsync(numberOfMessages, max, shouldLog, cancellationToken).ToEnumerable()) - yield return msg; - } - private async IAsyncEnumerable InternalReceiveNAsync(int numberOfMessages, TimeSpan max, bool shouldLog, [EnumeratorCancellation] CancellationToken cancellationToken) { var start = Now; var stop = max + start; ConditionalLog(shouldLog, "Trying to receive {0} messages during {1}.", numberOfMessages, max); - for (int i = 0; i < numberOfMessages; i++) + for (var i = 0; i < numberOfMessages; i++) { + cancellationToken.ThrowIfCancellationRequested(); + var timeout = stop - Now; var o = await ReceiveOneAsync(timeout, cancellationToken); var condition = o != null;