Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code cleanup on Akka.Persistence #5497

Merged
merged 1 commit into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 43 additions & 27 deletions src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,21 @@ public override void AroundPreRestart(Exception cause, object message)
finally
{
object inner;
if (message is WriteMessageSuccess) inner = (message as WriteMessageSuccess).Persistent;
else if (message is LoopMessageSuccess) inner = (message as LoopMessageSuccess).Message;
else if (message is ReplayedMessage) inner = (message as ReplayedMessage).Persistent;
else inner = message;
switch (message)
{
case WriteMessageSuccess success:
inner = success.Persistent;
break;
case LoopMessageSuccess success:
inner = success.Message;
break;
case ReplayedMessage replayedMessage:
inner = replayedMessage.Persistent;
break;
default:
inner = message;
break;
}

FlushJournalBatch();
base.AroundPreRestart(cause, inner);
Expand Down Expand Up @@ -97,31 +108,36 @@ public override void AroundPostStop()
/// <inheritdoc/>
protected override void Unhandled(object message)
{
if (message is RecoveryCompleted) return; // ignore
if (message is SaveSnapshotFailure)
switch (message)
{
var m = (SaveSnapshotFailure) message;
if (Log.IsWarningEnabled)
Log.Warning("Failed to SaveSnapshot given metadata [{0}] due to: [{1}: {2}]", m.Metadata, m.Cause, m.Cause.Message);
}
if (message is DeleteSnapshotFailure)
{
var m = (DeleteSnapshotFailure) message;
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteSnapshot given metadata [{0}] due to: [{1}: {2}]", m.Metadata, m.Cause, m.Cause.Message);
}
if (message is DeleteSnapshotsFailure)
{
var m = (DeleteSnapshotsFailure) message;
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteSnapshots given criteria [{0}] due to: [{1}: {2}]", m.Criteria, m.Cause, m.Cause.Message);
}
if (message is DeleteMessagesFailure)
{
var m = (DeleteMessagesFailure) message;
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteMessages ToSequenceNr [{0}] for PersistenceId [{1}] due to: [{2}: {3}]", m.ToSequenceNr, PersistenceId, m.Cause, m.Cause.Message);
case RecoveryCompleted _:
return; // ignore
case SaveSnapshotFailure failure:
{
if (Log.IsWarningEnabled)
Log.Warning("Failed to SaveSnapshot given metadata [{0}] due to: [{1}: {2}]", failure.Metadata, failure.Cause, failure.Cause.Message);
break;
}
case DeleteSnapshotFailure failure:
{
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteSnapshot given metadata [{0}] due to: [{1}: {2}]", failure.Metadata, failure.Cause, failure.Cause.Message);
break;
}
case DeleteSnapshotsFailure failure:
{
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteSnapshots given criteria [{0}] due to: [{1}: {2}]", failure.Criteria, failure.Cause, failure.Cause.Message);
break;
}
case DeleteMessagesFailure failure:
{
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteMessages ToSequenceNr [{0}] for PersistenceId [{1}] due to: [{2}: {3}]", failure.ToSequenceNr, PersistenceId, failure.Cause, failure.Cause.Message);
break;
}
}

base.Unhandled(message);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -628,9 +628,9 @@ private void StashInternally(object currentMessage)
var sender = Sender;
Context.System.DeadLetters.Tell(new DeadLetter(currentMessage, sender, Self), Sender);
}
else if (strategy is ReplyToStrategy)
else if (strategy is ReplyToStrategy toStrategy)
{
Sender.Tell(((ReplyToStrategy)strategy).Response);
Sender.Tell(toStrategy.Response);
}
else if (strategy is ThrowOverflowExceptionStrategy)
{
Expand Down
18 changes: 8 additions & 10 deletions src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,7 @@ private void HandleReplayMessages(ReplayMessages message)
/// <returns>TBD</returns>
protected Exception TryUnwrapException(Exception e)
{
var aggregateException = e as AggregateException;
if (aggregateException != null)
if (e is AggregateException aggregateException)
{
aggregateException = aggregateException.Flatten();
if (aggregateException.InnerExceptions.Count == 1)
Expand Down Expand Up @@ -354,21 +353,21 @@ private void HandleWriteMessages(WriteMessages message)
writeResult = Task.FromResult((IImmutableList<Exception>)Enumerable.Repeat(e, atomicWriteCount).ToImmutableList());
}

Action<Func<IPersistentRepresentation, Exception, object>, IImmutableList<Exception>> resequence = (mapper, results) =>
void Resequence(Func<IPersistentRepresentation, Exception, object> mapper, IImmutableList<Exception> results)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a local function instead of explicit delegate allocation

{
var i = 0;
var enumerator = results != null ? results.GetEnumerator() : null;
foreach (var resequencable in message.Messages)
{
if (resequencable is AtomicWrite)
if (resequencable is AtomicWrite aw)
{
var aw = resequencable as AtomicWrite;
Exception exception = null;
if (enumerator != null)
{
enumerator.MoveNext();
exception = enumerator.Current;
}

foreach (var p in (IEnumerable<IPersistentRepresentation>)aw.Payload)
{
_resequencer.Tell(new Desequenced(mapper(p, exception), counter + i + 1, message.PersistentActor, p.Sender));
Expand All @@ -378,12 +377,11 @@ private void HandleWriteMessages(WriteMessages message)
else
{
var loopMsg = new LoopMessageSuccess(resequencable.Payload, message.ActorInstanceId);
_resequencer.Tell(new Desequenced(loopMsg, counter + i + 1, message.PersistentActor,
resequencable.Sender));
_resequencer.Tell(new Desequenced(loopMsg, counter + i + 1, message.PersistentActor, resequencable.Sender));
i++;
}
}
};
}

writeResult
.ContinueWith(t =>
Expand All @@ -394,7 +392,7 @@ private void HandleWriteMessages(WriteMessages message)
throw new IllegalStateException($"AsyncWriteMessages return invalid number or results. Expected [{atomicWriteCount}], but got [{t.Result.Count}].");

_resequencer.Tell(new Desequenced(WriteMessagesSuccessful.Instance, counter, message.PersistentActor, self));
resequence((x, exception) => exception == null
Resequence((x, exception) => exception == null
? (object)new WriteMessageSuccess(x, message.ActorInstanceId)
: new WriteMessageRejected(x, exception, message.ActorInstanceId), t.Result);
}
Expand All @@ -407,7 +405,7 @@ private void HandleWriteMessages(WriteMessages message)
: new OperationCanceledException(
"WriteMessagesAsync canceled, possibly due to timing out."));
_resequencer.Tell(new Desequenced(new WriteMessagesFailed(exception, atomicWriteCount), counter, message.PersistentActor, self));
resequence((x, _) => new WriteMessageFailure(x, exception, message.ActorInstanceId), null);
Resequence((x, _) => new WriteMessageFailure(x, exception, message.ActorInstanceId), null);
}
}, _continuationOptions);
}
Expand Down