Skip to content

Commit

Permalink
Make sure that DeadLetters published by DistributedPubSubMediator
Browse files Browse the repository at this point in the history
… contain full context of topic (#6209) (#6212)

This is a usability improvement aimed at making sure we contain the full context of what topic we were trying to publish to or which `Put`-ed actor we were trying to publish to when there are no subscribers available on the local node.

(cherry picked from commit 9f84438)

Co-authored-by: Aaron Stannard <[email protected]>
  • Loading branch information
Arkatufus and Aaronontheweb authored Oct 24, 2022
1 parent 4bcf3e6 commit 108c701
Showing 1 changed file with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,19 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
new Router(_settings.RoutingLogic, routees.ToArray()).Route(
Internal.Utils.WrapIfNeeded(send.Message), Sender);
else
IgnoreOrSendToDeadLetters(send.Message);
IgnoreOrSendToDeadLetters(send);
});
Receive<SendToAll>(sendToAll =>
{
PublishMessage(sendToAll.Path, sendToAll.Message, sendToAll.ExcludeSelf);
PublishMessage(sendToAll.Path, sendToAll, sendToAll.ExcludeSelf);
});
Receive<Publish>(publish =>
{
string path = Internal.Utils.MakeKey(Self.Path / Internal.Utils.EncodeName(publish.Topic));
if (publish.SendOneMessageToEachGroup)
PublishToEachGroup(path, publish.Message);
PublishToEachGroup(path, publish);
else
PublishMessage(path, publish.Message);
PublishMessage(path, publish);
});
Receive<Put>(put =>
{
Expand Down Expand Up @@ -500,7 +500,7 @@ private void IgnoreOrSendToDeadLetters(object message)
Context.System.DeadLetters.Tell(new DeadLetter(message, Sender, Context.Self));
}

private void PublishMessage(string path, object message, bool allButSelf = false)
private void PublishMessage(string path, IWrappedMessage publish, bool allButSelf = false)
{
IEnumerable<IActorRef> Refs()
{
Expand All @@ -521,24 +521,24 @@ IEnumerable<IActorRef> Refs()
foreach (var r in Refs())
{
if (r == null) continue;
r.Forward(message);
r.Forward(publish.Message);
counter++;
}

if (counter == 0) IgnoreOrSendToDeadLetters(message);
if (counter == 0) IgnoreOrSendToDeadLetters(publish);
}

private void PublishToEachGroup(string path, object message)
private void PublishToEachGroup(string path, Publish publish)
{
var prefix = path + "/";
var lastKey = path + "0"; // '0' is the next char of '/'

var groups = ExtractGroups(prefix, lastKey).GroupBy(kv => kv.Key).ToList();
var wrappedMessage = new SendToOneSubscriber(message);
var wrappedMessage = new SendToOneSubscriber(publish.Message);

if (groups.Count == 0)
{
IgnoreOrSendToDeadLetters(message);
IgnoreOrSendToDeadLetters(publish);
}
else
{
Expand Down

0 comments on commit 108c701

Please sign in to comment.