From 87e5b72339ffff0ab3c7022047b627c6d9920221 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 10 Jan 2022 15:28:26 -0600 Subject: [PATCH] code cleanup on Akka.Persistence Cleaning up some of the `Eventsourced` and `AsyncWriteJournal` code to use newer C# features --- .../Eventsourced.Lifecycle.cs | 70 ++++++++++++------- src/core/Akka.Persistence/Eventsourced.cs | 4 +- .../Journal/AsyncWriteJournal.cs | 18 +++-- 3 files changed, 53 insertions(+), 39 deletions(-) diff --git a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs index 4cd10649223..1cefff8650f 100644 --- a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs +++ b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs @@ -63,10 +63,21 @@ public override void AroundPreRestart(Exception cause, object message) finally { object inner; - if (message is WriteMessageSuccess) inner = (message as WriteMessageSuccess).Persistent; - else if (message is LoopMessageSuccess) inner = (message as LoopMessageSuccess).Message; - else if (message is ReplayedMessage) inner = (message as ReplayedMessage).Persistent; - else inner = message; + switch (message) + { + case WriteMessageSuccess success: + inner = success.Persistent; + break; + case LoopMessageSuccess success: + inner = success.Message; + break; + case ReplayedMessage replayedMessage: + inner = replayedMessage.Persistent; + break; + default: + inner = message; + break; + } FlushJournalBatch(); base.AroundPreRestart(cause, inner); @@ -97,31 +108,36 @@ public override void AroundPostStop() /// protected override void Unhandled(object message) { - if (message is RecoveryCompleted) return; // ignore - if (message is SaveSnapshotFailure) + switch (message) { - var m = (SaveSnapshotFailure) message; - if (Log.IsWarningEnabled) - Log.Warning("Failed to SaveSnapshot given metadata [{0}] due to: [{1}: {2}]", m.Metadata, m.Cause, m.Cause.Message); - } - if (message is DeleteSnapshotFailure) - { - var m = (DeleteSnapshotFailure) message; - if (Log.IsWarningEnabled) - Log.Warning("Failed to DeleteSnapshot given metadata [{0}] due to: [{1}: {2}]", m.Metadata, m.Cause, m.Cause.Message); - } - if (message is DeleteSnapshotsFailure) - { - var m = (DeleteSnapshotsFailure) message; - if (Log.IsWarningEnabled) - Log.Warning("Failed to DeleteSnapshots given criteria [{0}] due to: [{1}: {2}]", m.Criteria, m.Cause, m.Cause.Message); - } - if (message is DeleteMessagesFailure) - { - var m = (DeleteMessagesFailure) message; - if (Log.IsWarningEnabled) - Log.Warning("Failed to DeleteMessages ToSequenceNr [{0}] for PersistenceId [{1}] due to: [{2}: {3}]", m.ToSequenceNr, PersistenceId, m.Cause, m.Cause.Message); + case RecoveryCompleted _: + return; // ignore + case SaveSnapshotFailure failure: + { + if (Log.IsWarningEnabled) + Log.Warning("Failed to SaveSnapshot given metadata [{0}] due to: [{1}: {2}]", failure.Metadata, failure.Cause, failure.Cause.Message); + break; + } + case DeleteSnapshotFailure failure: + { + if (Log.IsWarningEnabled) + Log.Warning("Failed to DeleteSnapshot given metadata [{0}] due to: [{1}: {2}]", failure.Metadata, failure.Cause, failure.Cause.Message); + break; + } + case DeleteSnapshotsFailure failure: + { + if (Log.IsWarningEnabled) + Log.Warning("Failed to DeleteSnapshots given criteria [{0}] due to: [{1}: {2}]", failure.Criteria, failure.Cause, failure.Cause.Message); + break; + } + case DeleteMessagesFailure failure: + { + if (Log.IsWarningEnabled) + Log.Warning("Failed to DeleteMessages ToSequenceNr [{0}] for PersistenceId [{1}] due to: [{2}: {3}]", failure.ToSequenceNr, PersistenceId, failure.Cause, failure.Cause.Message); + break; + } } + base.Unhandled(message); } } diff --git a/src/core/Akka.Persistence/Eventsourced.cs b/src/core/Akka.Persistence/Eventsourced.cs index 0e92a7fc666..ce79d622b39 100644 --- a/src/core/Akka.Persistence/Eventsourced.cs +++ b/src/core/Akka.Persistence/Eventsourced.cs @@ -628,9 +628,9 @@ private void StashInternally(object currentMessage) var sender = Sender; Context.System.DeadLetters.Tell(new DeadLetter(currentMessage, sender, Self), Sender); } - else if (strategy is ReplyToStrategy) + else if (strategy is ReplyToStrategy toStrategy) { - Sender.Tell(((ReplyToStrategy)strategy).Response); + Sender.Tell(toStrategy.Response); } else if (strategy is ThrowOverflowExceptionStrategy) { diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index 1cf19e7c8cf..c6e4d0a0dcc 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -309,8 +309,7 @@ private void HandleReplayMessages(ReplayMessages message) /// TBD protected Exception TryUnwrapException(Exception e) { - var aggregateException = e as AggregateException; - if (aggregateException != null) + if (e is AggregateException aggregateException) { aggregateException = aggregateException.Flatten(); if (aggregateException.InnerExceptions.Count == 1) @@ -354,21 +353,21 @@ private void HandleWriteMessages(WriteMessages message) writeResult = Task.FromResult((IImmutableList)Enumerable.Repeat(e, atomicWriteCount).ToImmutableList()); } - Action, IImmutableList> resequence = (mapper, results) => + void Resequence(Func mapper, IImmutableList results) { var i = 0; var enumerator = results != null ? results.GetEnumerator() : null; foreach (var resequencable in message.Messages) { - if (resequencable is AtomicWrite) + if (resequencable is AtomicWrite aw) { - var aw = resequencable as AtomicWrite; Exception exception = null; if (enumerator != null) { enumerator.MoveNext(); exception = enumerator.Current; } + foreach (var p in (IEnumerable)aw.Payload) { _resequencer.Tell(new Desequenced(mapper(p, exception), counter + i + 1, message.PersistentActor, p.Sender)); @@ -378,12 +377,11 @@ private void HandleWriteMessages(WriteMessages message) else { var loopMsg = new LoopMessageSuccess(resequencable.Payload, message.ActorInstanceId); - _resequencer.Tell(new Desequenced(loopMsg, counter + i + 1, message.PersistentActor, - resequencable.Sender)); + _resequencer.Tell(new Desequenced(loopMsg, counter + i + 1, message.PersistentActor, resequencable.Sender)); i++; } } - }; + } writeResult .ContinueWith(t => @@ -394,7 +392,7 @@ private void HandleWriteMessages(WriteMessages message) throw new IllegalStateException($"AsyncWriteMessages return invalid number or results. Expected [{atomicWriteCount}], but got [{t.Result.Count}]."); _resequencer.Tell(new Desequenced(WriteMessagesSuccessful.Instance, counter, message.PersistentActor, self)); - resequence((x, exception) => exception == null + Resequence((x, exception) => exception == null ? (object)new WriteMessageSuccess(x, message.ActorInstanceId) : new WriteMessageRejected(x, exception, message.ActorInstanceId), t.Result); } @@ -407,7 +405,7 @@ private void HandleWriteMessages(WriteMessages message) : new OperationCanceledException( "WriteMessagesAsync canceled, possibly due to timing out.")); _resequencer.Tell(new Desequenced(new WriteMessagesFailed(exception, atomicWriteCount), counter, message.PersistentActor, self)); - resequence((x, _) => new WriteMessageFailure(x, exception, message.ActorInstanceId), null); + Resequence((x, _) => new WriteMessageFailure(x, exception, message.ActorInstanceId), null); } }, _continuationOptions); }