Skip to content

Commit

Permalink
Add pull argument to Ubuntu GHA setup script
Browse files Browse the repository at this point in the history
Add assertion to see if test fails earlier

* Fix `TestBindingRecovery_GH1035`

* Fix `QueueUnbindAsync` default argument in `IChannel`

* Rename variables in test to make it easier to understand

* Check for unexpected callback exceptions.

* Add `ConnectionRecoveryError` handler check

* Modify `TestPublishRpcRightAfterReconnect` to just log error condition

* Disregard "unexpected" exceptions in `TestConnectionRecovery`

* Do not use the `amq.fanout` fanout exchange because messages published to it could end up in tests that should note have messages, like  `TestConcurrentQueueDeclareAndBindAsync`
  • Loading branch information
lukebakken committed Apr 30, 2024
1 parent 79659d9 commit 10e58e4
Show file tree
Hide file tree
Showing 25 changed files with 267 additions and 85 deletions.
16 changes: 13 additions & 3 deletions .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ else
readonly run_toxiproxy='false'
fi

if [[ $2 == 'pull' ]]
then
readonly docker_pull_args='--pull always'
else
readonly docker_pull_args=''
fi

set -o nounset

declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
Expand All @@ -43,7 +50,8 @@ function start_toxiproxy
# sudo ss -4nlp
echo "[INFO] starting Toxiproxy server docker container"
docker rm --force "$toxiproxy_docker_name" 2>/dev/null || echo "[INFO] $toxiproxy_docker_name was not running"
docker run --detach \
# shellcheck disable=SC2086
docker run --detach $docker_pull_args \
--name "$toxiproxy_docker_name" \
--hostname "$toxiproxy_docker_name" \
--publish 8474:8474 \
Expand All @@ -58,7 +66,8 @@ function start_rabbitmq
echo "[INFO] starting RabbitMQ server docker container"
chmod 0777 "$GITHUB_WORKSPACE/.ci/ubuntu/log"
docker rm --force "$rabbitmq_docker_name" 2>/dev/null || echo "[INFO] $rabbitmq_docker_name was not running"
docker run --detach \
# shellcheck disable=SC2086
docker run --detach $docker_pull_args \
--name "$rabbitmq_docker_name" \
--hostname "$rabbitmq_docker_name" \
--publish 5671:5671 \
Expand Down Expand Up @@ -101,7 +110,8 @@ function wait_rabbitmq

function get_rabbitmq_id
{
local rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")"
local rabbitmq_docker_id
rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")"
echo "[INFO] '$rabbitmq_docker_name' docker id is '$rabbitmq_docker_id'"
if [[ -v GITHUB_OUTPUT ]]
then
Expand Down
3 changes: 1 addition & 2 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ if ($RunTests)
dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
if ($LASTEXITCODE -ne 0)
{
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
Exit 1
Write-Host "[WARNING] tests errored, exiting" -Foreground "Red"
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.IChannel.QueueDeclarePassiveAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
~RabbitMQ.Client.IChannel.QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
~RabbitMQ.Client.IChannel.QueuePurgeAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
~RabbitMQ.Client.IChannel.QueueUnbindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.QueueUnbindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxCommitAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxRollbackAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ Task QueueBindAsync(string queue, string exchange, string routingKey,
/// Routing key must be shorter than 255 bytes.
/// </remarks>
Task QueueUnbindAsync(string queue, string exchange, string routingKey,
IDictionary<string, object> arguments,
IDictionary<string, object> arguments = null,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
7 changes: 5 additions & 2 deletions projects/RabbitMQ.Client/client/impl/RecordedBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ public Task RecoverAsync(IChannel channel)

public bool Equals(RecordedBinding other)
{
return _isQueueBinding == other._isQueueBinding && _destination == other._destination && _source == other._source &&
_routingKey == other._routingKey && _arguments == other._arguments;
return _isQueueBinding == other._isQueueBinding &&
_destination == other._destination &&
_source == other._source &&
_routingKey == other._routingKey &&
_arguments == other._arguments;
}

public override bool Equals(object? obj)
Expand Down
130 changes: 111 additions & 19 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public abstract class IntegrationFixture : IAsyncLifetime
private static readonly bool s_isVerbose = false;
private static int _connectionIdx = 0;

private Exception _connectionCallbackException;
private Exception _connectionRecoveryException;
private Exception _channelCallbackException;

protected readonly RabbitMQCtl _rabbitMQCtl;

protected ConnectionFactory _connFactory;
Expand All @@ -77,7 +81,12 @@ public abstract class IntegrationFixture : IAsyncLifetime

static IntegrationFixture()
{

#if NET6_0_OR_GREATER
S_Random = Random.Shared;
#else
S_Random = new Random();
#endif
s_isRunningInCI = InitIsRunningInCI();
s_isVerbose = InitIsVerbose();

Expand Down Expand Up @@ -146,8 +155,10 @@ public virtual async Task InitializeAsync()

if (IsVerbose)
{
AddCallbackHandlers();
AddCallbackShutdownHandlers();
}

AddCallbackExceptionHandlers();
}

if (_connFactory.AutomaticRecoveryEnabled)
Expand Down Expand Up @@ -181,42 +192,122 @@ public virtual async Task DisposeAsync()
_channel = null;
_conn = null;
}

DisposeAssertions();
}

protected virtual void DisposeAssertions()
{
if (_connectionRecoveryException != null)
{
Assert.Fail($"unexpected connection recovery exception: {_connectionRecoveryException}");
}

if (_connectionCallbackException != null)
{
Assert.Fail($"unexpected connection callback exception: {_connectionCallbackException}");
}

if (_channelCallbackException != null)
{
Assert.Fail($"unexpected channel callback exception: {_channelCallbackException}");
}
}

protected virtual void AddCallbackHandlers()
protected void AddCallbackExceptionHandlers()
{
if (_conn != null)
{
_conn.CallbackException += (o, ea) =>
_conn.ConnectionRecoveryError += (s, ea) =>
{
_output.WriteLine("{0} connection callback exception: {1}",
_testDisplayName, ea.Exception);
_connectionRecoveryException = ea.Exception;

if (IsVerbose)
{
try
{
_output.WriteLine($"{0} connection recovery exception: {1}",
_testDisplayName, _connectionRecoveryException);
}
catch (InvalidOperationException)
{
}
}
};

_conn.ConnectionShutdown += (o, ea) =>
_conn.CallbackException += (o, ea) =>
{
HandleConnectionShutdown(_conn, ea, (args) =>
_connectionCallbackException = ea.Exception;

if (IsVerbose)
{
_output.WriteLine("{0} connection shutdown, args: {1}",
_testDisplayName, args);
});
try
{
_output.WriteLine("{0} connection callback exception: {1}",
_testDisplayName, _connectionCallbackException);
}
catch (InvalidOperationException)
{
}
}
};
}

if (_channel != null)
{
_channel.CallbackException += (o, ea) =>
{
_output.WriteLine("{0} channel callback exception: {1}",
_testDisplayName, ea.Exception);
_channelCallbackException = ea.Exception;

if (IsVerbose)
{
try
{
_output.WriteLine("{0} channel callback exception: {1}",
_testDisplayName, _channelCallbackException);
}
catch (InvalidOperationException)
{
}
}
};
}
}

protected void AddCallbackShutdownHandlers()
{
if (_conn != null)
{
_conn.ConnectionShutdown += (o, ea) =>
{
HandleConnectionShutdown(_conn, ea, (args) =>
{
try
{
_output.WriteLine("{0} connection shutdown, args: {1}",
_testDisplayName, args);
}
catch (InvalidOperationException)
{
}
});
};
}

if (_channel != null)
{
_channel.ChannelShutdown += (o, ea) =>
{
HandleChannelShutdown(_channel, ea, (args) =>
{
_output.WriteLine("{0} channel shutdown, args: {1}",
_testDisplayName, args);
try
{
_output.WriteLine("{0} channel shutdown, args: {1}",
_testDisplayName, args);
}
catch (InvalidOperationException)
{
}
});
};
}
Expand Down Expand Up @@ -405,6 +496,11 @@ protected static Task AssertRanToCompletion(IEnumerable<Task> tasks)
return DoAssertRanToCompletion(tasks);
}

internal static void AssertRecordedQueues(AutorecoveringConnection c, int n)
{
Assert.Equal(n, c.RecordedQueuesCount);
}

protected static Task WaitAsync(TaskCompletionSource<bool> tcs, string desc)
{
return WaitAsync(tcs, WaitSpan, desc);
Expand Down Expand Up @@ -524,11 +620,7 @@ protected static string GetUniqueString(ushort length)
protected static byte[] GetRandomBody(ushort size = 1024)
{
var body = new byte[size];
#if NET6_0_OR_GREATER
Random.Shared.NextBytes(body);
#else
S_Random.NextBytes(body);
#endif
return body;
}

Expand All @@ -543,7 +635,7 @@ protected static TaskCompletionSource<bool> PrepareForRecovery(IConnection conn)
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

AutorecoveringConnection aconn = conn as AutorecoveringConnection;
aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true);
aconn.RecoverySucceeded += (source, ea) => tcs.TrySetResult(true);

return tcs;
}
Expand Down
10 changes: 3 additions & 7 deletions projects/Test/Common/TestConnectionRecoveryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class TestConnectionRecoveryBase : IntegrationFixture
protected const ushort TotalMessageCount = 16384;
protected const ushort CloseAtCount = 16;

public TestConnectionRecoveryBase(ITestOutputHelper output) : base(output)
public TestConnectionRecoveryBase(ITestOutputHelper output, bool dispatchConsumersAsync = false)
: base(output, dispatchConsumersAsync: dispatchConsumersAsync)
{
_messageBody = GetRandomBody(4096);
}
Expand Down Expand Up @@ -107,11 +108,6 @@ internal void AssertRecordedExchanges(AutorecoveringConnection c, int n)
Assert.Equal(n, c.RecordedExchangesCount);
}

internal void AssertRecordedQueues(AutorecoveringConnection c, int n)
{
Assert.Equal(n, c.RecordedQueuesCount);
}

internal Task<AutorecoveringConnection> CreateAutorecoveringConnectionAsync()
{
return CreateAutorecoveringConnectionAsync(RecoveryInterval);
Expand Down Expand Up @@ -226,7 +222,7 @@ protected static TaskCompletionSource<bool> PrepareForShutdown(IConnection conn)
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

AutorecoveringConnection aconn = conn as AutorecoveringConnection;
aconn.ConnectionShutdown += (c, args) => tcs.SetResult(true);
aconn.ConnectionShutdown += (c, args) => tcs.TrySetResult(true);

return tcs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ namespace Test.Integration.ConnectionRecovery
{
public class TestConnectionRecovery : TestConnectionRecoveryBase
{
public TestConnectionRecovery(ITestOutputHelper output) : base(output)
public TestConnectionRecovery(ITestOutputHelper output)
: base(output, dispatchConsumersAsync: true)
{
}

Expand All @@ -60,8 +61,9 @@ Task MessageReceived(object sender, BasicDeliverEventArgs e)
return Task.CompletedTask;
}

string exchangeName = $"ex-gh-1035-{Guid.NewGuid()}";
string queueName = $"q-gh-1035-{Guid.NewGuid()}";
var guid = Guid.NewGuid();
string exchangeName = $"ex-gh-1035-{guid}";
string queueName = $"q-gh-1035-{guid}";

await _channel.ExchangeDeclareAsync(exchange: exchangeName,
type: "fanout", durable: false, autoDelete: true,
Expand All @@ -78,6 +80,9 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,

_channel = await _conn.CreateChannelAsync();

// NB: add this for debugging
// AddCallbackHandlers();

await _channel.ExchangeDeclareAsync(exchange: exchangeName,
type: "fanout", durable: false, autoDelete: true,
arguments: null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ namespace Test.Integration.ConnectionRecovery
{
public class TestConsumerRecovery : TestConnectionRecoveryBase
{
public TestConsumerRecovery(ITestOutputHelper output) : base(output)
public TestConsumerRecovery(ITestOutputHelper output)
: base(output, dispatchConsumersAsync: true)
{
}

Expand All @@ -57,7 +58,7 @@ public async Task TestConsumerRecoveryWithManyConsumers()
}

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecovery += (prev, current) => tcs.SetResult(true);
((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecovery += (prev, current) => tcs.TrySetResult(true);

await CloseAndWaitForRecoveryAsync();
await WaitAsync(tcs, "consumer tag change after recovery");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
{
string q = Guid.NewGuid().ToString();
await _channel.QueueDeclareAsync(q, false, false, true);
var dummy = new AsyncEventingBasicConsumer(_channel);
var dummy = new EventingBasicConsumer(_channel);
string tag = await _channel.BasicConsumeAsync(q, true, dummy);
await _channel.BasicCancelAsync(tag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task TestRecoveryEventHandlersOnChannel()
public async Task TestRecoveringConsumerHandlerOnConnection(int iterations)
{
string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName;
var cons = new AsyncEventingBasicConsumer(_channel);
var cons = new EventingBasicConsumer(_channel);
await _channel.BasicConsumeAsync(q, true, cons);

int counter = 0;
Expand All @@ -100,7 +100,7 @@ public async Task TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePas
{
var myArgs = new Dictionary<string, object> { { "first-argument", "some-value" } };
string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName;
var cons = new AsyncEventingBasicConsumer(_channel);
var cons = new EventingBasicConsumer(_channel);
string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: myArgs);

bool ctagMatches = false;
Expand Down
Loading

0 comments on commit 10e58e4

Please sign in to comment.