Skip to content

Commit

Permalink
Revert "Work on ConsumerController deadlock (#7092)" (#7107)
Browse files Browse the repository at this point in the history
This reverts commit 1c8ffc3.
  • Loading branch information
Arkatufus authored Feb 29, 2024
1 parent 1a9c209 commit af961f8
Show file tree
Hide file tree
Showing 7 changed files with 3 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2559,7 +2559,6 @@ namespace Akka.Delivery
public bool OnlyFlowControl { get; set; }
public System.TimeSpan ResendIntervalMax { get; set; }
public System.TimeSpan ResendIntervalMin { get; set; }
public bool RetryConfirmation { get; set; }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { }
public override string ToString() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2557,7 +2557,6 @@ namespace Akka.Delivery
public bool OnlyFlowControl { get; set; }
public System.TimeSpan ResendIntervalMax { get; set; }
public System.TimeSpan ResendIntervalMin { get; set; }
public bool RetryConfirmation { get; set; }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { }
public override string ToString() { }
Expand Down

This file was deleted.

23 changes: 0 additions & 23 deletions src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Akka.Delivery.Internal;
using Akka.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static Akka.Tests.Delivery.TestConsumer;
Expand Down Expand Up @@ -673,26 +672,4 @@ public async Task ConsumerController_can_process_zero_length_Chunk()
consumerController.Tell(seqMessages1.First());
(await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<ZeroLengthSerializer.TestMsg>>()).Message.Should().Be(ZeroLengthSerializer.TestMsg.Instance);
}

[Fact]
public async Task ConsumerController_must_not_resend_Delivery()
{
var id = NextId();
var consumerProbe = CreateTestProbe();
var consumerController = Sys.ActorOf(ConsumerController.Create<Job>(Sys, Option<IActorRef>.None),
$"consumerController-{id}");
var producerControllerProbe = CreateTestProbe();

consumerController.Tell(new ConsumerController.Start<Job>(consumerProbe.Ref));
consumerController.Tell(new ConsumerController.RegisterToProducerController<Job>(producerControllerProbe.Ref));
await producerControllerProbe.ExpectMsgAsync<ProducerController.RegisterConsumer<Job>>();

consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref));

await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>();

// expects no resend
await consumerProbe.ExpectNoMsgAsync(1.5.Seconds());
}

}
6 changes: 0 additions & 6 deletions src/core/Akka/Configuration/akka.conf
Original file line number Diff line number Diff line change
Expand Up @@ -664,12 +664,6 @@ akka {
# kept in memory in the `ProducerController` until they have been
# confirmed, but the drawback is that lost messages will not be delivered.
only-flow-control = false

# When disabled, the <see cref="ConsumerController"/> will discard any <c>Retry</c> messages when it is
# waiting for a message delivery confirmation.
#
# When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types.
retry-confirmation = false
}

work-pulling {
Expand Down
39 changes: 3 additions & 36 deletions src/core/Akka/Delivery/ConsumerController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,59 +229,26 @@ public static Settings Create(ActorSystem actorSystem)

public static Settings Create(Config config)
{
return new Settings(
flowControlWindow: config.GetInt("flow-control-window"),
resendIntervalMin: config.GetTimeSpan("resend-interval-min"),
resendIntervalMax: config.GetTimeSpan("resend-interval-max"),
onlyFlowControl: config.GetBoolean("only-flow-control"),
retryConfirmation: config.GetBoolean("retry-confirmation"));
return new Settings(config.GetInt("flow-control-window"), config.GetTimeSpan("resend-interval-min"),
config.GetTimeSpan("resend-interval-max"), config.GetBoolean("only-flow-control"));
}

private Settings(int flowControlWindow, TimeSpan resendIntervalMin, TimeSpan resendIntervalMax,
bool onlyFlowControl, bool retryConfirmation)
bool onlyFlowControl)
{
FlowControlWindow = flowControlWindow;
ResendIntervalMin = resendIntervalMin;
ResendIntervalMax = resendIntervalMax;
OnlyFlowControl = onlyFlowControl;
RetryConfirmation = retryConfirmation;
}

/// <summary>
/// Number of messages in flight between <see cref="ProducerController"/> and <see cref="ConsumerController"/>.
///
/// The <see cref="ConsumerController"/> requests for more message when half of the window has been used.
/// </summary>
public int FlowControlWindow { get; init; }

/// <summary>
/// The ConsumerController resends flow control messages to the ProducerController with the <see cref="ResendIntervalMin"/>,
/// and increasing it gradually to <see cref="ResendIntervalMax"/> when idle.
/// </summary>
public TimeSpan ResendIntervalMin { get; init; }

/// <summary>
/// The ConsumerController resends flow control messages to the ProducerController with the <see cref="ResendIntervalMin"/>,
/// and increasing it gradually to <see cref="ResendIntervalMax"/> when idle.
/// </summary>
public TimeSpan ResendIntervalMax { get; init; }

/// <summary>
/// If this is enabled lost messages will not be resent, but flow control is used.
///
/// This can be more efficient since messages don't have to be kept in memory in the
/// <see cref="ProducerController"/> until they have been confirmed, but the drawback is that lost messages
/// will not be delivered.
/// </summary>
public bool OnlyFlowControl { get; init; }

/// <summary>
/// When disabled, the <see cref="ConsumerController"/> will discard any <c>Retry</c> messages when it is
/// waiting for a message delivery confirmation.
///
/// When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types.
/// </summary>
public bool RetryConfirmation { get; init; }

public override string ToString()
{
Expand Down
11 changes: 0 additions & 11 deletions src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -386,17 +386,6 @@ long ComputeNextSeqNr()
Receive<Retry>(_ =>
{
// no retries when WaitingForConfirmation, will be performed from (idle) active
if (!Settings.RetryConfirmation)
return;

_log.Debug("Consumer received Retry while waiting for confirmation.");
ReceiveRetry(() =>
{
_log.Debug("Retry sending SequencedMessage [{0}].", sequencedMessage.SeqNr);
CurrentState.Consumer.Tell(new Delivery<T>(sequencedMessage.Message.Message!, Context.Self,
sequencedMessage.ProducerId, sequencedMessage.SeqNr));
Become(() => WaitingForConfirmation(sequencedMessage));
});
});

Receive<ConsumerController.Start<T>>(start =>
Expand Down

0 comments on commit af961f8

Please sign in to comment.