From af961f84c63341ff9101d3300eeaabaf7749c197 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 29 Feb 2024 23:45:00 +0700 Subject: [PATCH] Revert "Work on `ConsumerController` deadlock (#7092)" (#7107) This reverts commit 1c8ffc3a5959c3f0b9f99dbd3dbb31dcf823a42a. --- ...oreAPISpec.ApproveCore.DotNet.verified.txt | 1 - .../CoreAPISpec.ApproveCore.Net.verified.txt | 1 - .../ConsumerControllerRetryConfirmSpecs.cs | 75 ------------------- .../Delivery/ConsumerControllerSpecs.cs | 23 ------ src/core/Akka/Configuration/akka.conf | 6 -- src/core/Akka/Delivery/ConsumerController.cs | 39 +--------- .../Internal/ConsumerControllerImpl.cs | 11 --- 7 files changed, 3 insertions(+), 153 deletions(-) delete mode 100644 src/core/Akka.Tests/Delivery/ConsumerControllerRetryConfirmSpecs.cs diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 4bd36b8e712..6a0f57ea371 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -2559,7 +2559,6 @@ namespace Akka.Delivery public bool OnlyFlowControl { get; set; } public System.TimeSpan ResendIntervalMax { get; set; } public System.TimeSpan ResendIntervalMin { get; set; } - public bool RetryConfirmation { get; set; } public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { } public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { } public override string ToString() { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 55a5ef3780b..88832976eb4 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -2557,7 +2557,6 @@ namespace Akka.Delivery public bool OnlyFlowControl { get; set; } public System.TimeSpan ResendIntervalMax { get; set; } public System.TimeSpan ResendIntervalMin { get; set; } - public bool RetryConfirmation { get; set; } public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { } public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { } public override string ToString() { } diff --git a/src/core/Akka.Tests/Delivery/ConsumerControllerRetryConfirmSpecs.cs b/src/core/Akka.Tests/Delivery/ConsumerControllerRetryConfirmSpecs.cs deleted file mode 100644 index 74d6e821cc9..00000000000 --- a/src/core/Akka.Tests/Delivery/ConsumerControllerRetryConfirmSpecs.cs +++ /dev/null @@ -1,75 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2009-2024 Lightbend Inc. -// Copyright (C) 2013-2024 .NET Foundation -// -// ----------------------------------------------------------------------- - -using System.Threading.Tasks; -using Akka.Actor; -using Akka.Configuration; -using Akka.Delivery; -using Akka.Util; -using FluentAssertions; -using FluentAssertions.Extensions; -using Xunit; -using Xunit.Abstractions; -using static Akka.Tests.Delivery.TestConsumer; - -namespace Akka.Tests.Delivery; - -public class ConsumerControllerRetryConfirmSpecs : TestKit.Xunit2.TestKit -{ - public static readonly Config Config = @" - akka.reliable-delivery.consumer-controller { - flow-control-window = 20 - resend-interval-min = 500ms - retry-confirmation = true - }"; - - public ConsumerControllerRetryConfirmSpecs(ITestOutputHelper outputHelper) : base( - Config.WithFallback(TestSerializer.Config).WithFallback(ZeroLengthSerializer.Config), output: outputHelper) - { - } - - private int _idCount = 0; - private int NextId() => _idCount++; - private string ProducerId => $"p-{_idCount}"; - - [Fact] - public void ConsumerController_Settings_confirmation_retry_must_not_be_set_by_default() - { - var config = ConfigurationFactory.Default(); - var settings = ConsumerController.Settings.Create(config.GetConfig("akka.reliable-delivery.consumer-controller")); - settings.RetryConfirmation.Should().BeFalse(); - } - - [Fact] - public void ConsumerController_Settings_confirmation_retry_must_be_set() - { - var settings = ConsumerController.Settings.Create(Sys); - settings.RetryConfirmation.Should().BeTrue(); - } - - [Fact] - public async Task ConsumerController_must_resend_Delivery_on_confirmation_retry() - { - var id = NextId(); - var consumerProbe = CreateTestProbe(); - var consumerController = Sys.ActorOf(ConsumerController.Create(Sys, Option.None), - $"consumerController-{id}"); - var producerControllerProbe = CreateTestProbe(); - - consumerController.Tell(new ConsumerController.Start(consumerProbe.Ref)); - consumerController.Tell(new ConsumerController.RegisterToProducerController(producerControllerProbe.Ref)); - await producerControllerProbe.ExpectMsgAsync>(); - - consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref)); - - await consumerProbe.ExpectMsgAsync>(); - - // expected resend - await consumerProbe.ExpectMsgAsync>(1.5.Seconds()); - } - -} \ No newline at end of file diff --git a/src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs b/src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs index d407942a449..4ca05b86d06 100644 --- a/src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs +++ b/src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs @@ -7,7 +7,6 @@ using Akka.Delivery.Internal; using Akka.Util; using FluentAssertions; -using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; using static Akka.Tests.Delivery.TestConsumer; @@ -673,26 +672,4 @@ public async Task ConsumerController_can_process_zero_length_Chunk() consumerController.Tell(seqMessages1.First()); (await consumerProbe.ExpectMsgAsync>()).Message.Should().Be(ZeroLengthSerializer.TestMsg.Instance); } - - [Fact] - public async Task ConsumerController_must_not_resend_Delivery() - { - var id = NextId(); - var consumerProbe = CreateTestProbe(); - var consumerController = Sys.ActorOf(ConsumerController.Create(Sys, Option.None), - $"consumerController-{id}"); - var producerControllerProbe = CreateTestProbe(); - - consumerController.Tell(new ConsumerController.Start(consumerProbe.Ref)); - consumerController.Tell(new ConsumerController.RegisterToProducerController(producerControllerProbe.Ref)); - await producerControllerProbe.ExpectMsgAsync>(); - - consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref)); - - await consumerProbe.ExpectMsgAsync>(); - - // expects no resend - await consumerProbe.ExpectNoMsgAsync(1.5.Seconds()); - } - } \ No newline at end of file diff --git a/src/core/Akka/Configuration/akka.conf b/src/core/Akka/Configuration/akka.conf index fb95fb6612e..e4594491f39 100644 --- a/src/core/Akka/Configuration/akka.conf +++ b/src/core/Akka/Configuration/akka.conf @@ -664,12 +664,6 @@ akka { # kept in memory in the `ProducerController` until they have been # confirmed, but the drawback is that lost messages will not be delivered. only-flow-control = false - - # When disabled, the will discard any Retry messages when it is - # waiting for a message delivery confirmation. - # - # When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types. - retry-confirmation = false } work-pulling { diff --git a/src/core/Akka/Delivery/ConsumerController.cs b/src/core/Akka/Delivery/ConsumerController.cs index 6912dc26858..415c3349643 100644 --- a/src/core/Akka/Delivery/ConsumerController.cs +++ b/src/core/Akka/Delivery/ConsumerController.cs @@ -229,59 +229,26 @@ public static Settings Create(ActorSystem actorSystem) public static Settings Create(Config config) { - return new Settings( - flowControlWindow: config.GetInt("flow-control-window"), - resendIntervalMin: config.GetTimeSpan("resend-interval-min"), - resendIntervalMax: config.GetTimeSpan("resend-interval-max"), - onlyFlowControl: config.GetBoolean("only-flow-control"), - retryConfirmation: config.GetBoolean("retry-confirmation")); + return new Settings(config.GetInt("flow-control-window"), config.GetTimeSpan("resend-interval-min"), + config.GetTimeSpan("resend-interval-max"), config.GetBoolean("only-flow-control")); } private Settings(int flowControlWindow, TimeSpan resendIntervalMin, TimeSpan resendIntervalMax, - bool onlyFlowControl, bool retryConfirmation) + bool onlyFlowControl) { FlowControlWindow = flowControlWindow; ResendIntervalMin = resendIntervalMin; ResendIntervalMax = resendIntervalMax; OnlyFlowControl = onlyFlowControl; - RetryConfirmation = retryConfirmation; } - /// - /// Number of messages in flight between and . - /// - /// The requests for more message when half of the window has been used. - /// public int FlowControlWindow { get; init; } - /// - /// The ConsumerController resends flow control messages to the ProducerController with the , - /// and increasing it gradually to when idle. - /// public TimeSpan ResendIntervalMin { get; init; } - /// - /// The ConsumerController resends flow control messages to the ProducerController with the , - /// and increasing it gradually to when idle. - /// public TimeSpan ResendIntervalMax { get; init; } - /// - /// If this is enabled lost messages will not be resent, but flow control is used. - /// - /// This can be more efficient since messages don't have to be kept in memory in the - /// until they have been confirmed, but the drawback is that lost messages - /// will not be delivered. - /// public bool OnlyFlowControl { get; init; } - - /// - /// When disabled, the will discard any Retry messages when it is - /// waiting for a message delivery confirmation. - /// - /// When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types. - /// - public bool RetryConfirmation { get; init; } public override string ToString() { diff --git a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs index 5eeafbfe3d5..94db04dc68c 100644 --- a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs @@ -386,17 +386,6 @@ long ComputeNextSeqNr() Receive(_ => { // no retries when WaitingForConfirmation, will be performed from (idle) active - if (!Settings.RetryConfirmation) - return; - - _log.Debug("Consumer received Retry while waiting for confirmation."); - ReceiveRetry(() => - { - _log.Debug("Retry sending SequencedMessage [{0}].", sequencedMessage.SeqNr); - CurrentState.Consumer.Tell(new Delivery(sequencedMessage.Message.Message!, Context.Self, - sequencedMessage.ProducerId, sequencedMessage.SeqNr)); - Become(() => WaitingForConfirmation(sequencedMessage)); - }); }); Receive>(start =>