Skip to content

Commit

Permalink
[BACKPORT #6374] Fix PersistenceIdsPublisher hung on failure messages (
Browse files Browse the repository at this point in the history
…#6375)

* Fix PersistenceIdsPublisher hung on failure messages
* Downgrade failure messages from Warning to Info
* Update API Verify list

(Cherry-picked from d3b89da)
  • Loading branch information
Arkatufus authored Jan 30, 2023
1 parent 8a2afc2 commit 7cee0ed
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Streams.Actors;

Expand All @@ -22,26 +23,32 @@ public static Props Props(string writeJournalPluginId)
private readonly IActorRef _journalRef;

private readonly DeliveryBuffer<string> _buffer;
private readonly ILoggingAdapter _log;

public IStash Stash { get; set; }

public CurrentPersistenceIdsPublisher(string writeJournalPluginId)
{
_buffer = new DeliveryBuffer<string>(OnNext);
_journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
_log = Context.GetLogger();
}

protected override bool Receive(object message)
{
switch (message)
{
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Initializing);
_journalRef
.Ask<CurrentPersistenceIds>(new SelectCurrentPersistenceIds(0, Self))
.PipeTo(Self);
return true;

case Cancel _:
Context.Stop(Self);
return true;

default:
return false;
}
Expand All @@ -64,9 +71,26 @@ private bool Initializing(object message)
Become(Active);
Stash.UnstashAll();
return true;

case Cancel _:
Context.Stop(Self);
return true;

case Status.Failure msg:
if (msg.Cause is AskTimeoutException e)
{
_log.Info(e, "Current persistence id query timed out, retrying");
}
else
{
_log.Info(msg.Cause, "Current persistence id query failed, retrying");
}

_journalRef
.Ask<CurrentPersistenceIds>(new SelectCurrentPersistenceIds(0, Self))
.PipeTo(Self);
return true;

default:
Stash.Stash();
return true;
Expand All @@ -77,14 +101,20 @@ private bool Active(object message)
{
switch (message)
{
case CurrentPersistenceIds _:
// Ignore duplicate CurrentPersistenceIds response
return true;

case Request _:
_buffer.DeliverBuffer(TotalDemand);
if (_buffer.IsEmpty)
OnCompleteThenStop();
return true;

case Cancel _:
Context.Stop(Self);
return true;

default:
return false;
}
Expand All @@ -93,7 +123,7 @@ private bool Active(object message)

internal sealed class LivePersistenceIdsPublisher : ActorPublisher<string>, IWithUnboundedStash
{
private class Continue
private sealed class Continue
{
public static readonly Continue Instance = new Continue();

Expand All @@ -109,11 +139,13 @@ public static Props Props(TimeSpan refreshInterval, string writeJournalPluginId)
private readonly ICancelable _tickCancelable;
private readonly IActorRef _journalRef;
private readonly DeliveryBuffer<string> _buffer;
private readonly ILoggingAdapter _log;

public IStash Stash { get; set; }

public LivePersistenceIdsPublisher(TimeSpan refreshInterval, string writeJournalPluginId)
{
_log = Context.GetLogger();
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
refreshInterval,
refreshInterval,
Expand All @@ -135,14 +167,19 @@ protected override bool Receive(object message)
switch (message)
{
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Waiting);
_journalRef
.Ask<CurrentPersistenceIds>(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self))
.PipeTo(Self);
return true;

case Continue _:
return true;

case Cancel _:
Context.Stop(Self);
return true;

default:
return false;
}
Expand All @@ -160,11 +197,28 @@ private bool Waiting(object message)
Become(Active);
Stash.UnstashAll();
return true;

case Continue _:
return true;

case Cancel _:
Context.Stop(Self);
return true;

case Status.Failure msg:
if (msg.Cause is AskTimeoutException e)
{
_log.Info(e, $"Current persistence id query timed out, retrying. Offset: {_lastOrderingOffset}");
}
else
{
_log.Info(msg.Cause, $"Current persistence id query failed, retrying. Offset: {_lastOrderingOffset}");
}

Become(Active);
Stash.UnstashAll();
return true;

default:
Stash.Stash();
return true;
Expand All @@ -175,16 +229,29 @@ private bool Active(object message)
{
switch (message)
{
case CurrentPersistenceIds _:
// Ignore duplicate CurrentPersistenceIds response
return true;

case Request _:
_buffer.DeliverBuffer(TotalDemand);
return true;

case Continue _:
_journalRef.Tell(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self));
Become(Waiting);
_journalRef
.Ask<CurrentPersistenceIds>(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self))
.PipeTo(Self);
return true;

case Cancel _:
Context.Stop(Self);
return true;

case Status.Failure msg:
_log.Info(msg.Cause, "Unexpected failure received");
return true;

default:
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using Reactive.Streams;
using Akka.Actor;
using Akka.Configuration;
Expand All @@ -26,7 +25,7 @@ public class SqlReadJournal :
IAllEventsQuery,
ICurrentAllEventsQuery
{
public static string Identifier = "akka.persistence.query.journal.sql";
public const string Identifier = "akka.persistence.query.journal.sql";

/// <summary>
/// Returns a default query configuration for akka persistence SQLite-based journals and snapshot stores.
Expand All @@ -52,7 +51,6 @@ public SqlReadJournal(ExtendedActorSystem system, Config config)
_maxBufferSize = config.GetInt("max-buffer-size", 0);
_system = system;

_lock = new ReaderWriterLockSlim();
_persistenceIdsPublisher = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Akka.Persistence.Query.Sql
{
public class SqlReadJournal : Akka.Persistence.Query.IAllEventsQuery, Akka.Persistence.Query.ICurrentAllEventsQuery, Akka.Persistence.Query.ICurrentEventsByPersistenceIdQuery, Akka.Persistence.Query.ICurrentEventsByTagQuery, Akka.Persistence.Query.ICurrentPersistenceIdsQuery, Akka.Persistence.Query.IEventsByPersistenceIdQuery, Akka.Persistence.Query.IEventsByTagQuery, Akka.Persistence.Query.IPersistenceIdsQuery, Akka.Persistence.Query.IReadJournal
{
public static string Identifier;
public const string Identifier = "akka.persistence.query.journal.sql";
public SqlReadJournal(Akka.Actor.ExtendedActorSystem system, Akka.Configuration.Config config) { }
public Akka.Streams.Dsl.Source<Akka.Persistence.Query.EventEnvelope, Akka.NotUsed> AllEvents(Akka.Persistence.Query.Offset offset = null) { }
public Akka.Streams.Dsl.Source<Akka.Persistence.Query.EventEnvelope, Akka.NotUsed> CurrentAllEvents(Akka.Persistence.Query.Offset offset) { }
Expand Down

0 comments on commit 7cee0ed

Please sign in to comment.