Skip to content

Commit

Permalink
Remove the PatternMatch class (#6099)
Browse files Browse the repository at this point in the history
* Remove the PatternMatch class

* Update API Verify list

* Fix SmallestMailboxSpec

* Update SmallestMailboxSpec to use tuple pattern instead
  • Loading branch information
Arkatufus authored Sep 16, 2022
1 parent 46d02f9 commit 4c2f30e
Show file tree
Hide file tree
Showing 40 changed files with 1,016 additions and 1,234 deletions.
51 changes: 30 additions & 21 deletions src/benchmark/PersistenceBenchmark/PerformanceActors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,38 @@ public PerformanceTestActor(string persistenceId)

public sealed override string PersistenceId { get; }

protected override bool ReceiveRecover(object message) => message.Match()
.With<Stored>(s => state += s.Value)
.WasHandled;

protected override bool ReceiveCommand(object message) => message.Match()
.With<Store>(store =>
protected override bool ReceiveRecover(object message)
{
if (message is Stored s)
{
Persist(new Stored(store.Value), s =>
{
state += s.Value;
});
})
.With<Init>(_ =>
state += s.Value;
return true;
}
return false;
}

protected override bool ReceiveCommand(object message)
{
switch (message)
{
var sender = Sender;
Persist(new Stored(0), s =>
{
state += s.Value;
sender.Tell(Done.Instance);
});
})
.With<Finish>(_ => Sender.Tell(new Finished(state)))
.WasHandled;
case Store store:
Persist(new Stored(store.Value), s => { state += s.Value; });
return true;
case Init _:
var sender = Sender;
Persist(new Stored(0), s =>
{
state += s.Value;
sender.Tell(Done.Instance);
});
return true;
case Finish _:
Sender.Tell(new Finished(state));
return true;
default:
return false;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,30 +150,34 @@ public PointToPointChannel()

private void Idle(object message)
{
message.Match()
.With<RegisterConsumer>(_ =>
{
switch (message)
{
case RegisterConsumer _:
_log.Info("Register consumer [{0}]", Sender.Path);
Sender.Tell(RegistrationOk.Instance);
Context.Become(Active(Sender));
})
.With<UnregisterConsumer>(_ =>
{
break;
case UnregisterConsumer _:
_log.Info("Unexpected unregistration: [{0}]", Sender.Path);
Sender.Tell(UnexpectedRegistration.Instance);
Context.Stop(Self);
})
.With<Reset>(_ => Sender.Tell(ResetOk.Instance))
.Default(msg => { });
break;
case Reset _:
Sender.Tell(ResetOk.Instance);
break;
default:
// no-op
break;
}
}

private UntypedReceive Active(IActorRef consumer)
{
return message =>
{
message.Match()
.With<UnregisterConsumer>(_ =>
{
switch (message)
{
case UnregisterConsumer _:
if (Sender.Equals(consumer))
{
_log.Info("UnregistrationOk: [{0}]", Sender.Path);
Expand All @@ -186,19 +190,23 @@ private UntypedReceive Active(IActorRef consumer)
Sender.Tell(UnexpectedUnregistration.Instance);
Context.Stop(Self);
}
})
.With<RegisterConsumer>(_ =>
{
break;

case RegisterConsumer _:
_log.Info("Unexpected RegisterConsumer: [{0}], active consumer: [{1}]", Sender.Path, consumer.Path);
Sender.Tell(UnexpectedRegistration.Instance);
Context.Stop(Self);
})
.With<Reset>(_ =>
{
break;

case Reset _:
Context.Become(Idle);
Sender.Tell(ResetOk.Instance);
})
.Default(msg => consumer.Tell(msg));
break;

default:
consumer.Tell(message);
break;
}
};
}

Expand Down
83 changes: 52 additions & 31 deletions src/contrib/cluster/Akka.DistributedData/ReadAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,36 @@ protected override void PreStart()
Reply(false);
}

protected override bool Receive(object message) => message.Match()
.With<ReadResult>(x =>
{
if (x.Envelope != null)
{
_result = _result?.Merge(x.Envelope) ?? x.Envelope;
}

Remaining = Remaining.Remove(Sender.Path.Address);
var done = DoneWhenRemainingSize;
Log.Debug("read acks remaining: {0}, done when: {1}, current state: {2}", Remaining.Count, done, _result);
if (Remaining.Count == done) Reply(true);
})
.With<SendToSecondary>(x =>
protected override bool Receive(object message)
{
switch (message)
{
foreach (var n in SecondaryNodes)
Replica(n).Tell(_read);
})
.With<ReceiveTimeout>(_ => Reply(false))
.WasHandled;
case ReadResult x:
if (x.Envelope != null)
{
_result = _result?.Merge(x.Envelope) ?? x.Envelope;
}

Remaining = Remaining.Remove(Sender.Path.Address);
var done = DoneWhenRemainingSize;
Log.Debug("read acks remaining: {0}, done when: {1}, current state: {2}", Remaining.Count, done,
_result);
if (Remaining.Count == done) Reply(true);
return true;

case SendToSecondary x:
foreach (var n in SecondaryNodes)
Replica(n).Tell(_read);
return true;

case ReceiveTimeout _:
Reply(false);
return true;

default:
return false;
}
}

private void Reply(bool ok)
{
Expand All @@ -121,19 +131,30 @@ private void Reply(bool ok)
}
}

private Receive WaitRepairAck(DataEnvelope envelope) => msg => msg.Match()
.With<ReadRepairAck>(x =>
private Receive WaitRepairAck(DataEnvelope envelope) => msg =>
{
switch (msg)
{
var reply = envelope.Data is DeletedData
? (object)new DataDeleted(_key, null)
: new GetSuccess(_key, _req, envelope.Data);
_replyTo.Tell(reply, Context.Parent);
Context.Stop(Self);
})
.With<ReadResult>(x => Remaining = Remaining.Remove(Sender.Path.Address))
.With<SendToSecondary>(_ => { })
.With<ReceiveTimeout>(_ => { })
.WasHandled;
case ReadRepairAck _:
var reply = envelope.Data is DeletedData
? (object)new DataDeleted(_key, null)
: new GetSuccess(_key, _req, envelope.Data);
_replyTo.Tell(reply, Context.Parent);
Context.Stop(Self);
return true;
case ReadResult _:
Remaining = Remaining.Remove(Sender.Path.Address);
return true;
case SendToSecondary _:
// no-op
return true;
case ReceiveTimeout _:
// no-op
return true;
default:
return false;
}
};
}

public interface IReadConsistency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,41 @@ protected override bool Receive(object message)

protected bool Init(object message)
{
return message.Match()
.With<EventsByPersistenceIdPublisher.Continue>(() => { })
.With<Request>(_ => ReceiveInitialRequest())
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;
switch (message)
{
case EventsByPersistenceIdPublisher.Continue _:
// no-op
return true;
case Request _:
ReceiveInitialRequest();
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}

protected bool Idle(object message)
{
return message.Match()
.With<EventsByPersistenceIdPublisher.Continue>(() =>
{
switch (message)
{
case EventsByPersistenceIdPublisher.Continue _:
if (IsTimeForReplay) Replay();
})
.With<EventAppended>(() =>
{
return true;
case EventAppended _:
if (IsTimeForReplay) Replay();
})
.With<Request>(_ => ReceiveIdleRequest())
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;
return true;
case Request _:
ReceiveIdleRequest();
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}

protected void Replay()
Expand All @@ -106,35 +120,55 @@ protected void Replay()

protected Receive Replaying(int limit)
{
return message => message.Match()
.With<ReplayedMessage>(replayed =>
{
var seqNr = replayed.Persistent.SequenceNr;
Buffer.Add(new EventEnvelope(
offset: new Sequence(seqNr),
persistenceId: PersistenceId,
sequenceNr: seqNr,
@event: replayed.Persistent.Payload,
timestamp: replayed.Persistent.Timestamp));
CurrentSequenceNr = seqNr + 1;
Buffer.DeliverBuffer(TotalDemand);
})
.With<RecoverySuccess>(success =>
{
Log.Debug("replay completed for persistenceId [{0}], currSeqNo [{1}]", PersistenceId, CurrentSequenceNr);
ReceiveRecoverySuccess(success.HighestSequenceNr);
})
.With<ReplayMessagesFailure>(failure =>
return message =>
{
switch (message)
{
Log.Debug("replay failed for persistenceId [{0}], due to [{1}]", PersistenceId, failure.Cause.Message);
Buffer.DeliverBuffer(TotalDemand);
OnErrorThenStop(failure.Cause);
})
.With<Request>(_ => Buffer.DeliverBuffer(TotalDemand))
.With<EventsByPersistenceIdPublisher.Continue>(() => { }) // skip during replay
.With<EventAppended>(() => { }) // skip during replay
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;
case ReplayedMessage replayed:
var seqNr = replayed.Persistent.SequenceNr;
Buffer.Add(new EventEnvelope(
offset: new Sequence(seqNr),
persistenceId: PersistenceId,
sequenceNr: seqNr,
@event: replayed.Persistent.Payload,
timestamp: replayed.Persistent.Timestamp));
CurrentSequenceNr = seqNr + 1;
Buffer.DeliverBuffer(TotalDemand);
return true;

case RecoverySuccess success:
Log.Debug("replay completed for persistenceId [{0}], currSeqNo [{1}]", PersistenceId,
CurrentSequenceNr);
ReceiveRecoverySuccess(success.HighestSequenceNr);
return true;

case ReplayMessagesFailure failure:
Log.Debug("replay failed for persistenceId [{0}], due to [{1}]", PersistenceId,
failure.Cause.Message);
Buffer.DeliverBuffer(TotalDemand);
OnErrorThenStop(failure.Cause);
return true;

case Request _:
Buffer.DeliverBuffer(TotalDemand);
return true;

case EventsByPersistenceIdPublisher.Continue _:
// skip during replay
return true;

case EventAppended _:
// skip during replay
return true;

case Cancel _:
Context.Stop(Self);
return true;

default:
return false;
}
};
}
}

Expand Down
Loading

0 comments on commit 4c2f30e

Please sign in to comment.