Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work on ConsumerController deadlock #7092

Original file line number Diff line number Diff line change
Expand Up @@ -2559,6 +2559,7 @@ namespace Akka.Delivery
public bool OnlyFlowControl { get; set; }
public System.TimeSpan ResendIntervalMax { get; set; }
public System.TimeSpan ResendIntervalMin { get; set; }
public bool RetryConfirmation { get; set; }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { }
public override string ToString() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2557,6 +2557,7 @@ namespace Akka.Delivery
public bool OnlyFlowControl { get; set; }
public System.TimeSpan ResendIntervalMax { get; set; }
public System.TimeSpan ResendIntervalMin { get; set; }
public bool RetryConfirmation { get; set; }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { }
public override string ToString() { }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// -----------------------------------------------------------------------
// <copyright file="ConsumerControllerRetryConfirmSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Delivery;
using Akka.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static Akka.Tests.Delivery.TestConsumer;

namespace Akka.Tests.Delivery;

public class ConsumerControllerRetryConfirmSpecs : TestKit.Xunit2.TestKit
{
public static readonly Config Config = @"
akka.reliable-delivery.consumer-controller {
flow-control-window = 20
resend-interval-min = 500ms
retry-confirmation = true
}";

public ConsumerControllerRetryConfirmSpecs(ITestOutputHelper outputHelper) : base(
Config.WithFallback(TestSerializer.Config).WithFallback(ZeroLengthSerializer.Config), output: outputHelper)
{
}

private int _idCount = 0;
private int NextId() => _idCount++;
private string ProducerId => $"p-{_idCount}";

[Fact]
public void ConsumerController_Settings_confirmation_retry_must_not_be_set_by_default()
{
var config = ConfigurationFactory.Default();
var settings = ConsumerController.Settings.Create(config.GetConfig("akka.reliable-delivery.consumer-controller"));
settings.RetryConfirmation.Should().BeFalse();
}

[Fact]
public void ConsumerController_Settings_confirmation_retry_must_be_set()
{
var settings = ConsumerController.Settings.Create(Sys);
settings.RetryConfirmation.Should().BeTrue();
}

[Fact]
public async Task ConsumerController_must_resend_Delivery_on_confirmation_retry()
{
var id = NextId();
var consumerProbe = CreateTestProbe();
var consumerController = Sys.ActorOf(ConsumerController.Create<Job>(Sys, Option<IActorRef>.None),
$"consumerController-{id}");
var producerControllerProbe = CreateTestProbe();

consumerController.Tell(new ConsumerController.Start<Job>(consumerProbe.Ref));
consumerController.Tell(new ConsumerController.RegisterToProducerController<Job>(producerControllerProbe.Ref));
await producerControllerProbe.ExpectMsgAsync<ProducerController.RegisterConsumer<Job>>();

consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref));

await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>();

// expected resend
await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>(1.5.Seconds());
}

}
23 changes: 23 additions & 0 deletions src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Akka.Delivery.Internal;
using Akka.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static Akka.Tests.Delivery.TestConsumer;
Expand Down Expand Up @@ -672,4 +673,26 @@ public async Task ConsumerController_can_process_zero_length_Chunk()
consumerController.Tell(seqMessages1.First());
(await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<ZeroLengthSerializer.TestMsg>>()).Message.Should().Be(ZeroLengthSerializer.TestMsg.Instance);
}

[Fact]
public async Task ConsumerController_must_not_resend_Delivery()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

{
var id = NextId();
var consumerProbe = CreateTestProbe();
var consumerController = Sys.ActorOf(ConsumerController.Create<Job>(Sys, Option<IActorRef>.None),
$"consumerController-{id}");
var producerControllerProbe = CreateTestProbe();

consumerController.Tell(new ConsumerController.Start<Job>(consumerProbe.Ref));
consumerController.Tell(new ConsumerController.RegisterToProducerController<Job>(producerControllerProbe.Ref));
await producerControllerProbe.ExpectMsgAsync<ProducerController.RegisterConsumer<Job>>();

consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref));

await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>();

// expects no resend
await consumerProbe.ExpectNoMsgAsync(1.5.Seconds());
}

}
6 changes: 6 additions & 0 deletions src/core/Akka/Configuration/akka.conf
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,12 @@ akka {
# kept in memory in the `ProducerController` until they have been
# confirmed, but the drawback is that lost messages will not be delivered.
only-flow-control = false

# When disabled, the <see cref="ConsumerController"/> will discard any <c>Retry</c> messages when it is
# waiting for a message delivery confirmation.
#
# When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types.
retry-confirmation = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RetryConfirmation is false by default

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

work-pulling {
Expand Down
39 changes: 36 additions & 3 deletions src/core/Akka/Delivery/ConsumerController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,26 +229,59 @@ public static Settings Create(ActorSystem actorSystem)

public static Settings Create(Config config)
{
return new Settings(config.GetInt("flow-control-window"), config.GetTimeSpan("resend-interval-min"),
config.GetTimeSpan("resend-interval-max"), config.GetBoolean("only-flow-control"));
return new Settings(
flowControlWindow: config.GetInt("flow-control-window"),
resendIntervalMin: config.GetTimeSpan("resend-interval-min"),
resendIntervalMax: config.GetTimeSpan("resend-interval-max"),
onlyFlowControl: config.GetBoolean("only-flow-control"),
retryConfirmation: config.GetBoolean("retry-confirmation"));
}

private Settings(int flowControlWindow, TimeSpan resendIntervalMin, TimeSpan resendIntervalMax,
bool onlyFlowControl)
bool onlyFlowControl, bool retryConfirmation)
{
FlowControlWindow = flowControlWindow;
ResendIntervalMin = resendIntervalMin;
ResendIntervalMax = resendIntervalMax;
OnlyFlowControl = onlyFlowControl;
RetryConfirmation = retryConfirmation;
}

/// <summary>
/// Number of messages in flight between <see cref="ProducerController"/> and <see cref="ConsumerController"/>.
///
/// The <see cref="ConsumerController"/> requests for more message when half of the window has been used.
/// </summary>
public int FlowControlWindow { get; init; }

/// <summary>
/// The ConsumerController resends flow control messages to the ProducerController with the <see cref="ResendIntervalMin"/>,
/// and increasing it gradually to <see cref="ResendIntervalMax"/> when idle.
/// </summary>
public TimeSpan ResendIntervalMin { get; init; }

/// <summary>
/// The ConsumerController resends flow control messages to the ProducerController with the <see cref="ResendIntervalMin"/>,
/// and increasing it gradually to <see cref="ResendIntervalMax"/> when idle.
/// </summary>
public TimeSpan ResendIntervalMax { get; init; }

/// <summary>
/// If this is enabled lost messages will not be resent, but flow control is used.
///
/// This can be more efficient since messages don't have to be kept in memory in the
/// <see cref="ProducerController"/> until they have been confirmed, but the drawback is that lost messages
/// will not be delivered.
/// </summary>
public bool OnlyFlowControl { get; init; }

/// <summary>
/// When disabled, the <see cref="ConsumerController"/> will discard any <c>Retry</c> messages when it is
/// waiting for a message delivery confirmation.
///
/// When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types.
/// </summary>
public bool RetryConfirmation { get; init; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added new RetryConfirmation settings

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


public override string ToString()
{
Expand Down
11 changes: 11 additions & 0 deletions src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,17 @@ long ComputeNextSeqNr()
Receive<Retry>(_ =>
{
// no retries when WaitingForConfirmation, will be performed from (idle) active
if (!Settings.RetryConfirmation)
return;
Comment on lines +389 to +390
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only retry the delivery if RetryConfirmation is set to true


_log.Debug("Consumer received Retry while waiting for confirmation.");
ReceiveRetry(() =>
{
_log.Debug("Retry sending SequencedMessage [{0}].", sequencedMessage.SeqNr);
CurrentState.Consumer.Tell(new Delivery<T>(sequencedMessage.Message.Message!, Context.Self,
sequencedMessage.ProducerId, sequencedMessage.SeqNr));
Become(() => WaitingForConfirmation(sequencedMessage));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do a Become again here? Aren't we already in this state?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no harm in doing it, we might as well have the code to explicitly show which state we're transitioning to.

});
Comment on lines +393 to +399
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry logic

});

Receive<ConsumerController.Start<T>>(start =>
Expand Down
Loading