From 9f84438ca4a04efb34b8f5a20ff1dc8c020a256b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 24 Oct 2022 12:22:15 -0500 Subject: [PATCH] Make sure that `DeadLetter`s published by `DistributedPubSubMediator` contain full context of topic (#6209) This is a usability improvement aimed at making sure we contain the full context of what topic we were trying to publish to or which `Put`-ed actor we were trying to publish to when there are no subscribers available on the local node. --- .../DistributedPubSubMediator.cs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 6a30dfa09df..2c02b8d6c66 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -192,19 +192,19 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) new Router(_settings.RoutingLogic, routees.ToArray()).Route( Internal.Utils.WrapIfNeeded(send.Message), Sender); else - IgnoreOrSendToDeadLetters(send.Message); + IgnoreOrSendToDeadLetters(send); }); Receive(sendToAll => { - PublishMessage(sendToAll.Path, sendToAll.Message, sendToAll.ExcludeSelf); + PublishMessage(sendToAll.Path, sendToAll, sendToAll.ExcludeSelf); }); Receive(publish => { string path = Internal.Utils.MakeKey(Self.Path / Internal.Utils.EncodeName(publish.Topic)); if (publish.SendOneMessageToEachGroup) - PublishToEachGroup(path, publish.Message); + PublishToEachGroup(path, publish); else - PublishMessage(path, publish.Message); + PublishMessage(path, publish); }); Receive(put => { @@ -500,7 +500,7 @@ private void IgnoreOrSendToDeadLetters(object message) Context.System.DeadLetters.Tell(new DeadLetter(message, Sender, Context.Self)); } - private void PublishMessage(string path, object message, bool allButSelf = false) + private void PublishMessage(string path, IWrappedMessage publish, bool allButSelf = false) { IEnumerable Refs() { @@ -521,24 +521,24 @@ IEnumerable Refs() foreach (var r in Refs()) { if (r == null) continue; - r.Forward(message); + r.Forward(publish.Message); counter++; } - if (counter == 0) IgnoreOrSendToDeadLetters(message); + if (counter == 0) IgnoreOrSendToDeadLetters(publish); } - private void PublishToEachGroup(string path, object message) + private void PublishToEachGroup(string path, Publish publish) { var prefix = path + "/"; var lastKey = path + "0"; // '0' is the next char of '/' var groups = ExtractGroups(prefix, lastKey).GroupBy(kv => kv.Key).ToList(); - var wrappedMessage = new SendToOneSubscriber(message); + var wrappedMessage = new SendToOneSubscriber(publish.Message); if (groups.Count == 0) { - IgnoreOrSendToDeadLetters(message); + IgnoreOrSendToDeadLetters(publish); } else {