Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: no DeadLetters when publishing to a Topic with no subscribers #5561

Merged
merged 5 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,23 @@
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.TestKit;
using Xunit;
using Akka.Event;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Cluster.Tools.Tests.PublishSubscribe
{
[Collection(nameof(DistributedPubSubMediatorSpec))]
public class DistributedPubSubMediatorSpec : AkkaSpec
{
public DistributedPubSubMediatorSpec() : base(GetConfig()) { }
public DistributedPubSubMediatorSpec() : base(GetConfig())
{
}

public static Config GetConfig()
{
return ConfigurationFactory.ParseString("akka.actor.provider = \"Akka.Cluster.ClusterActorRefProvider, Akka.Cluster\"");
return ConfigurationFactory.ParseString(
@"akka.actor.provider = cluster");
}

/// <summary>
Expand All @@ -41,11 +47,8 @@ public void DistributedPubSubMediator_can_be_activated_in_child_actor()
mediator = DistributedPubSub.Get(actorContext.System).Mediator;
};

dsl.Receive<string>(s => s.Equals("check"), (s, actorContext) =>
{
actorContext.Sender.Tell(mediator);
});

dsl.Receive<string>(s => s.Equals("check"),
(s, actorContext) => { actorContext.Sender.Tell(mediator); });
}, "childActor");

actor.Tell("check");
Expand All @@ -54,5 +57,40 @@ public void DistributedPubSubMediator_can_be_activated_in_child_actor()
a.ShouldNotBe(ActorRefs.Nobody);
});
}

/// <summary>
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/5352
/// </summary>
[Fact]
public async Task DistributedPubSubMediator_should_send_messages_to_dead_letter()
{
// arrange
var mediator = DistributedPubSub.Get(Sys).Mediator;
var actor = Sys.ActorOf((dsl, context) => { }, "childActor");

// act
// create a topic
mediator.Tell(new Subscribe("pub-sub", actor));
_ = ExpectMsg<SubscribeAck>();

// all subscribers should be removed from this topic
// topic actor will still be alive for default value of 120s
mediator.Tell(new Unsubscribe("pub-sub", actor));
_ = ExpectMsg<UnsubscribeAck>();

// assert
await EventFilter.DeadLetter<object>().ExpectAsync(1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reproduction - as @eaba pointed out this spec would fail before because the TTL value kept the topic actor alive for 120s. The real bug was that the topic actor never published any DeadLetter instances if it was alive but had no subscribers.

() => { mediator.Tell(new Publish("pub-sub", $"hit")); });
}
}

public sealed class QueryTopics
{
public static QueryTopics Instance = new QueryTopics();
}

public sealed class PublishTopic
{
public static PublishTopic Instance = new PublishTopic();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ private long NextVersion()

private IActorRef NewTopicActor(string encodedTopic)
{
var t = Context.ActorOf(Actor.Props.Create(() => new Topic(_settings.RemovedTimeToLive, _settings.RoutingLogic)), encodedTopic);
var t = Context.ActorOf(Actor.Props.Create(() => new Topic(_settings.RemovedTimeToLive, _settings.RoutingLogic, _settings.SendToDeadLettersWhenNoSubscribers)), encodedTopic);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Propagate akka.cluster.pub-sub.send-to-dead-letters-when-no-subscribers to the topic actors, who are the ones who actually need to implement this.

HandleRegisterTopic(t);
return t;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,53 @@
using System.Linq;
using System.Runtime.CompilerServices;
using Akka.Actor;
using Akka.Event;
using Akka.Remote;
using Akka.Routing;

namespace Akka.Cluster.Tools.PublishSubscribe.Internal
{
/// <summary>
/// TBD
/// A <see cref="DeadLetter"/> published when there are no subscribers
/// for a topic that has received a <see cref="Publish"/> event.
/// </summary>
internal readonly struct NoSubscribersDeadLetter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an envelope NoSubscribersDeadLetter in order to help make it clear inside the user's logging system why this DeadLetter was published. Created it as a readonly struct because it will immediately be captured inside a DeadLetter and therefore we can save on an allocation here in the event of a large number of deadletters being published all at once.

{
public NoSubscribersDeadLetter(string topic, object message)
{
Topic = topic;
Message = message;
}

public string Topic { get; }
public object Message { get; }

public override string ToString()
{
return $"NoSubscribersDeadLetter(Topic=[{Topic}],Message=[{Message}])";
}
}

/// <summary>
/// Base class for both topics and groups.
/// </summary>
internal abstract class TopicLike : ActorBase
{
/// <summary>
/// TBD
/// </summary>
protected readonly TimeSpan PruneInterval;

/// <summary>
/// TBD
/// </summary>
protected readonly ICancelable PruneCancelable;

/// <summary>
/// TBD
/// </summary>
protected readonly ISet<IActorRef> Subscribers;

/// <summary>
/// TBD
/// </summary>
Expand All @@ -44,31 +69,39 @@ internal abstract class TopicLike : ActorBase
protected Deadline PruneDeadline = null;

/// <summary>
/// TBD
/// Used to toggle what we do during publication when there are no subscribers
/// </summary>
protected readonly bool SendToDeadLettersWhenNoSubscribers;

/// <summary>
/// Creates a new instance of a topic or group actor.
/// </summary>
/// <param name="emptyTimeToLive">TBD</param>
protected TopicLike(TimeSpan emptyTimeToLive)
/// <param name="emptyTimeToLive">The TTL for how often this actor will be removed.</param>
/// <param name="sendToDeadLettersWhenNone">When set to <c>true</c>, this actor will
/// publish a <see cref="DeadLetter"/> for each message if the total number of subscribers == 0.</param>
protected TopicLike(TimeSpan emptyTimeToLive, bool sendToDeadLettersWhenNone)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated constructor to receive new value. These are all internal classes so no breaking changes here.

{
Subscribers = new HashSet<IActorRef>();
EmptyTimeToLive = emptyTimeToLive;
SendToDeadLettersWhenNoSubscribers = sendToDeadLettersWhenNone;
PruneInterval = new TimeSpan(emptyTimeToLive.Ticks / 2);
PruneCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(PruneInterval, PruneInterval, Self, Prune.Instance, Self);
PruneCancelable =
Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(PruneInterval, PruneInterval, Self,
Prune.Instance, Self);
}

/// <summary>
/// TBD
/// </summary>
/// <inheritdoc cref="ActorBase.PostStop"/>
protected override void PostStop()
{
base.PostStop();
PruneCancelable.Cancel();
}

/// <summary>
/// TBD
/// Default <see cref="Receive"/> method for <see cref="DistributedPubSub"/> messages.
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
/// <param name="message">The message we're going to process.</param>
/// <returns>true if we handled it, false otherwise.</returns>
protected bool DefaultReceive(object message)
{
switch (message)
Expand Down Expand Up @@ -96,6 +129,7 @@ protected bool DefaultReceive(object message)
PruneDeadline = null;
Context.Parent.Tell(NoMoreSubscribers.Instance);
}

return true;

case TerminateRequest _:
Expand All @@ -107,6 +141,7 @@ protected bool DefaultReceive(object message)
{
Context.Parent.Tell(NewSubscriberArrived.Instance);
}

return true;

case Count _:
Expand All @@ -116,22 +151,23 @@ protected bool DefaultReceive(object message)
default:
foreach (var subscriber in Subscribers)
subscriber.Forward(message);

// no subscribers
if (Subscribers.Count == 0 && SendToDeadLettersWhenNoSubscribers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bugfix: @eaba pointed out that this actor never was programmed to publish to deadletters when the subscriber count is zero, so we do this after the foreach loop (arbitrary) if we're configured for it. This causes the reproduction spec to now pass.

{
var noSubs = new NoSubscribersDeadLetter(Context.Self.Path.Name, message);
var deadLetter = new DeadLetter(noSubs, Sender, Self);
Context.System.EventStream.Publish(deadLetter);
}

return true;
}
}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
/// <inheritdoc cref="TopicLike.Business"/>
protected abstract bool Business(object message);

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
/// <inheritdoc cref="ActorBase.Receive"/>
protected override bool Receive(object message)
{
return Business(message) || DefaultReceive(message);
Expand All @@ -149,29 +185,27 @@ protected void Remove(IActorRef actorRef)
}

/// <summary>
/// TBD
/// Actor responsible for owning a single topic.
/// </summary>
internal class Topic : TopicLike
{
private readonly RoutingLogic _routingLogic;
private readonly PerGroupingBuffer _buffer;

/// <summary>
/// TBD
/// Creates a new topic actor
/// </summary>
/// <param name="emptyTimeToLive">TBD</param>
/// <param name="routingLogic">TBD</param>
public Topic(TimeSpan emptyTimeToLive, RoutingLogic routingLogic) : base(emptyTimeToLive)
/// <param name="emptyTimeToLive">The TTL for how often this actor will be removed.</param>
/// <param name="sendToDeadLettersWhenNone">When set to <c>true</c>, this actor will
/// publish a <see cref="DeadLetter"/> for each message if the total number of subscribers == 0.</param>
/// <param name="routingLogic">The routing logic to use for distributing messages to subscribers.</param>
public Topic(TimeSpan emptyTimeToLive, RoutingLogic routingLogic, bool sendToDeadLettersWhenNone) : base(emptyTimeToLive, sendToDeadLettersWhenNone)
{
_routingLogic = routingLogic;
_buffer = new PerGroupingBuffer();
}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
/// <inheritdoc cref="TopicLike.Business"/>
protected override bool Business(object message)
{
switch (message)
Expand Down Expand Up @@ -234,40 +268,39 @@ protected override bool Business(object message)
Remove(terminated.ActorRef);
return true;
}

return false;
}

private IActorRef NewGroupActor(string encodedGroup)
{
var g = Context.ActorOf(Props.Create(() => new Group(EmptyTimeToLive, _routingLogic)), encodedGroup);
var g = Context.ActorOf(Props.Create(() => new Group(EmptyTimeToLive, _routingLogic, SendToDeadLettersWhenNoSubscribers)), encodedGroup);
Context.Watch(g);
Context.Parent.Tell(new RegisterTopic(g));
return g;
}
}

/// <summary>
/// TBD
/// Actor that handles "group" subscribers to a topic.
/// </summary>
internal class Group : TopicLike
{
private readonly RoutingLogic _routingLogic;

/// <summary>
/// TBD
/// Creates a new group actor.
/// </summary>
/// <param name="emptyTimeToLive">TBD</param>
/// <param name="routingLogic">TBD</param>
public Group(TimeSpan emptyTimeToLive, RoutingLogic routingLogic) : base(emptyTimeToLive)
/// <param name="emptyTimeToLive">The TTL for how often this actor will be removed.</param>
/// <param name="sendToDeadLettersWhenNone">When set to <c>true</c>, this actor will
/// publish a <see cref="DeadLetter"/> for each message if the total number of subscribers == 0.</param>
/// <param name="routingLogic">The routing logic to use for distributing messages to subscribers.</param>
public Group(TimeSpan emptyTimeToLive, RoutingLogic routingLogic, bool sendToDeadLettersWhenNone) : base(emptyTimeToLive, sendToDeadLettersWhenNone)
{
_routingLogic = routingLogic;
}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
/// <inheritdoc cref="TopicLike.Business"/>
protected override bool Business(object message)
{
if (message is SendToOneSubscriber send)
Expand All @@ -279,16 +312,20 @@ protected override bool Business(object message)
}
}
else return false;

return true;
}
}

/// <summary>
/// TBD
/// INTERNAL API
///
/// Used for generating Uri-safe topic and group names.
/// </summary>
internal static class Utils
{
private static System.Text.RegularExpressions.Regex _pathRegex = new System.Text.RegularExpressions.Regex("^/remote/.+(/user/.+)");
private static System.Text.RegularExpressions.Regex _pathRegex =
new System.Text.RegularExpressions.Regex("^/remote/.+(/user/.+)");

/// <summary>
/// <para>
Expand Down Expand Up @@ -339,4 +376,4 @@ public static string MakeKey(ActorPath path)
return _pathRegex.Replace(path.ToStringWithoutAddress(), "$1");
}
}
}
}