From bd276df0b229a0528ba2cd54184db1051360d34e Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Wed, 4 Jan 2023 15:18:27 +0100 Subject: [PATCH] Added support for `UnrestrictedStash` --- docs/articles/actors/receive-actor-api.md | 20 +++++++--- docs/articles/actors/untyped-actor-api.md | 20 +++++++--- .../CoreAPISpec.ApproveCore.Core.verified.txt | 9 +++-- ...oreAPISpec.ApproveCore.DotNet.verified.txt | 9 +++-- .../CoreAPISpec.ApproveCore.Net.verified.txt | 9 +++-- ...ISpec.ApprovePersistence.Core.verified.txt | 8 ++-- ...pec.ApprovePersistence.DotNet.verified.txt | 8 ++-- ...PISpec.ApprovePersistence.Net.verified.txt | 8 ++-- ...rovePersistenceSqlCommon.Core.verified.txt | 4 +- ...vePersistenceSqlCommon.DotNet.verified.txt | 4 +- ...provePersistenceSqlCommon.Net.verified.txt | 4 +- .../Akka/Actor/Stash/IWithBoundedStash.cs | 4 +- src/core/Akka/Actor/Stash/IWithStash.cs | 37 +++++++++++++++++++ .../Akka/Actor/Stash/IWithUnboundedStash.cs | 7 +--- .../Actor/Stash/IWithUnrestrictedStash.cs | 19 ++++++++++ .../Actor/Stash/Internal/AbstractStash.cs | 2 +- .../Stash/Internal/UnrestrictedStashImpl.cs | 16 ++++++++ src/core/Akka/Actor/Stash/StashFactory.cs | 19 ++++------ .../Akka/Dispatch/IRequiresMessageQueue.cs | 17 ++++++--- 19 files changed, 159 insertions(+), 65 deletions(-) create mode 100644 src/core/Akka/Actor/Stash/IWithStash.cs create mode 100644 src/core/Akka/Actor/Stash/IWithUnrestrictedStash.cs create mode 100644 src/core/Akka/Actor/Stash/Internal/UnrestrictedStashImpl.cs diff --git a/docs/articles/actors/receive-actor-api.md b/docs/articles/actors/receive-actor-api.md index 9f8b4f79f96..887bc6b088b 100644 --- a/docs/articles/actors/receive-actor-api.md +++ b/docs/articles/actors/receive-actor-api.md @@ -790,12 +790,15 @@ static void Main(string[] args) ## Stash -The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox. +The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. -Here is an example of the `IWithUnboundedStash` interface in action: +> [!NOTE] +> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes). + +Here is an example of the `IWithStash` interface in action: ```csharp -public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash +public class ActorWithProtocol : ReceiveActor, IWithStash { public IStash Stash { get; set; } @@ -827,11 +830,18 @@ public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash } ``` -Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration. +Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration. Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`. -Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior. +Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. + +However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actor’s mailbox. + +The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior. + +> [!NOTE] +> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead. ## Killing an Actor diff --git a/docs/articles/actors/untyped-actor-api.md b/docs/articles/actors/untyped-actor-api.md index d80054d2756..d1d764bf824 100644 --- a/docs/articles/actors/untyped-actor-api.md +++ b/docs/articles/actors/untyped-actor-api.md @@ -708,12 +708,15 @@ static void Main(string[] args) ## Stash -The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox. +The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. -Here is an example of the `IWithUnboundedStash` interface in action: +> [!NOTE] +> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes). + +Here is an example of the `IWithStash` interface in action: ```csharp -public class ActorWithProtocol : UntypedActor, IWithUnboundedStash +public class ActorWithProtocol : UntypedActor, IWithStash { public IStash Stash { get; set; } @@ -748,11 +751,18 @@ public class ActorWithProtocol : UntypedActor, IWithUnboundedStash } ``` -Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration. +Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration. Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`. -Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior. +Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. + +However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actor’s mailbox. + +The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior. + +> [!NOTE] +> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead. ## Killing an Actor diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt index 43eb962ca27..044c1fc7234 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt @@ -1177,14 +1177,15 @@ namespace Akka.Actor void Become(Akka.Actor.UntypedReceive receive); void BecomeStacked(Akka.Actor.UntypedReceive receive); } - [System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " + - "[0.7.0]")] - public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue { } + [System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")] + public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } public interface IWithTimers { Akka.Actor.ITimerScheduler Timers { get; set; } } - public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { } public interface IWrappedMessage { object Message { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index d3eb1bb8107..49e94cfdb24 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -1179,14 +1179,15 @@ namespace Akka.Actor void Become(Akka.Actor.UntypedReceive receive); void BecomeStacked(Akka.Actor.UntypedReceive receive); } - [System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " + - "[0.7.0]")] - public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue { } + [System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")] + public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } public interface IWithTimers { Akka.Actor.ITimerScheduler Timers { get; set; } } - public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { } public interface IWrappedMessage { object Message { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 43eb962ca27..044c1fc7234 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -1177,14 +1177,15 @@ namespace Akka.Actor void Become(Akka.Actor.UntypedReceive receive); void BecomeStacked(Akka.Actor.UntypedReceive receive); } - [System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " + - "[0.7.0]")] - public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue { } + [System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")] + public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } public interface IWithTimers { Akka.Actor.ITimerScheduler Timers { get; set; } } - public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { } public interface IWrappedMessage { object Message { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Core.verified.txt index 05759e954d8..c967f712df1 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Core.verified.txt @@ -209,7 +209,7 @@ namespace Akka.Persistence { public static Akka.Persistence.DiscardToDeadLetterStrategy Instance { get; } } - public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity + public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity { public static readonly System.Func UnstashFilterPredicate; protected Eventsourced() { } @@ -270,7 +270,7 @@ namespace Akka.Persistence { Akka.Persistence.Recovery Recovery { get; } } - public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { Akka.Persistence.IStashOverflowStrategy InternalStashOverflowStrategy { get; } } @@ -844,7 +844,7 @@ namespace Akka.Persistence.Journal protected System.Exception TryUnwrapException(System.Exception e) { } protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages); } - public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected AsyncWriteProxy() { } public Akka.Actor.IStash Stash { get; set; } @@ -980,7 +980,7 @@ namespace Akka.Persistence.Journal public System.Collections.Generic.IDictionary> Update(string pid, long seqNr, System.Func updater) { } protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } } - public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { public PersistencePluginProxy(Akka.Configuration.Config config) { } public Akka.Actor.IStash Stash { get; set; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt index f09ed14fb62..f2f22083a67 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt @@ -209,7 +209,7 @@ namespace Akka.Persistence { public static Akka.Persistence.DiscardToDeadLetterStrategy Instance { get; } } - public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity + public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity { public static readonly System.Func UnstashFilterPredicate; protected Eventsourced() { } @@ -270,7 +270,7 @@ namespace Akka.Persistence { Akka.Persistence.Recovery Recovery { get; } } - public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { Akka.Persistence.IStashOverflowStrategy InternalStashOverflowStrategy { get; } } @@ -844,7 +844,7 @@ namespace Akka.Persistence.Journal protected System.Exception TryUnwrapException(System.Exception e) { } protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages); } - public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected AsyncWriteProxy() { } public Akka.Actor.IStash Stash { get; set; } @@ -980,7 +980,7 @@ namespace Akka.Persistence.Journal public System.Collections.Generic.IDictionary> Update(string pid, long seqNr, System.Func updater) { } protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } } - public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { public PersistencePluginProxy(Akka.Configuration.Config config) { } public Akka.Actor.IStash Stash { get; set; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt index 05759e954d8..c967f712df1 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt @@ -209,7 +209,7 @@ namespace Akka.Persistence { public static Akka.Persistence.DiscardToDeadLetterStrategy Instance { get; } } - public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity + public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity { public static readonly System.Func UnstashFilterPredicate; protected Eventsourced() { } @@ -270,7 +270,7 @@ namespace Akka.Persistence { Akka.Persistence.Recovery Recovery { get; } } - public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { Akka.Persistence.IStashOverflowStrategy InternalStashOverflowStrategy { get; } } @@ -844,7 +844,7 @@ namespace Akka.Persistence.Journal protected System.Exception TryUnwrapException(System.Exception e) { } protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages); } - public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected AsyncWriteProxy() { } public Akka.Actor.IStash Stash { get; set; } @@ -980,7 +980,7 @@ namespace Akka.Persistence.Journal public System.Collections.Generic.IDictionary> Update(string pid, long seqNr, System.Func updater) { } protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } } - public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { public PersistencePluginProxy(Akka.Configuration.Config config) { } public Akka.Actor.IStash Stash { get; set; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Core.verified.txt index f3dfaddb419..9cb9bae2b7f 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Core.verified.txt @@ -272,7 +272,7 @@ namespace Akka.Persistence.Sql.Common.Journal public long Offset { get; } public Akka.Actor.IActorRef ReplyTo { get; } } - public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlJournal(Akka.Configuration.Config journalConfig) { } protected bool HasNewEventSubscribers { get; } @@ -424,7 +424,7 @@ namespace Akka.Persistence.Sql.Common.Snapshot public readonly System.DateTime Timestamp; public SnapshotEntry(string persistenceId, long sequenceNr, System.DateTime timestamp, string manifest, object payload) { } } - public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlSnapshotStore(Akka.Configuration.Config config) { } protected Akka.Event.ILoggingAdapter Log { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt index e5374efd1a8..920493760d0 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt @@ -272,7 +272,7 @@ namespace Akka.Persistence.Sql.Common.Journal public long Offset { get; } public Akka.Actor.IActorRef ReplyTo { get; } } - public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlJournal(Akka.Configuration.Config journalConfig) { } protected bool HasNewEventSubscribers { get; } @@ -424,7 +424,7 @@ namespace Akka.Persistence.Sql.Common.Snapshot public readonly System.DateTime Timestamp; public SnapshotEntry(string persistenceId, long sequenceNr, System.DateTime timestamp, string manifest, object payload) { } } - public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlSnapshotStore(Akka.Configuration.Config config) { } protected Akka.Event.ILoggingAdapter Log { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt index f3dfaddb419..9cb9bae2b7f 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt @@ -272,7 +272,7 @@ namespace Akka.Persistence.Sql.Common.Journal public long Offset { get; } public Akka.Actor.IActorRef ReplyTo { get; } } - public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlJournal(Akka.Configuration.Config journalConfig) { } protected bool HasNewEventSubscribers { get; } @@ -424,7 +424,7 @@ namespace Akka.Persistence.Sql.Common.Snapshot public readonly System.DateTime Timestamp; public SnapshotEntry(string persistenceId, long sequenceNr, System.DateTime timestamp, string manifest, object payload) { } } - public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue + public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { protected SqlSnapshotStore(Akka.Configuration.Config config) { } protected Akka.Event.ILoggingAdapter Log { get; } diff --git a/src/core/Akka/Actor/Stash/IWithBoundedStash.cs b/src/core/Akka/Actor/Stash/IWithBoundedStash.cs index d96e2739161..5955222de46 100644 --- a/src/core/Akka/Actor/Stash/IWithBoundedStash.cs +++ b/src/core/Akka/Actor/Stash/IWithBoundedStash.cs @@ -17,8 +17,8 @@ namespace Akka.Actor /// public IStash Stash { get; set; } /// // ReSharper disable once InconsistentNaming - [Obsolete("Bounded stashing is not yet implemented. Unbounded stashing will be used instead [0.7.0]")] - public interface IWithBoundedStash : IActorStash, IRequiresMessageQueue + [Obsolete("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")] + public interface IWithBoundedStash : IWithUnrestrictedStash, IRequiresMessageQueue { } } diff --git a/src/core/Akka/Actor/Stash/IWithStash.cs b/src/core/Akka/Actor/Stash/IWithStash.cs new file mode 100644 index 00000000000..bcc7b2cafe5 --- /dev/null +++ b/src/core/Akka/Actor/Stash/IWithStash.cs @@ -0,0 +1,37 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Dispatch; + +namespace Akka.Actor +{ + /// + /// The `IWithStash` interface enables an actor to temporarily stash away messages that can not or + /// should not be handled using the actor's current behavior. + /// + /// Note that the `IWithStash` interface can only be used together with actors that have a deque-based + /// mailbox. By default Stash based actors request a Deque based mailbox since the stash + /// interface extends . + /// + /// You can override the default mailbox provided when `IDequeBasedMessageQueueSemantics` are requested via config: + /// + /// akka.actor.mailbox.requirements { + /// "Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics" = your-custom-mailbox + /// } + /// + /// Alternatively, you can add your own requirement marker to the actor and configure a mailbox type to be used + /// for your marker. + /// + /// For a `Stash` that also enforces unboundedness of the deque see . For a `Stash` + /// that does not enforce any mailbox type see . + /// + /// + public interface IWithStash : IWithUnrestrictedStash, IRequiresMessageQueue + { + } +} + diff --git a/src/core/Akka/Actor/Stash/IWithUnboundedStash.cs b/src/core/Akka/Actor/Stash/IWithUnboundedStash.cs index f1b3facb30e..c1f9547a32b 100644 --- a/src/core/Akka/Actor/Stash/IWithUnboundedStash.cs +++ b/src/core/Akka/Actor/Stash/IWithUnboundedStash.cs @@ -10,13 +10,10 @@ namespace Akka.Actor { /// - /// Lets the know that this Actor needs stash support - /// with unrestricted storage capacity. - /// You need to add the property: - /// public IStash Stash { get; set; } + /// The `IWithUnboundedStash` interface is a version of that enforces an unbounded stash for you actor. /// // ReSharper disable once InconsistentNaming - public interface IWithUnboundedStash : IActorStash, IRequiresMessageQueue + public interface IWithUnboundedStash : IWithUnrestrictedStash, IRequiresMessageQueue { } } diff --git a/src/core/Akka/Actor/Stash/IWithUnrestrictedStash.cs b/src/core/Akka/Actor/Stash/IWithUnrestrictedStash.cs new file mode 100644 index 00000000000..65d16dc7935 --- /dev/null +++ b/src/core/Akka/Actor/Stash/IWithUnrestrictedStash.cs @@ -0,0 +1,19 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Dispatch; + +namespace Akka.Actor +{ + /// + /// A version of that does not enforce any mailbox type. The proper mailbox has to be configured + /// manually, and the mailbox should extend the marker interface. + /// + public interface IWithUnrestrictedStash : IActorStash + { + } +} diff --git a/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs b/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs index c029120b0f6..fc1098bdbf8 100644 --- a/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs +++ b/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs @@ -19,7 +19,7 @@ namespace Akka.Actor.Internal /// INTERNAL API /// /// Support class for implementing a stash for an actor instance. A default stash per actor (= user stash) - /// is maintained by [[UnrestrictedStash]] by extending this trait. Actors that explicitly need other stashes + /// is maintained by this class. Actors that explicitly need other stashes /// (optionally in addition to and isolated from the user stash) can create new stashes via . /// /// diff --git a/src/core/Akka/Actor/Stash/Internal/UnrestrictedStashImpl.cs b/src/core/Akka/Actor/Stash/Internal/UnrestrictedStashImpl.cs new file mode 100644 index 00000000000..87e88603371 --- /dev/null +++ b/src/core/Akka/Actor/Stash/Internal/UnrestrictedStashImpl.cs @@ -0,0 +1,16 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +namespace Akka.Actor.Internal +{ + internal class UnrestrictedStashImpl : AbstractStash + { + public UnrestrictedStashImpl(IActorContext context) + : base(context) + { } + } +} diff --git a/src/core/Akka/Actor/Stash/StashFactory.cs b/src/core/Akka/Actor/Stash/StashFactory.cs index 022183585ae..d639dba6f0e 100644 --- a/src/core/Akka/Actor/Stash/StashFactory.cs +++ b/src/core/Akka/Actor/Stash/StashFactory.cs @@ -22,7 +22,7 @@ public static class StashFactory /// TBD /// TBD /// TBD - public static IStash CreateStash(this IActorContext context) where T:ActorBase + public static IStash CreateStash(this IActorContext context) where T : ActorBase { var actorType = typeof(T); return CreateStash(context, actorType); @@ -34,10 +34,8 @@ public static IStash CreateStash(this IActorContext context) where T:ActorBas /// TBD /// TBD /// TBD - public static IStash CreateStash(this IActorContext context, IActorStash actorInstance) - { - return CreateStash(context, actorInstance.GetType()); - } + public static IStash CreateStash(this IActorContext context, IActorStash actorInstance) => + CreateStash(context, actorInstance.GetType()); /// /// TBD @@ -50,15 +48,14 @@ public static IStash CreateStash(this IActorContext context, IActorStash actorIn /// TBD public static IStash CreateStash(this IActorContext context, Type actorType) { - if(actorType.Implements()) - { + if (actorType.Implements()) return new BoundedStashImpl(context); - } - if(actorType.Implements()) - { + if (actorType.Implements()) return new UnboundedStashImpl(context); - } + + if (actorType.Implements()) + return new UnrestrictedStashImpl(context); throw new ArgumentException($"Actor {actorType} implements an unrecognized subclass of {typeof(IActorStash)} - cannot instantiate", nameof(actorType)); } diff --git a/src/core/Akka/Dispatch/IRequiresMessageQueue.cs b/src/core/Akka/Dispatch/IRequiresMessageQueue.cs index d11b16fe8fe..add8a738144 100644 --- a/src/core/Akka/Dispatch/IRequiresMessageQueue.cs +++ b/src/core/Akka/Dispatch/IRequiresMessageQueue.cs @@ -5,17 +5,22 @@ // //----------------------------------------------------------------------- -using Akka.Actor; -using Akka.Dispatch.MessageQueues; - namespace Akka.Dispatch { /// - /// Used to help give hints to the as to what types of this - /// actor requires. Used mostly for system actors. + /// Interface to signal that an Actor requires a certain type of message queue semantics. + /// + /// The mailbox type will be looked up by mapping the type T via akka.actor.mailbox.requirements in the config, + /// to a mailbox configuration. If no mailbox is assigned on Props or in deployment config then this one will be used. + /// + /// + /// The queue type of the created mailbox will be checked against the type T and actor creation will fail if it doesn't + /// fulfill the requirements. + /// /// /// The type of required - public interface IRequiresMessageQueue where T:ISemantics + public interface IRequiresMessageQueue + where T : ISemantics { } }