Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make JoinAsync and JoinSeedNodesAsync more robust by checking cluster UP status #6033

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
407fb02
Make JoinAsync and JoinSeedNodesAsync more robust by using an async s…
Arkatufus Jul 5, 2022
81c2d9c
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 5, 2022
12f6c16
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 6, 2022
8addeaf
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Aaronontheweb Jul 6, 2022
ed586f2
Update how join state is being handled
Arkatufus Jul 14, 2022
e49465c
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 14, 2022
15ba469
Fix missing exception
Arkatufus Jul 14, 2022
dfea2e8
Merge branch 'cluster/fix_JoinAsync_and_JoinSeedNodesAsync' of github…
Arkatufus Jul 14, 2022
54e4d85
Update unit test
Arkatufus Jul 14, 2022
79d8f77
Update API Verify list
Arkatufus Jul 14, 2022
5bd8b66
update IsUp check
Arkatufus Jul 14, 2022
093ff5b
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 19, 2022
be804a9
Change IsUp implementation to check SelfMember instead
Arkatufus Jul 22, 2022
0f90d06
Remove state handling code
Arkatufus Jul 22, 2022
b5d6c59
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 22, 2022
2b55eef
Remove spec
Arkatufus Jul 22, 2022
3b2472e
Merge branch 'cluster/fix_JoinAsync_and_JoinSeedNodesAsync' of github…
Arkatufus Jul 22, 2022
080256c
Revert state changes
Arkatufus Jul 22, 2022
10e6a8e
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Aaronontheweb Jul 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util.Internal;
Expand Down Expand Up @@ -317,6 +318,42 @@ await AwaitAssertAsync(() =>
}
}

/// <summary>
/// Verifies that overlapping <c>JoinAsync</c> calls are coalesced into the first pending task
/// (each extra call logging an error), and that <c>JoinAsync</c> can be invoked again once the
/// node is already up.
/// </summary>
[Fact]
public async Task Should_be_able_to_invoke_JoinAsync_multiple_times()
{
    var timeout = TimeSpan.FromSeconds(10);

    // Capture Error log events so the "already in progress" complaint can be asserted.
    var probe = CreateTestProbe("error_probe");
    Sys.EventStream.Subscribe(probe, typeof(Error));
    try
    {
        var task1 = _cluster.JoinAsync(_selfAddress);

        // A second call while the first is pending must log and hand back the same task.
        var task2 = _cluster.JoinAsync(_selfAddress);
        var error = await probe.ExpectMsgAsync<Error>();
        error.Message.Should().Be("Another async cluster join is already in progress");
        task2.Should().Be(task1);

        // ...and so must a third.
        var task3 = _cluster.JoinAsync(_selfAddress);
        error = await probe.ExpectMsgAsync<Error>();
        error.Message.Should().Be("Another async cluster join is already in progress");
        task3.Should().Be(task1);

        await task1.ShouldCompleteWithin(timeout);
        LeaderActions();
        // Member should already be up
        _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.IMemberEvent));
        await ExpectMsgAsync<ClusterEvent.MemberUp>();

        // join second time - response should be immediate success.
        // This test targets JoinAsync, so re-invoke JoinAsync here; the JoinSeedNodesAsync
        // variant is exercised by its own dedicated test.
        await _cluster.JoinAsync(_selfAddress).ShouldCompleteWithin(100.Milliseconds());
    }
    finally
    {
        _cluster.Shutdown();
    }
}


[Fact]
public async Task A_cluster_must_be_able_to_JoinAsync()
{
Expand Down Expand Up @@ -409,6 +446,43 @@ await Awaiting(async () =>
.ShouldCompleteWithin(timeout);
}

/// <summary>
/// Verifies that overlapping <c>JoinSeedNodesAsync</c> calls return the first pending task and
/// log an error, and that a further call after the node is up completes almost immediately.
/// </summary>
[Fact]
public async Task Should_be_able_to_invoke_JoinSeedNodesAsync_multiple_times()
{
    const string alreadyJoiningMessage = "Another async cluster join is already in progress";

    var joinTimeout = TimeSpan.FromSeconds(10);
    var seedNodes = new[] { _selfAddress };

    // Error log events are routed to this probe so the duplicate-join complaint is observable.
    var errorProbe = CreateTestProbe("error_probe");
    Sys.EventStream.Subscribe(errorProbe, typeof(Error));

    try
    {
        var firstJoin = _cluster.JoinSeedNodesAsync(seedNodes);

        // Two further attempts while the first is still pending: each must log an error
        // and return the very same task instance as the first call.
        for (var attempt = 0; attempt < 2; attempt++)
        {
            var repeatedJoin = _cluster.JoinSeedNodesAsync(seedNodes);
            var logged = await errorProbe.ExpectMsgAsync<Error>();
            logged.Message.Should().Be(alreadyJoiningMessage);
            repeatedJoin.Should().Be(firstJoin);
        }

        await firstJoin.ShouldCompleteWithin(joinTimeout);

        LeaderActions();

        // The member is expected to be up by now; replay the current state as events to confirm.
        _cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.IMemberEvent));
        await ExpectMsgAsync<ClusterEvent.MemberUp>();

        // Joining again after the node is up should succeed right away.
        await _cluster.JoinSeedNodesAsync(new[] { _selfAddress }).ShouldCompleteWithin(100.Milliseconds());
    }
    finally
    {
        _cluster.Shutdown();
    }
}

[Fact]
public async Task A_cluster_JoinSeedNodesAsync_must_fail_if_could_not_connect_to_cluster()
{
Expand Down
71 changes: 71 additions & 0 deletions src/core/Akka.Cluster/AsyncJoinState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// -----------------------------------------------------------------------
// <copyright file="AsyncJoinState.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Cluster
{
    /// <summary>
    /// Tracks one in-flight asynchronous cluster join.
    /// <para>
    /// The exposed <see cref="Task"/> completes successfully when the cluster's member-up callback
    /// fires, or faults with the supplied exception when the linked cancellation source is
    /// cancelled (either by the caller's token or after <c>cluster.Settings.SeedNodeTimeout</c>).
    /// Whichever happens first wins; the other outcome is ignored.
    /// </para>
    /// </summary>
    internal sealed class AsyncJoinState : IDisposable
    {
        private readonly TaskCompletionSource<NotUsed> _completion;

        // Linked to the caller's token and armed with the timeout; swapped to null on dispose.
        private CancellationTokenSource _timeoutCts;

        // 0 while the join is pending, 1 once either callback has claimed the outcome.
        private int _finished;

        /// <summary>The task representing the outcome of this join attempt.</summary>
        public Task Task => _completion.Task;

        /// <summary>
        /// <c>true</c> once <see cref="Task"/> has finished in any state
        /// (faulted, cancelled or ran to completion).
        /// </summary>
        public bool IsCompleted => _completion.Task.IsCompleted;

        /// <summary>
        /// Starts tracking a join attempt.
        /// </summary>
        /// <param name="cluster">Cluster whose member-up callback and seed-node timeout are used.</param>
        /// <param name="failException">Exception used to fault <see cref="Task"/> on timeout or cancellation.</param>
        /// <param name="onComplete">
        /// Invoked exactly once, by whichever outcome wins, immediately before <see cref="Task"/>
        /// is completed, so that code awaiting the task observes the owner's cleanup.
        /// </param>
        /// <param name="token">Optional token that aborts the wait early.</param>
        public AsyncJoinState(
            Cluster cluster,
            Exception failException,
            Action onComplete,
            CancellationToken token = default)
        {
            Debug.Assert(cluster != null, $"{nameof(cluster)} != null");
            Debug.Assert(failException != null, $"{nameof(failException)} != null");
            Debug.Assert(onComplete != null, $"{nameof(onComplete)} != null");

            _completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);

            _timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
            _timeoutCts.CancelAfter(cluster.Settings.SeedNodeTimeout);

            // NOTE(review): if `token` is already cancelled, this callback runs synchronously,
            // i.e. before the constructor returns and before the caller holds the instance.
            _timeoutCts.Token.Register(() =>
            {
                Finish(onComplete, () => _completion.TrySetException(failException));
            });

            cluster.RegisterOnMemberUp(() =>
            {
                Finish(onComplete, () => _completion.TrySetResult(NotUsed.Instance));
            });
        }

        /// <summary>
        /// Claims the outcome for the calling callback. Only the first caller proceeds; it notifies
        /// the owner, settles the task and releases the timeout source. Later callers are no-ops.
        /// </summary>
        private void Finish(Action onComplete, Action settle)
        {
            // Atomic claim: a plain "if (IsCompleted) return;" check lets the timeout and the
            // member-up callbacks both pass and run onComplete/Dispose twice.
            if (Interlocked.CompareExchange(ref _finished, 1, 0) != 0)
            {
                return;
            }

            try
            {
                // Notify the owner first so its cleanup is done before any awaiter can resume.
                onComplete();
            }
            finally
            {
                // Always settle the task and release the timer, even if onComplete throws.
                settle();
                Dispose();
            }
        }

        /// <summary>
        /// Releases the timeout cancellation source. Safe to call repeatedly and from any thread.
        /// </summary>
        public void Dispose()
        {
            // Exchange guarantees the source is disposed at most once under concurrent calls.
            Interlocked.Exchange(ref _timeoutCts, null)?.Dispose();
        }
    }
}
71 changes: 35 additions & 36 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ public void Join(Address address)
ClusterCore.Tell(new ClusterUserAction.JoinTo(FillLocal(address)));
}

// This object holds the current asynchronous join state
private AsyncJoinState _asyncJoinState;

/// <summary>
/// Try to asynchronously join this cluster node specified by <paramref name="address"/>.
/// A <see cref="Join"/> command is sent to the node to join. Returned task will be completed
Expand All @@ -264,27 +267,24 @@ public void Join(Address address)
/// <returns>Task which completes, once current cluster node reaches <see cref="MemberStatus.Up"/> state.</returns>
public Task JoinAsync(Address address, CancellationToken token = default)
{
var completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);

var timeout = Settings.RetryUnsuccessfulJoinAfter ?? TimeSpan.FromSeconds(10);
var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
timeoutCts.CancelAfter(timeout);
timeoutCts.Token.Register(() =>
if (_asyncJoinState != null)
Arkatufus marked this conversation as resolved.
Show resolved Hide resolved
{
timeoutCts.Dispose();
completion.TrySetException(new ClusterJoinFailedException(
$"Node has not managed to join the cluster using provided address: {address}"));
});
_log.Error("Another async cluster join is already in progress");
Arkatufus marked this conversation as resolved.
Show resolved Hide resolved
return _asyncJoinState.Task;
Arkatufus marked this conversation as resolved.
Show resolved Hide resolved
}

RegisterOnMemberUp(() =>
{
timeoutCts.Dispose();
completion.TrySetResult(NotUsed.Instance);
});

_asyncJoinState = new AsyncJoinState(
cluster: this,
failException: new ClusterJoinFailedException(
$"Node has not managed to join the cluster using provided address: {address}"),
onComplete: () => _asyncJoinState = null,
token: token);

// Guard against possibility that _asyncJoinState got nullified immediately
var task = _asyncJoinState.Task;
Join(address);

return completion.Task.WithCancellation(token);
return task;
}

private Address FillLocal(Address address)
Expand Down Expand Up @@ -333,28 +333,27 @@ public void JoinSeedNodes(IEnumerable<Address> seedNodes)
/// </summary>
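/// <param name="seedNodes">The addresses of the seed nodes this node should attempt to join.</param>
/// <param name="token">A cancellation token that can be used to abandon the join attempt before the seed-node timeout elapses.</param>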
public Task JoinSeedNodesAsync(IEnumerable<Address> seedNodes, CancellationToken token = default(CancellationToken))
public Task JoinSeedNodesAsync(IEnumerable<Address> seedNodes, CancellationToken token = default)
{
var completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);
Arkatufus marked this conversation as resolved.
Show resolved Hide resolved

var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
timeoutCts.CancelAfter(Settings.SeedNodeTimeout);
timeoutCts.Token.Register(() =>
if (_asyncJoinState != null)
{
timeoutCts.Dispose();
completion.TrySetException(new ClusterJoinFailedException(
$"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", seedNodes)}."));
});
_log.Error("Another async cluster join is already in progress");
return _asyncJoinState.Task;
}

RegisterOnMemberUp(() =>
{
timeoutCts.Dispose();
completion.TrySetResult(NotUsed.Instance);
});

JoinSeedNodes(seedNodes);

return completion.Task.WithCancellation(token);
var nodes = seedNodes.ToList();
_asyncJoinState = new AsyncJoinState(
cluster: this,
failException: new ClusterJoinFailedException(
$"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", nodes)}."),
onComplete: () => _asyncJoinState = null,
Arkatufus marked this conversation as resolved.
Show resolved Hide resolved
token: token);

// Guard against possibility that _asyncJoinState got nullified immediately
var task = _asyncJoinState.Task;
JoinSeedNodes(nodes);

return task;
}

/// <summary>
Expand Down