Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PersistenceIdsPublisher hung on failure messages #6374

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Streams.Actors;

Expand All @@ -22,26 +23,32 @@ public static Props Props(string writeJournalPluginId)
private readonly IActorRef _journalRef;

private readonly DeliveryBuffer<string> _buffer;
private readonly ILoggingAdapter _log;

public IStash Stash { get; set; }

public CurrentPersistenceIdsPublisher(string writeJournalPluginId)
{
_buffer = new DeliveryBuffer<string>(OnNext);
_journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
_log = Context.GetLogger();
}

protected override bool Receive(object message)
{
switch (message)
{
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Initializing);
_journalRef
.Ask<CurrentPersistenceIds>(new SelectCurrentPersistenceIds(0, Self))
.PipeTo(Self);
return true;

case Cancel _:
Context.Stop(Self);
return true;

default:
return false;
}
Expand All @@ -64,9 +71,26 @@ private bool Initializing(object message)
Become(Active);
Stash.UnstashAll();
return true;

case Cancel _:
Context.Stop(Self);
return true;

case Status.Failure msg:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

if (msg.Cause is AskTimeoutException e)
{
_log.Info(e, "Current persistence id query timed out, retrying");
}
else
{
_log.Info(msg.Cause, "Current persistence id query failed, retrying");
}

_journalRef
.Ask<CurrentPersistenceIds>(new SelectCurrentPersistenceIds(0, Self))
.PipeTo(Self);
return true;

default:
Stash.Stash();
return true;
Expand All @@ -77,14 +101,20 @@ private bool Active(object message)
{
switch (message)
{
case CurrentPersistenceIds _:
// Ignore duplicate CurrentPersistenceIds response
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

return true;

case Request _:
_buffer.DeliverBuffer(TotalDemand);
if (_buffer.IsEmpty)
OnCompleteThenStop();
return true;

case Cancel _:
Context.Stop(Self);
return true;

default:
return false;
}
Expand All @@ -93,7 +123,7 @@ private bool Active(object message)

internal sealed class LivePersistenceIdsPublisher : ActorPublisher<string>, IWithUnboundedStash
{
private class Continue
private sealed class Continue
{
public static readonly Continue Instance = new Continue();

Expand All @@ -109,11 +139,13 @@ public static Props Props(TimeSpan refreshInterval, string writeJournalPluginId)
private readonly ICancelable _tickCancelable;
private readonly IActorRef _journalRef;
private readonly DeliveryBuffer<string> _buffer;
private readonly ILoggingAdapter _log;

public IStash Stash { get; set; }

public LivePersistenceIdsPublisher(TimeSpan refreshInterval, string writeJournalPluginId)
{
_log = Context.GetLogger();
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
refreshInterval,
refreshInterval,
Expand All @@ -135,14 +167,19 @@ protected override bool Receive(object message)
switch (message)
{
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Waiting);
_journalRef
.Ask<CurrentPersistenceIds>(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self))
.PipeTo(Self);
return true;

case Continue _:
return true;

case Cancel _:
Context.Stop(Self);
return true;

default:
return false;
}
Expand All @@ -160,11 +197,28 @@ private bool Waiting(object message)
Become(Active);
Stash.UnstashAll();
return true;

case Continue _:
return true;

case Cancel _:
Context.Stop(Self);
return true;

case Status.Failure msg:
if (msg.Cause is AskTimeoutException e)
{
_log.Info(e, $"Current persistence id query timed out, retrying. Offset: {_lastOrderingOffset}");
}
else
{
_log.Info(msg.Cause, $"Current persistence id query failed, retrying. Offset: {_lastOrderingOffset}");
}

Become(Active);
Stash.UnstashAll();
return true;

default:
Stash.Stash();
return true;
Expand All @@ -175,16 +229,29 @@ private bool Active(object message)
{
switch (message)
{
case CurrentPersistenceIds _:
// Ignore duplicate CurrentPersistenceIds response
return true;

case Request _:
_buffer.DeliverBuffer(TotalDemand);
return true;

case Continue _:
_journalRef.Tell(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self));
Become(Waiting);
_journalRef
.Ask<CurrentPersistenceIds>(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self))
.PipeTo(Self);
return true;

case Cancel _:
Context.Stop(Self);
return true;

case Status.Failure msg:
_log.Info(msg.Cause, "Unexpected failure received");
return true;

default:
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using Reactive.Streams;
using Akka.Actor;
using Akka.Configuration;
Expand All @@ -26,7 +25,7 @@ public class SqlReadJournal :
IAllEventsQuery,
ICurrentAllEventsQuery
{
public static string Identifier = "akka.persistence.query.journal.sql";
public const string Identifier = "akka.persistence.query.journal.sql";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


/// <summary>
/// Returns a default query configuration for akka persistence SQLite-based journals and snapshot stores.
Expand All @@ -52,7 +51,6 @@ public SqlReadJournal(ExtendedActorSystem system, Config config)
_maxBufferSize = config.GetInt("max-buffer-size", 0);
_system = system;

_lock = new ReaderWriterLockSlim();
_persistenceIdsPublisher = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Akka.Persistence.Query.Sql
{
public class SqlReadJournal : Akka.Persistence.Query.IAllEventsQuery, Akka.Persistence.Query.ICurrentAllEventsQuery, Akka.Persistence.Query.ICurrentEventsByPersistenceIdQuery, Akka.Persistence.Query.ICurrentEventsByTagQuery, Akka.Persistence.Query.ICurrentPersistenceIdsQuery, Akka.Persistence.Query.IEventsByPersistenceIdQuery, Akka.Persistence.Query.IEventsByTagQuery, Akka.Persistence.Query.IPersistenceIdsQuery, Akka.Persistence.Query.IReadJournal
{
public static string Identifier;
public const string Identifier = "akka.persistence.query.journal.sql";
public SqlReadJournal(Akka.Actor.ExtendedActorSystem system, Akka.Configuration.Config config) { }
public Akka.Streams.Dsl.Source<Akka.Persistence.Query.EventEnvelope, Akka.NotUsed> AllEvents(Akka.Persistence.Query.Offset offset = null) { }
public Akka.Streams.Dsl.Source<Akka.Persistence.Query.EventEnvelope, Akka.NotUsed> CurrentAllEvents(Akka.Persistence.Query.Offset offset) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Akka.Persistence.Query.Sql
{
public class SqlReadJournal : Akka.Persistence.Query.IAllEventsQuery, Akka.Persistence.Query.ICurrentAllEventsQuery, Akka.Persistence.Query.ICurrentEventsByPersistenceIdQuery, Akka.Persistence.Query.ICurrentEventsByTagQuery, Akka.Persistence.Query.ICurrentPersistenceIdsQuery, Akka.Persistence.Query.IEventsByPersistenceIdQuery, Akka.Persistence.Query.IEventsByTagQuery, Akka.Persistence.Query.IPersistenceIdsQuery, Akka.Persistence.Query.IReadJournal
{
public static string Identifier;
public const string Identifier = "akka.persistence.query.journal.sql";
public SqlReadJournal(Akka.Actor.ExtendedActorSystem system, Akka.Configuration.Config config) { }
public Akka.Streams.Dsl.Source<Akka.Persistence.Query.EventEnvelope, Akka.NotUsed> AllEvents(Akka.Persistence.Query.Offset offset = null) { }
public Akka.Streams.Dsl.Source<Akka.Persistence.Query.EventEnvelope, Akka.NotUsed> CurrentAllEvents(Akka.Persistence.Query.Offset offset) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Akka.Persistence.Query.Sql
{
public class SqlReadJournal : Akka.Persistence.Query.IAllEventsQuery, Akka.Persistence.Query.ICurrentAllEventsQuery, Akka.Persistence.Query.ICurrentEventsByPersistenceIdQuery, Akka.Persistence.Query.ICurrentEventsByTagQuery, Akka.Persistence.Query.ICurrentPersistenceIdsQuery, Akka.Persistence.Query.IEventsByPersistenceIdQuery, Akka.Persistence.Query.IEventsByTagQuery, Akka.Persistence.Query.IPersistenceIdsQuery, Akka.Persistence.Query.IReadJournal
{
public static string Identifier;
public const string Identifier = "akka.persistence.query.journal.sql";
public SqlReadJournal(Akka.Actor.ExtendedActorSystem system, Akka.Configuration.Config config) { }
public Akka.Streams.Dsl.Source<Akka.Persistence.Query.EventEnvelope, Akka.NotUsed> AllEvents(Akka.Persistence.Query.Offset offset = null) { }
public Akka.Streams.Dsl.Source<Akka.Persistence.Query.EventEnvelope, Akka.NotUsed> CurrentAllEvents(Akka.Persistence.Query.Offset offset) { }
Expand Down