Skip to content

Commit

Permalink
code cleanup on Akka.Persistence (#5497)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb authored Jan 11, 2022
1 parent 4ba2443 commit 2896372
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 39 deletions.
70 changes: 43 additions & 27 deletions src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,21 @@ public override void AroundPreRestart(Exception cause, object message)
finally
{
object inner;
if (message is WriteMessageSuccess) inner = (message as WriteMessageSuccess).Persistent;
else if (message is LoopMessageSuccess) inner = (message as LoopMessageSuccess).Message;
else if (message is ReplayedMessage) inner = (message as ReplayedMessage).Persistent;
else inner = message;
switch (message)
{
case WriteMessageSuccess success:
inner = success.Persistent;
break;
case LoopMessageSuccess success:
inner = success.Message;
break;
case ReplayedMessage replayedMessage:
inner = replayedMessage.Persistent;
break;
default:
inner = message;
break;
}

FlushJournalBatch();
base.AroundPreRestart(cause, inner);
Expand Down Expand Up @@ -97,31 +108,36 @@ public override void AroundPostStop()
/// <inheritdoc/>
protected override void Unhandled(object message)
{
if (message is RecoveryCompleted) return; // ignore
if (message is SaveSnapshotFailure)
switch (message)
{
var m = (SaveSnapshotFailure) message;
if (Log.IsWarningEnabled)
Log.Warning("Failed to SaveSnapshot given metadata [{0}] due to: [{1}: {2}]", m.Metadata, m.Cause, m.Cause.Message);
}
if (message is DeleteSnapshotFailure)
{
var m = (DeleteSnapshotFailure) message;
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteSnapshot given metadata [{0}] due to: [{1}: {2}]", m.Metadata, m.Cause, m.Cause.Message);
}
if (message is DeleteSnapshotsFailure)
{
var m = (DeleteSnapshotsFailure) message;
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteSnapshots given criteria [{0}] due to: [{1}: {2}]", m.Criteria, m.Cause, m.Cause.Message);
}
if (message is DeleteMessagesFailure)
{
var m = (DeleteMessagesFailure) message;
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteMessages ToSequenceNr [{0}] for PersistenceId [{1}] due to: [{2}: {3}]", m.ToSequenceNr, PersistenceId, m.Cause, m.Cause.Message);
case RecoveryCompleted _:
return; // ignore
case SaveSnapshotFailure failure:
{
if (Log.IsWarningEnabled)
Log.Warning("Failed to SaveSnapshot given metadata [{0}] due to: [{1}: {2}]", failure.Metadata, failure.Cause, failure.Cause.Message);
break;
}
case DeleteSnapshotFailure failure:
{
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteSnapshot given metadata [{0}] due to: [{1}: {2}]", failure.Metadata, failure.Cause, failure.Cause.Message);
break;
}
case DeleteSnapshotsFailure failure:
{
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteSnapshots given criteria [{0}] due to: [{1}: {2}]", failure.Criteria, failure.Cause, failure.Cause.Message);
break;
}
case DeleteMessagesFailure failure:
{
if (Log.IsWarningEnabled)
Log.Warning("Failed to DeleteMessages ToSequenceNr [{0}] for PersistenceId [{1}] due to: [{2}: {3}]", failure.ToSequenceNr, PersistenceId, failure.Cause, failure.Cause.Message);
break;
}
}

base.Unhandled(message);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -628,9 +628,9 @@ private void StashInternally(object currentMessage)
var sender = Sender;
Context.System.DeadLetters.Tell(new DeadLetter(currentMessage, sender, Self), Sender);
}
else if (strategy is ReplyToStrategy)
else if (strategy is ReplyToStrategy toStrategy)
{
Sender.Tell(((ReplyToStrategy)strategy).Response);
Sender.Tell(toStrategy.Response);
}
else if (strategy is ThrowOverflowExceptionStrategy)
{
Expand Down
18 changes: 8 additions & 10 deletions src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,7 @@ private void HandleReplayMessages(ReplayMessages message)
/// <returns>TBD</returns>
protected Exception TryUnwrapException(Exception e)
{
var aggregateException = e as AggregateException;
if (aggregateException != null)
if (e is AggregateException aggregateException)
{
aggregateException = aggregateException.Flatten();
if (aggregateException.InnerExceptions.Count == 1)
Expand Down Expand Up @@ -354,21 +353,21 @@ private void HandleWriteMessages(WriteMessages message)
writeResult = Task.FromResult((IImmutableList<Exception>)Enumerable.Repeat(e, atomicWriteCount).ToImmutableList());
}

Action<Func<IPersistentRepresentation, Exception, object>, IImmutableList<Exception>> resequence = (mapper, results) =>
void Resequence(Func<IPersistentRepresentation, Exception, object> mapper, IImmutableList<Exception> results)
{
var i = 0;
var enumerator = results != null ? results.GetEnumerator() : null;
foreach (var resequencable in message.Messages)
{
if (resequencable is AtomicWrite)
if (resequencable is AtomicWrite aw)
{
var aw = resequencable as AtomicWrite;
Exception exception = null;
if (enumerator != null)
{
enumerator.MoveNext();
exception = enumerator.Current;
}

foreach (var p in (IEnumerable<IPersistentRepresentation>)aw.Payload)
{
_resequencer.Tell(new Desequenced(mapper(p, exception), counter + i + 1, message.PersistentActor, p.Sender));
Expand All @@ -378,12 +377,11 @@ private void HandleWriteMessages(WriteMessages message)
else
{
var loopMsg = new LoopMessageSuccess(resequencable.Payload, message.ActorInstanceId);
_resequencer.Tell(new Desequenced(loopMsg, counter + i + 1, message.PersistentActor,
resequencable.Sender));
_resequencer.Tell(new Desequenced(loopMsg, counter + i + 1, message.PersistentActor, resequencable.Sender));
i++;
}
}
};
}

writeResult
.ContinueWith(t =>
Expand All @@ -394,7 +392,7 @@ private void HandleWriteMessages(WriteMessages message)
throw new IllegalStateException($"AsyncWriteMessages return invalid number or results. Expected [{atomicWriteCount}], but got [{t.Result.Count}].");

_resequencer.Tell(new Desequenced(WriteMessagesSuccessful.Instance, counter, message.PersistentActor, self));
resequence((x, exception) => exception == null
Resequence((x, exception) => exception == null
? (object)new WriteMessageSuccess(x, message.ActorInstanceId)
: new WriteMessageRejected(x, exception, message.ActorInstanceId), t.Result);
}
Expand All @@ -407,7 +405,7 @@ private void HandleWriteMessages(WriteMessages message)
: new OperationCanceledException(
"WriteMessagesAsync canceled, possibly due to timing out."));
_resequencer.Tell(new Desequenced(new WriteMessagesFailed(exception, atomicWriteCount), counter, message.PersistentActor, self));
resequence((x, _) => new WriteMessageFailure(x, exception, message.ActorInstanceId), null);
Resequence((x, _) => new WriteMessageFailure(x, exception, message.ActorInstanceId), null);
}
}, _continuationOptions);
}
Expand Down

0 comments on commit 2896372

Please sign in to comment.