Skip to content

Commit

Permalink
remove some state machine (#6787)
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonCropp authored Jun 8, 2023
1 parent c3e03a8 commit e014825
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,12 @@ SELECT MAX(e.{Configuration.OrderingColumnName}) as Ordering
/// <param name="cancellationToken">TBD</param>
/// <param name="offset">TBD</param>
/// <returns>TBD</returns>
public virtual async Task<ImmutableArray<string>> SelectAllPersistenceIdsAsync(
DbConnection connection,
public virtual Task<ImmutableArray<string>> SelectAllPersistenceIdsAsync(
DbConnection connection,
CancellationToken cancellationToken,
long offset)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
using var command = GetCommand(connection, AllPersistenceIdsSql);
command.Transaction = tx;
Expand Down Expand Up @@ -685,15 +685,15 @@ public virtual async Task<long> SelectByTagAsync(
});
}

public virtual async Task<long> SelectAllEventsAsync(
public virtual Task<long> SelectAllEventsAsync(
DbConnection connection,
CancellationToken cancellationToken,
CancellationToken cancellationToken,
long fromOffset,
long toOffset,
long max,
long max,
Action<ReplayedEvent> callback)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
long maxOrdering;
using (var command = GetCommand(connection, HighestOrderingSql))
Expand Down Expand Up @@ -736,9 +736,9 @@ public virtual async Task<long> SelectAllEventsAsync(
/// <param name="cancellationToken">TBD</param>
/// <param name="persistenceId">TBD</param>
/// <returns>TBD</returns>
public virtual async Task<long> SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId)
public virtual Task<long> SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
using var command = GetCommand(connection, HighestSequenceNrSql);
command.Transaction = tx;
Expand All @@ -749,9 +749,9 @@ public virtual async Task<long> SelectHighestSequenceNrAsync(DbConnection connec
});
}

public virtual async Task<long> SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken)
public virtual Task<long> SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
using var command = GetCommand(connection, HighestOrderingSql);
command.Transaction = tx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,18 +548,18 @@ await connection.ExecuteInTransaction(WriteIsolationLevel, cancellationToken, as
/// <param name="maxSequenceNr">TBD</param>
/// <param name="maxTimestamp">TBD</param>
/// <returns>TBD</returns>
public virtual async Task<SelectedSnapshot> SelectSnapshotAsync(
public virtual Task<SelectedSnapshot> SelectSnapshotAsync(
DbConnection connection,
CancellationToken cancellationToken,
string persistenceId,
long maxSequenceNr,
DateTime maxTimestamp)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
using var command = GetCommand(connection, SelectSnapshotSql);
command.Transaction = tx;

SetPersistenceIdParameter(persistenceId, command);
SetSequenceNrParameter(maxSequenceNr, command);
SetTimestampParameter(maxTimestamp, command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,23 @@ private async Task<IActorRef> Here()
return identity.Subject;
}

private async Task<bool> Throttle(ThrottleTransportAdapter.Direction direction, ThrottleMode mode)
private Task<bool> Throttle(ThrottleTransportAdapter.Direction direction, ThrottleMode mode)
{
var rootBAddress = new Address("akka", "systemB", "localhost", RootB.Address.Port.Value);
var transport =
Sys.AsInstanceOf<ExtendedActorSystem>().Provider.AsInstanceOf<RemoteActorRefProvider>().Transport;
return await transport.ManagementCommand(new SetThrottle(rootBAddress, direction, mode))

return transport.ManagementCommand(new SetThrottle(rootBAddress, direction, mode))
.ShouldCompleteWithin(DefaultTimeout);
}

private async Task<bool> Disassociate()
private Task<bool> Disassociate()
{
var rootBAddress = new Address("akka", "systemB", "localhost", RootB.Address.Port.Value);
var transport =
Sys.AsInstanceOf<ExtendedActorSystem>().Provider.AsInstanceOf<RemoteActorRefProvider>().Transport;
return await transport.ManagementCommand(new ForceDisassociate(rootBAddress))

return transport.ManagementCommand(new ForceDisassociate(rootBAddress))
.ShouldCompleteWithin(DefaultTimeout);
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,12 @@ protected async Task<IChannel> NewServer(EndPoint listenAddress)
}
}

public override async Task<AssociationHandle> Associate(Address remoteAddress)
public override Task<AssociationHandle> Associate(Address remoteAddress)
{
if (!ServerChannel.Open)
throw new ChannelException("Transport is not open");

return await AssociateInternal(remoteAddress).ConfigureAwait(false);
return AssociateInternal(remoteAddress);
}

protected abstract Task<AssociationHandle> AssociateInternal(Address remoteAddress);
Expand Down
35 changes: 15 additions & 20 deletions src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,16 @@ public T ExpectOne<T>(Func<T> func, CancellationToken cancellationToken = defaul
/// <summary>
/// Async version of ExpectOne
/// </summary>
public async Task<T> ExpectOneAsync<T>(
public Task<T> ExpectOneAsync<T>(
Func<Task<T>> func,
CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: null,
expectedOccurrences: 1,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand All @@ -262,19 +261,18 @@ public T ExpectOne<T>(
/// <summary>
/// Async version of ExpectOne
/// </summary>
public async Task<T> ExpectOneAsync<T>(
public Task<T> ExpectOneAsync<T>(
TimeSpan timeout,
Func<Task<T>> func,
CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: timeout,
expectedOccurrences: 1,
matchedEventHandler: null,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand All @@ -297,19 +295,18 @@ public T Expect<T>(
/// <summary>
/// Async version of Expect
/// </summary>
public async Task<T> ExpectAsync<T>(
public Task<T> ExpectAsync<T>(
int expectedCount,
Func<Task<T>> func,
CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: null,
expectedOccurrences: expectedCount,
matchedEventHandler: null,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand All @@ -335,20 +332,19 @@ public T Expect<T>(
/// Async version of Expect
/// Note: <paramref name="func"/> might not get awaited.
/// </summary>
public async Task<T> ExpectAsync<T>(
public Task<T> ExpectAsync<T>(
int expectedCount,
TimeSpan timeout,
Func<Task<T>> func,
CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: timeout,
expectedOccurrences: expectedCount,
matchedEventHandler: null,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand All @@ -367,16 +363,15 @@ public T Mute<T>(Func<T> func, CancellationToken cancellationToken = default)
/// <summary>
/// Async version of Mute
/// </summary>
public async Task<T> MuteAsync<T>(Func<Task<T>> func, CancellationToken cancellationToken = default)
public Task<T> MuteAsync<T>(Func<Task<T>> func, CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: null,
expectedOccurrences: null,
matchedEventHandler: null,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand Down
Loading

0 comments on commit e014825

Please sign in to comment.