Skip to content

Commit

Permalink
feature: Akka.Delivery - reliable point-to-point delivery + message c…
Browse files Browse the repository at this point in the history
…hunking (#6720)

* feature: Akka.Delivery

Added core Akka.Delivery classes:

* `ProducerController`
* `ConsumerController`
* `ChunkedMessage`

* fixed missing curly brace

* enabled `#nullable enable` on new APIs

* checking in API changes with nullability enabled

* fixed some XML-DOC errors

* added Akka.Delivery protobuf definitions

* added Akka.Cluster.Sharding components

* finished integrating Akka.Cluster.Sharding.Delivery tests

* fixed XML-DOC error

* added type manifest data to ReliableDelivery protos

* working on generics serialization

* fix compilation error

* have sequenced message serialization down

* fleshing out more serialized types

* added chunk detection to MessageSent

* completed serializer

* moved reliable delivery serializer to Akka.Cluster

* fixed serialization namespaces

* defining ReliableDeliverySerializerSpecs

* finished all serialization specs

* added API approvals

* added event-sourced durable producer queue

* finished event sourced queue implementation - added API approvals

* cleanedup persistence code

* fixed some EventSourcedProducerQueue bugs

* finished all EventSourcedProducerQueue specs

* fix DocFx issue

* added reliable delivery specs on top of event-sourced queue

* increased timeouts

reviewed the logs - the tests just took longer to run on AzDo than my development machine. Everything was still operating normally but SeqNr 42 hadn't been hit yet by any of the consumers at the time the test terminated (they were at 37 / 38)

* reformat

* adding DurableShardingSpec

* fixed durable state bug in ShardingProducerControllerImpl

* finished final spec

* updating ShoppingCart sample

* made Akka.Remote "Failed to write message to the transport" more explicit

* adding ShardingEnvelope to ClusterMessageSerializer

* finished `ShardingEnvelope` serialization support

* fixed Uri encoding error for sharded consumer controllers

* fixed code sample

* added API approvals

* fix serialization error during chunking

* added test to validate error with chunking and acks

* disable chunking support for Akka.Cluster.Sharding

* fixed config loading error

* fixed compilation error

* fixes and final API approvals
  • Loading branch information
Aaronontheweb authored May 8, 2023
1 parent 899e62f commit 96cd8db
Show file tree
Hide file tree
Showing 64 changed files with 13,172 additions and 100 deletions.
2 changes: 1 addition & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,12 @@ Target "Protobuf" <| fun _ ->
let protoFiles = [
("WireFormats.proto", "/src/core/Akka.Remote/Serialization/Proto/");
("ContainerFormats.proto", "/src/core/Akka.Remote/Serialization/Proto/");
("ContainerFormats.proto", "/src/core/Akka.Remote/Serialization/Proto/");
("SystemMessageFormats.proto", "/src/core/Akka.Remote/Serialization/Proto/");
("ClusterMessages.proto", "/src/core/Akka.Cluster/Serialization/Proto/");
("ClusterClientMessages.proto", "/src/contrib/cluster/Akka.Cluster.Tools/Client/Serialization/Proto/");
("DistributedPubSubMessages.proto", "/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Serialization/Proto/");
("ClusterShardingMessages.proto", "/src/contrib/cluster/Akka.Cluster.Sharding/Serialization/Proto/");
("ReliableDelivery.proto", "/src/core/Akka.Cluster/Serialization/Proto/");
("TestConductorProtocol.proto", "/src/core/Akka.Remote.TestKit/Proto/");
("Persistence.proto", "/src/core/Akka.Persistence/Serialization/Proto/");
("StreamRefMessages.proto", "/src/core/Akka.Streams/Serialization/Proto/");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public override void AroundPreStart()
/// <param name="receive">TBD</param>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
protected override bool AroundReceive(Receive receive, object message)
protected internal override bool AroundReceive(Receive receive, object message)
{
if (_isInitialized)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public override void AroundPreStart()
/// <param name="receive">TBD</param>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
protected override bool AroundReceive(Receive receive, object message)
protected internal override bool AroundReceive(Receive receive, object message)
{
if (_isInitialized)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\..\..\core\Akka.Tests\Delivery\TestConsumer.cs">
<Link>Delivery\TestConsumer.cs</Link>
</Compile>
<Compile Include="..\..\..\core\Akka.Tests\Delivery\TestProducer.cs">
<Link>Delivery\TestProducer.cs</Link>
</Compile>
<Compile Include="..\Akka.Cluster.Sharding.Tests.MultiNode\AsyncWriteProxyEx.cs" Link="AsyncWriteProxyEx.cs" />
<Compile Include="..\Akka.Cluster.Sharding.Tests.MultiNode\MemoryJournalShared.cs" Link="MemoryJournalShared.cs" />
<Compile Include="..\Akka.Cluster.Sharding.Tests.MultiNode\MemorySnapshotStoreShared.cs" Link="MemorySnapshotStoreShared.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
using System;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Cluster.Configuration;
using Akka.Cluster.Sharding.Internal;
using Akka.Cluster.Sharding.Serialization;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Delivery;
using Akka.Serialization;
using Akka.TestKit;
using Akka.TestKit.TestActors;
using Akka.Util.Internal;
using FluentAssertions;
using Google.Protobuf;
Expand All @@ -30,7 +34,7 @@ public class ClusterShardingMessageSerializerSpec : AkkaSpec
private IActorRef regionProxy2;

private static Config SpecConfig =>
ClusterSingletonManager.DefaultConfig().WithFallback(ClusterSharding.DefaultConfig());
ClusterSingletonManager.DefaultConfig().WithFallback(ClusterSharding.DefaultConfig()).WithFallback(ClusterConfigFactory.Default());

public ClusterShardingMessageSerializerSpec() : base(SpecConfig)
{
Expand All @@ -47,7 +51,7 @@ private void CheckSerialization(object obj)
serializer.Should().BeOfType<ClusterShardingMessageSerializer>();
var blob = serializer.ToBinary(obj);
var reference = serializer.FromBinary(blob, serializer.Manifest(obj));
reference.Should().Be(obj);
reference.Should().BeEquivalentTo(obj);
}

[Fact]
Expand Down Expand Up @@ -211,5 +215,16 @@ public void ClusterShardingMessageSerializer_must_serialize_ShardRegionQuery()
ImmutableHashSet.Create("14", "15")
));
}

[Fact]
public void ClusterShardingMessageSerializer_must_serialize_ShardingEnvelope()
{
var producer = Sys.ActorOf(BlackHoleActor.Props, "fakeProducer");
CheckSerialization(new ShardingEnvelope("entity-1", 11));
CheckSerialization(new ShardingEnvelope("entity-1", new ConsumerController.SequencedMessage<string>("p1", 11, "msg-1", true, false, producer)));
CheckSerialization(new ShardingEnvelope("entity-1", new ConsumerController.SequencedMessage<TestJob>("p1", 11, new TestJob("foo1"), true, false, producer)));
}

private record TestJob(string Job);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
//-----------------------------------------------------------------------
// <copyright file="DurableShardingSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Cluster.Sharding.Delivery;
using Akka.Configuration;
using Akka.Delivery;
using Akka.Event;
using Akka.Persistence.Delivery;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;
using static Akka.Tests.Delivery.TestConsumer;

namespace Akka.Cluster.Sharding.Tests.Delivery;

public class DurableShardingSpec : AkkaSpec
{
public static readonly Config Config = @"
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.reliable-delivery.consumer-controller.flow-control-window = 20
akka.persistence.journal.plugin = ""akka.persistence.journal.inmem""
akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.inmem""
";

public DurableShardingSpec(ITestOutputHelper output) : base(Config, output)
{
// TODO: add journal operations subscriptions, once that's properly supported in Akka.Persistence
}

private int _idCount;

private string ProducerId => $"p-{_idCount}";

private int NextId()
{
return _idCount++;
}

private async Task JoinCluster()
{
var cluster = Cluster.Get(Sys);
await cluster.JoinAsync(cluster.SelfAddress);
await AwaitAssertAsync(() => Assert.True(cluster.IsUp));
}

[Fact]
public async Task ReliableDelivery_with_sharding_and_durable_queue_must_load_initial_state_and_resend_unconfirmed()
{
await JoinCluster();
NextId();

var consumerProbe = CreateTestProbe();
var sharding = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", s =>
ShardingConsumerController.Create<Job>(c =>
Props.Create(() => new Consumer(c, consumerProbe)),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));

var durableQueueProps = EventSourcedProducerQueue.Create<Job>(ProducerId, Sys);
var shardingProducerController =
Sys.ActorOf(
ShardingProducerController.Create<Job>(ProducerId, sharding, durableQueueProps,
ShardingProducerController.Settings.Create(Sys)), $"shardingProducerController-{_idCount}");
var producerProbe = CreateTestProbe();
shardingProducerController.Tell(new ShardingProducerController.Start<Job>(producerProbe.Ref));

for (var i = 1; i <= 4; i++)
{
(await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>()).SendNextTo.Tell(
new ShardingEnvelope("entity-1", new Job($"msg-{i}")));
// TODO: need journal operations queries here to verify that the message was persisted
}

var delivery1 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
delivery1.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
// TODO: need journal operations queries here to verify that the Confirmed was persisted

var delivery2 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
delivery2.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
// TODO: need journal operations queries here to verify that the Confirmed was persisted

await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>();

// let the initial messages reach the ShardingConsumerController before stopping ShardingProducerController
var delivery3 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
delivery3.Msg.Should().Be(new Job("msg-3"));
delivery3.SeqNr.Should().Be(3);

await Task.Delay(1000);

Sys.Log.Info("Stopping [{0}]", shardingProducerController);
Watch(shardingProducerController);
Sys.Stop(shardingProducerController);
await ExpectTerminatedAsync(shardingProducerController);

var shardingProducerController2 =
Sys.ActorOf(
ShardingProducerController.Create<Job>(ProducerId, sharding, durableQueueProps,
ShardingProducerController.Settings.Create(Sys)), $"shardingProducerController2-{_idCount}");
shardingProducerController2.Tell(new ShardingProducerController.Start<Job>(producerProbe.Ref));

// delivery3 and delivery4 are still from old shardingProducerController, that were queued in ConsumerController
delivery3.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
// that confirmation goes to old dead shardingProducerController, and therefore not stored
// TODO: need journal operations queries here to verify that the Confirmed WAS NOT persisted

var delivery4 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
delivery4.Msg.Should().Be(new Job("msg-4"));
delivery4.SeqNr.Should().Be(4);
delivery4.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
// that confirmation goes to old dead shardingProducerController, and therefore not stored
// TODO: need journal operations queries here to verify that the Confirmed WAS NOT persisted

// now the unconfirmed are re-delivered
var redelivery3 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
redelivery3.Msg.Should().Be(new Job("msg-3"));
redelivery3.SeqNr.Should().Be(1); // new ProducerController and there starting at 1
redelivery3.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
// TODO: need journal operations queries here to verify that the Confirmed was persisted

var redelivery4 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
redelivery4.Msg.Should().Be(new Job("msg-4"));
redelivery4.SeqNr.Should().Be(2);
redelivery4.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
// TODO: need journal operations queries here to verify that the Confirmed was persisted

var next5 = await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>();
next5.SendNextTo.Tell(new ShardingEnvelope("entity-1", new Job("msg-5")));
// TODO: need journal operations queries here to verify that the message was persisted


// the consumer controller may have stopped after msg-5, so allow for resend on timeout (10-15s)
var delivery5 = await consumerProbe.ExpectMsgAsync<JobDelivery>(TimeSpan.FromSeconds(20));
delivery5.Msg.Should().Be(new Job("msg-5"));
delivery5.SeqNr.Should().Be(3); // 3, instead of 5, because SeqNr reset upon ProducerController restart
delivery5.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
// TODO: need journal operations queries here to verify that the Confirmed was persisted
}

[Fact]
public async Task ReliableDelivery_with_sharding_and_durable_queue_must_reply_to_MessageWithConfirmation_after_storage()
{
await JoinCluster();
NextId();

var consumerProbe = CreateTestProbe();
var sharding = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", s =>
ShardingConsumerController.Create<Job>(c =>
Props.Create(() => new Consumer(c, consumerProbe)),
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
HashCodeMessageExtractor.Create(10,
o =>
{
if (o is ShardingEnvelope se)
return se.EntityId;
return string.Empty;
}, o =>
{
if (o is ShardingEnvelope se)
return se.Message;
return o;
}));

var durableQueueProps = EventSourcedProducerQueue.Create<Job>(ProducerId, Sys);
var shardingProducerController =
Sys.ActorOf(
ShardingProducerController.Create<Job>(ProducerId, sharding, durableQueueProps,
ShardingProducerController.Settings.Create(Sys)), $"shardingProducerController-{_idCount}");
var producerProbe = CreateTestProbe();
shardingProducerController.Tell(new ShardingProducerController.Start<Job>(producerProbe.Ref));

var replyProbe = CreateTestProbe();
(await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>())
.AskNextTo(new ShardingProducerController.MessageWithConfirmation<Job>("entity-1", new Job("msg-1"),
replyProbe.Ref));
await replyProbe.ExpectMsgAsync<Done>();

(await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>())
.AskNextTo(new ShardingProducerController.MessageWithConfirmation<Job>("entity-2", new Job("msg-2"),
replyProbe.Ref));
await replyProbe.ExpectMsgAsync<Done>();
}

private class Consumer : ReceiveActor
{
private readonly TestProbe _consumerProbe;
private readonly IActorRef _consumerController;
private readonly IActorRef _deliveryAdapter;

public Consumer(IActorRef consumerController, TestProbe consumerProbe)
{
_consumerController = consumerController;
_consumerProbe = consumerProbe;

var self = Self;
_deliveryAdapter = Context.ActorOf(
act =>
{
act.Receive<ConsumerController.Delivery<Job>>((delivery, ctx) =>
{
self.Forward(new JobDelivery(delivery.Message, delivery.ConfirmTo, delivery.ProducerId,
delivery.SeqNr));
});
}, "delivery-adapter");

Receive<JobDelivery>(job => { _consumerProbe.Ref.Tell(job); });
}

protected override void PreStart()
{
_consumerController.Tell(new ConsumerController.Start<Job>(_deliveryAdapter));
}
}
}
Loading

0 comments on commit 96cd8db

Please sign in to comment.