From c96094fe83a9bfd5b642b86f28284d11078ab905 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 31 Jan 2023 00:01:50 +0700 Subject: [PATCH] [BACKPORT #6374] Fix PersistenceIdsPublisher hung on failure messages * Fix PersistenceIdsPublisher hung on failure messages * Downgrade failure messages from Warning to Info * Update API Verify list (Cherry-picked from d3b89da6faaf06b394382aad78e475982cded23f) --- .../AllPersistenceIdsPublisher.cs | 75 ++++++++++++++++++- .../SqlReadJournal.cs | 4 +- ...rovePersistenceSqlCommonQuery.verified.txt | 2 +- 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllPersistenceIdsPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllPersistenceIdsPublisher.cs index d4cf033bebc..3d19afca79f 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllPersistenceIdsPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllPersistenceIdsPublisher.cs @@ -7,6 +7,7 @@ using System; using Akka.Actor; +using Akka.Event; using Akka.Persistence.Sql.Common.Journal; using Akka.Streams.Actors; @@ -22,6 +23,7 @@ public static Props Props(string writeJournalPluginId) private readonly IActorRef _journalRef; private readonly DeliveryBuffer _buffer; + private readonly ILoggingAdapter _log; public IStash Stash { get; set; } @@ -29,6 +31,7 @@ public CurrentPersistenceIdsPublisher(string writeJournalPluginId) { _buffer = new DeliveryBuffer(OnNext); _journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); + _log = Context.GetLogger(); } protected override bool Receive(object message) @@ -36,12 +39,16 @@ protected override bool Receive(object message) switch (message) { case Request _: - _journalRef.Tell(new SelectCurrentPersistenceIds(0, Self)); Become(Initializing); + _journalRef + .Ask(new SelectCurrentPersistenceIds(0, Self)) + .PipeTo(Self); return true; + case Cancel _: Context.Stop(Self); return true; + default: return false; } @@ -64,9 +71,26 @@ private bool Initializing(object message) Become(Active); Stash.UnstashAll(); return true; + case Cancel _: Context.Stop(Self); return true; + + case Status.Failure msg: + if (msg.Cause is AskTimeoutException e) + { + _log.Info(e, "Current persistence id query timed out, retrying"); + } + else + { + _log.Info(msg.Cause, "Current persistence id query failed, retrying"); + } + + _journalRef + .Ask(new SelectCurrentPersistenceIds(0, Self)) + .PipeTo(Self); + return true; + default: Stash.Stash(); return true; @@ -77,14 +101,20 @@ private bool Active(object message) { switch (message) { + case CurrentPersistenceIds _: + // Ignore duplicate CurrentPersistenceIds response + return true; + case Request _: _buffer.DeliverBuffer(TotalDemand); if (_buffer.IsEmpty) OnCompleteThenStop(); return true; + case Cancel _: Context.Stop(Self); return true; + default: return false; } @@ -93,7 +123,7 @@ private bool Active(object message) internal sealed class LivePersistenceIdsPublisher : ActorPublisher, IWithUnboundedStash { - private class Continue + private sealed class Continue { public static readonly Continue Instance = new Continue(); @@ -109,11 +139,13 @@ public static Props Props(TimeSpan refreshInterval, string writeJournalPluginId) private readonly ICancelable _tickCancelable; private readonly IActorRef _journalRef; private readonly DeliveryBuffer _buffer; + private readonly ILoggingAdapter _log; public IStash Stash { get; set; } public LivePersistenceIdsPublisher(TimeSpan refreshInterval, string writeJournalPluginId) { + _log = Context.GetLogger(); _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable( refreshInterval, refreshInterval, @@ -135,14 +167,19 @@ protected override bool Receive(object message) switch (message) { case Request _: - _journalRef.Tell(new SelectCurrentPersistenceIds(0, Self)); Become(Waiting); + _journalRef + .Ask(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self)) + .PipeTo(Self); return true; + case Continue _: return true; + case Cancel _: Context.Stop(Self); return true; + default: return false; } @@ -160,11 +197,28 @@ private bool Waiting(object message) Become(Active); Stash.UnstashAll(); return true; + case Continue _: return true; + case Cancel _: Context.Stop(Self); return true; + + case Status.Failure msg: + if (msg.Cause is AskTimeoutException e) + { + _log.Info(e, $"Current persistence id query timed out, retrying. Offset: {_lastOrderingOffset}"); + } + else + { + _log.Info(msg.Cause, $"Current persistence id query failed, retrying. Offset: {_lastOrderingOffset}"); + } + + Become(Active); + Stash.UnstashAll(); + return true; + default: Stash.Stash(); return true; @@ -175,16 +229,29 @@ private bool Active(object message) { switch (message) { + case CurrentPersistenceIds _: + // Ignore duplicate CurrentPersistenceIds response + return true; + case Request _: _buffer.DeliverBuffer(TotalDemand); return true; + case Continue _: - _journalRef.Tell(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self)); Become(Waiting); + _journalRef + .Ask(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self)) + .PipeTo(Self); return true; + case Cancel _: Context.Stop(Self); return true; + + case Status.Failure msg: + _log.Info(msg.Cause, "Unexpected failure received"); + return true; + default: return false; } diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/SqlReadJournal.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/SqlReadJournal.cs index 727272c0dfb..c10372d616d 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/SqlReadJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/SqlReadJournal.cs @@ -6,7 +6,6 @@ //----------------------------------------------------------------------- using System; -using System.Threading; using Reactive.Streams; using Akka.Actor; using Akka.Configuration; @@ -26,7 +25,7 @@ public class SqlReadJournal : IAllEventsQuery, ICurrentAllEventsQuery { - public static string Identifier = "akka.persistence.query.journal.sql"; + public const string Identifier = "akka.persistence.query.journal.sql"; /// /// Returns a default query configuration for akka persistence SQLite-based journals and snapshot stores. @@ -52,7 +51,6 @@ public SqlReadJournal(ExtendedActorSystem system, Config config) _maxBufferSize = config.GetInt("max-buffer-size", 0); _system = system; - _lock = new ReaderWriterLockSlim(); _persistenceIdsPublisher = null; } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistenceSqlCommonQuery.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistenceSqlCommonQuery.verified.txt index 57f95967345..df259e53321 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistenceSqlCommonQuery.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistenceSqlCommonQuery.verified.txt @@ -6,7 +6,7 @@ namespace Akka.Persistence.Query.Sql { public class SqlReadJournal : Akka.Persistence.Query.IAllEventsQuery, Akka.Persistence.Query.ICurrentAllEventsQuery, Akka.Persistence.Query.ICurrentEventsByPersistenceIdQuery, Akka.Persistence.Query.ICurrentEventsByTagQuery, Akka.Persistence.Query.ICurrentPersistenceIdsQuery, Akka.Persistence.Query.IEventsByPersistenceIdQuery, Akka.Persistence.Query.IEventsByTagQuery, Akka.Persistence.Query.IPersistenceIdsQuery, Akka.Persistence.Query.IReadJournal { - public static string Identifier; + public const string Identifier = "akka.persistence.query.journal.sql"; public SqlReadJournal(Akka.Actor.ExtendedActorSystem system, Akka.Configuration.Config config) { } public Akka.Streams.Dsl.Source AllEvents(Akka.Persistence.Query.Offset offset = null) { } public Akka.Streams.Dsl.Source CurrentAllEvents(Akka.Persistence.Query.Offset offset) { }