diff --git a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
index 4cd10649223..1cefff8650f 100644
--- a/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
+++ b/src/core/Akka.Persistence/Eventsourced.Lifecycle.cs
@@ -63,10 +63,21 @@ public override void AroundPreRestart(Exception cause, object message)
finally
{
object inner;
- if (message is WriteMessageSuccess) inner = (message as WriteMessageSuccess).Persistent;
- else if (message is LoopMessageSuccess) inner = (message as LoopMessageSuccess).Message;
- else if (message is ReplayedMessage) inner = (message as ReplayedMessage).Persistent;
- else inner = message;
+ switch (message)
+ {
+ case WriteMessageSuccess success:
+ inner = success.Persistent;
+ break;
+ case LoopMessageSuccess success:
+ inner = success.Message;
+ break;
+ case ReplayedMessage replayedMessage:
+ inner = replayedMessage.Persistent;
+ break;
+ default:
+ inner = message;
+ break;
+ }
FlushJournalBatch();
base.AroundPreRestart(cause, inner);
@@ -97,31 +108,36 @@ public override void AroundPostStop()
///
protected override void Unhandled(object message)
{
- if (message is RecoveryCompleted) return; // ignore
- if (message is SaveSnapshotFailure)
+ switch (message)
{
- var m = (SaveSnapshotFailure) message;
- if (Log.IsWarningEnabled)
- Log.Warning("Failed to SaveSnapshot given metadata [{0}] due to: [{1}: {2}]", m.Metadata, m.Cause, m.Cause.Message);
- }
- if (message is DeleteSnapshotFailure)
- {
- var m = (DeleteSnapshotFailure) message;
- if (Log.IsWarningEnabled)
- Log.Warning("Failed to DeleteSnapshot given metadata [{0}] due to: [{1}: {2}]", m.Metadata, m.Cause, m.Cause.Message);
- }
- if (message is DeleteSnapshotsFailure)
- {
- var m = (DeleteSnapshotsFailure) message;
- if (Log.IsWarningEnabled)
- Log.Warning("Failed to DeleteSnapshots given criteria [{0}] due to: [{1}: {2}]", m.Criteria, m.Cause, m.Cause.Message);
- }
- if (message is DeleteMessagesFailure)
- {
- var m = (DeleteMessagesFailure) message;
- if (Log.IsWarningEnabled)
- Log.Warning("Failed to DeleteMessages ToSequenceNr [{0}] for PersistenceId [{1}] due to: [{2}: {3}]", m.ToSequenceNr, PersistenceId, m.Cause, m.Cause.Message);
+ case RecoveryCompleted _:
+ return; // ignore
+ case SaveSnapshotFailure failure:
+ {
+ if (Log.IsWarningEnabled)
+ Log.Warning("Failed to SaveSnapshot given metadata [{0}] due to: [{1}: {2}]", failure.Metadata, failure.Cause, failure.Cause.Message);
+ break;
+ }
+ case DeleteSnapshotFailure failure:
+ {
+ if (Log.IsWarningEnabled)
+ Log.Warning("Failed to DeleteSnapshot given metadata [{0}] due to: [{1}: {2}]", failure.Metadata, failure.Cause, failure.Cause.Message);
+ break;
+ }
+ case DeleteSnapshotsFailure failure:
+ {
+ if (Log.IsWarningEnabled)
+ Log.Warning("Failed to DeleteSnapshots given criteria [{0}] due to: [{1}: {2}]", failure.Criteria, failure.Cause, failure.Cause.Message);
+ break;
+ }
+ case DeleteMessagesFailure failure:
+ {
+ if (Log.IsWarningEnabled)
+ Log.Warning("Failed to DeleteMessages ToSequenceNr [{0}] for PersistenceId [{1}] due to: [{2}: {3}]", failure.ToSequenceNr, PersistenceId, failure.Cause, failure.Cause.Message);
+ break;
+ }
}
+
base.Unhandled(message);
}
}
diff --git a/src/core/Akka.Persistence/Eventsourced.cs b/src/core/Akka.Persistence/Eventsourced.cs
index 0e92a7fc666..ce79d622b39 100644
--- a/src/core/Akka.Persistence/Eventsourced.cs
+++ b/src/core/Akka.Persistence/Eventsourced.cs
@@ -628,9 +628,9 @@ private void StashInternally(object currentMessage)
var sender = Sender;
Context.System.DeadLetters.Tell(new DeadLetter(currentMessage, sender, Self), Sender);
}
- else if (strategy is ReplyToStrategy)
+ else if (strategy is ReplyToStrategy toStrategy)
{
- Sender.Tell(((ReplyToStrategy)strategy).Response);
+ Sender.Tell(toStrategy.Response);
}
else if (strategy is ThrowOverflowExceptionStrategy)
{
diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
index 1cf19e7c8cf..c6e4d0a0dcc 100644
--- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
+++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
@@ -309,8 +309,7 @@ private void HandleReplayMessages(ReplayMessages message)
/// TBD
protected Exception TryUnwrapException(Exception e)
{
- var aggregateException = e as AggregateException;
- if (aggregateException != null)
+ if (e is AggregateException aggregateException)
{
aggregateException = aggregateException.Flatten();
if (aggregateException.InnerExceptions.Count == 1)
@@ -354,21 +353,21 @@ private void HandleWriteMessages(WriteMessages message)
writeResult = Task.FromResult((IImmutableList)Enumerable.Repeat(e, atomicWriteCount).ToImmutableList());
}
- Action, IImmutableList> resequence = (mapper, results) =>
+ void Resequence(Func mapper, IImmutableList results)
{
var i = 0;
var enumerator = results != null ? results.GetEnumerator() : null;
foreach (var resequencable in message.Messages)
{
- if (resequencable is AtomicWrite)
+ if (resequencable is AtomicWrite aw)
{
- var aw = resequencable as AtomicWrite;
Exception exception = null;
if (enumerator != null)
{
enumerator.MoveNext();
exception = enumerator.Current;
}
+
foreach (var p in (IEnumerable)aw.Payload)
{
_resequencer.Tell(new Desequenced(mapper(p, exception), counter + i + 1, message.PersistentActor, p.Sender));
@@ -378,12 +377,11 @@ private void HandleWriteMessages(WriteMessages message)
else
{
var loopMsg = new LoopMessageSuccess(resequencable.Payload, message.ActorInstanceId);
- _resequencer.Tell(new Desequenced(loopMsg, counter + i + 1, message.PersistentActor,
- resequencable.Sender));
+ _resequencer.Tell(new Desequenced(loopMsg, counter + i + 1, message.PersistentActor, resequencable.Sender));
i++;
}
}
- };
+ }
writeResult
.ContinueWith(t =>
@@ -394,7 +392,7 @@ private void HandleWriteMessages(WriteMessages message)
throw new IllegalStateException($"AsyncWriteMessages return invalid number or results. Expected [{atomicWriteCount}], but got [{t.Result.Count}].");
_resequencer.Tell(new Desequenced(WriteMessagesSuccessful.Instance, counter, message.PersistentActor, self));
- resequence((x, exception) => exception == null
+ Resequence((x, exception) => exception == null
? (object)new WriteMessageSuccess(x, message.ActorInstanceId)
: new WriteMessageRejected(x, exception, message.ActorInstanceId), t.Result);
}
@@ -407,7 +405,7 @@ private void HandleWriteMessages(WriteMessages message)
: new OperationCanceledException(
"WriteMessagesAsync canceled, possibly due to timing out."));
_resequencer.Tell(new Desequenced(new WriteMessagesFailed(exception, atomicWriteCount), counter, message.PersistentActor, self));
- resequence((x, _) => new WriteMessageFailure(x, exception, message.ActorInstanceId), null);
+ Resequence((x, _) => new WriteMessageFailure(x, exception, message.ActorInstanceId), null);
}
}, _continuationOptions);
}