-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Work on ConsumerController
deadlock
#7092
Changes from 8 commits
cf56ea3
d60bd94
2f65374
6641e9a
502ab48
3559587
7f7a5e5
8347acd
ff814f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
// ----------------------------------------------------------------------- | ||
// <copyright file="ConsumerControllerRetryConfirmSpecs.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
// ----------------------------------------------------------------------- | ||
|
||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Configuration; | ||
using Akka.Delivery; | ||
using Akka.Util; | ||
using FluentAssertions; | ||
using FluentAssertions.Extensions; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
using static Akka.Tests.Delivery.TestConsumer; | ||
|
||
namespace Akka.Tests.Delivery; | ||
|
||
public class ConsumerControllerRetryConfirmSpecs : TestKit.Xunit2.TestKit | ||
{ | ||
public static readonly Config Config = @" | ||
akka.reliable-delivery.consumer-controller { | ||
flow-control-window = 20 | ||
resend-interval-min = 1s | ||
retry-confirmation = true | ||
}"; | ||
|
||
public ConsumerControllerRetryConfirmSpecs(ITestOutputHelper outputHelper) : base( | ||
Config.WithFallback(TestSerializer.Config).WithFallback(ZeroLengthSerializer.Config), output: outputHelper) | ||
{ | ||
} | ||
|
||
private int _idCount = 0; | ||
private int NextId() => _idCount++; | ||
private string ProducerId => $"p-{_idCount}"; | ||
|
||
[Fact] | ||
public void ConsumerController_Settings_confirmation_retry_must_not_be_set_by_default() | ||
{ | ||
var config = ConfigurationFactory.Default(); | ||
var settings = ConsumerController.Settings.Create(config.GetConfig("akka.reliable-delivery.consumer-controller")); | ||
settings.RetryConfirmation.Should().BeFalse(); | ||
} | ||
|
||
[Fact] | ||
public void ConsumerController_Settings_confirmation_retry_must_be_set() | ||
{ | ||
var settings = ConsumerController.Settings.Create(Sys); | ||
settings.RetryConfirmation.Should().BeTrue(); | ||
} | ||
|
||
[Fact] | ||
public async Task ConsumerController_must_resend_Delivery_on_confirmation_retry() | ||
{ | ||
var id = NextId(); | ||
var consumerProbe = CreateTestProbe(); | ||
var consumerController = Sys.ActorOf(ConsumerController.Create<Job>(Sys, Option<IActorRef>.None), | ||
$"consumerController-{id}"); | ||
var producerControllerProbe = CreateTestProbe(); | ||
|
||
consumerController.Tell(new ConsumerController.Start<Job>(consumerProbe.Ref)); | ||
consumerController.Tell(new ConsumerController.RegisterToProducerController<Job>(producerControllerProbe.Ref)); | ||
await producerControllerProbe.ExpectMsgAsync<ProducerController.RegisterConsumer<Job>>(); | ||
|
||
consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref)); | ||
|
||
await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>(); | ||
|
||
// expected resend | ||
await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>(1.5.Seconds()); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
using Akka.Delivery.Internal; | ||
using Akka.Util; | ||
using FluentAssertions; | ||
using FluentAssertions.Extensions; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
using static Akka.Tests.Delivery.TestConsumer; | ||
|
@@ -672,4 +673,26 @@ public async Task ConsumerController_can_process_zero_length_Chunk() | |
consumerController.Tell(seqMessages1.First()); | ||
(await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<ZeroLengthSerializer.TestMsg>>()).Message.Should().Be(ZeroLengthSerializer.TestMsg.Instance); | ||
} | ||
|
||
[Fact] | ||
public async Task ConsumerController_must_not_resend_Delivery() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM |
||
{ | ||
var id = NextId(); | ||
var consumerProbe = CreateTestProbe(); | ||
var consumerController = Sys.ActorOf(ConsumerController.Create<Job>(Sys, Option<IActorRef>.None), | ||
$"consumerController-{id}"); | ||
var producerControllerProbe = CreateTestProbe(); | ||
|
||
consumerController.Tell(new ConsumerController.Start<Job>(consumerProbe.Ref)); | ||
consumerController.Tell(new ConsumerController.RegisterToProducerController<Job>(producerControllerProbe.Ref)); | ||
await producerControllerProbe.ExpectMsgAsync<ProducerController.RegisterConsumer<Job>>(); | ||
|
||
consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref)); | ||
|
||
await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>(); | ||
|
||
// expects no resend | ||
await consumerProbe.ExpectNoMsgAsync(1.5.Seconds()); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -664,6 +664,12 @@ akka { | |
# kept in memory in the `ProducerController` until they have been | ||
# confirmed, but the drawback is that lost messages will not be delivered. | ||
only-flow-control = false | ||
|
||
# When disabled, the <see cref="ConsumerController"/> will discard any <c>Retry</c> messages when it is | ||
# waiting for a message delivery confirmation. | ||
# | ||
# When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types. | ||
retry-confirmation = false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM |
||
} | ||
|
||
work-pulling { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -229,26 +229,59 @@ public static Settings Create(ActorSystem actorSystem) | |
|
||
public static Settings Create(Config config) | ||
{ | ||
return new Settings(config.GetInt("flow-control-window"), config.GetTimeSpan("resend-interval-min"), | ||
config.GetTimeSpan("resend-interval-max"), config.GetBoolean("only-flow-control")); | ||
return new Settings( | ||
flowControlWindow: config.GetInt("flow-control-window"), | ||
resendIntervalMin: config.GetTimeSpan("resend-interval-min"), | ||
resendIntervalMax: config.GetTimeSpan("resend-interval-max"), | ||
onlyFlowControl: config.GetBoolean("only-flow-control"), | ||
retryConfirmation: config.GetBoolean("retry-confirmation")); | ||
} | ||
|
||
private Settings(int flowControlWindow, TimeSpan resendIntervalMin, TimeSpan resendIntervalMax, | ||
bool onlyFlowControl) | ||
bool onlyFlowControl, bool retryConfirmation) | ||
{ | ||
FlowControlWindow = flowControlWindow; | ||
ResendIntervalMin = resendIntervalMin; | ||
ResendIntervalMax = resendIntervalMax; | ||
OnlyFlowControl = onlyFlowControl; | ||
RetryConfirmation = retryConfirmation; | ||
} | ||
|
||
/// <summary> | ||
/// Number of messages in flight between <see cref="ProducerController"/> and <see cref="ConsumerController"/>. | ||
/// | ||
/// The <see cref="ConsumerController"/> requests for more message when half of the window has been used. | ||
/// </summary> | ||
public int FlowControlWindow { get; init; } | ||
|
||
/// <summary> | ||
/// The ConsumerController resends flow control messages to the ProducerController with the <see cref="ResendIntervalMin"/>, | ||
/// and increasing it gradually to <see cref="ResendIntervalMax"/> when idle. | ||
/// </summary> | ||
public TimeSpan ResendIntervalMin { get; init; } | ||
|
||
/// <summary> | ||
/// The ConsumerController resends flow control messages to the ProducerController with the <see cref="ResendIntervalMin"/>, | ||
/// and increasing it gradually to <see cref="ResendIntervalMax"/> when idle. | ||
/// </summary> | ||
public TimeSpan ResendIntervalMax { get; init; } | ||
|
||
/// <summary> | ||
/// If this is enabled lost messages will not be resent, but flow control is used. | ||
/// | ||
/// This can be more efficient since messages don't have to be kept in memory in the | ||
/// <see cref="ProducerController"/> until they have been confirmed, but the drawback is that lost messages | ||
/// will not be delivered. | ||
/// </summary> | ||
public bool OnlyFlowControl { get; init; } | ||
|
||
/// <summary> | ||
/// When disabled, the <see cref="ConsumerController"/> will discard any <c>Retry</c> messages when it is | ||
/// waiting for a message delivery confirmation. | ||
/// | ||
/// When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types. | ||
/// </summary> | ||
public bool RetryConfirmation { get; init; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM |
||
|
||
public override string ToString() | ||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -386,6 +386,17 @@ long ComputeNextSeqNr() | |
Receive<Retry>(_ => | ||
{ | ||
// no retries when WaitingForConfirmation, will be performed from (idle) active | ||
if (!Settings.RetryConfirmation) | ||
return; | ||
Comment on lines
+389
to
+390
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only retry the delivery if |
||
|
||
_log.Debug("Consumer received Retry while waiting for confirmation."); | ||
ReceiveRetry(() => | ||
{ | ||
_log.Debug("Retry sending SequencedMessage [{0}].", sequencedMessage.SeqNr); | ||
CurrentState.Consumer.Tell(new Delivery<T>(sequencedMessage.Message.Message!, Context.Self, | ||
sequencedMessage.ProducerId, sequencedMessage.SeqNr)); | ||
Become(() => WaitingForConfirmation(sequencedMessage)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to do a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's no harm in doing it, we might as well have the code to explicitly show which state we're transitioning to. |
||
}); | ||
Comment on lines
+393
to
+399
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Retry logic |
||
}); | ||
|
||
Receive<ConsumerController.Start<T>>(start => | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might want to lower this given that you're waiting for up to 1.5 seconds - just to provide a bigger window in the event of racy tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, will do