-
Notifications
You must be signed in to change notification settings - Fork 595
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Separated out connection recovery tests
This will allow these slow tests to run in parallel (cherry picked from commit d3d7195)
- Loading branch information
1 parent
bf86447
commit 79659d9
Showing
11 changed files
with
1,164 additions
and
741 deletions.
There are no files selected for viewing
174 changes: 174 additions & 0 deletions
174
projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
// This source code is dual-licensed under the Apache License, version | ||
// 2.0, and the Mozilla Public License, version 2.0. | ||
// | ||
// The APL v2.0: | ||
// | ||
//--------------------------------------------------------------------------- | ||
// Copyright (c) 2007-2020 VMware, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
//--------------------------------------------------------------------------- | ||
// | ||
// The MPL v2.0: | ||
// | ||
//--------------------------------------------------------------------------- | ||
// This Source Code Form is subject to the terms of the Mozilla Public | ||
// License, v. 2.0. If a copy of the MPL was not distributed with this | ||
// file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
// | ||
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. | ||
//--------------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Threading.Tasks; | ||
using RabbitMQ.Client; | ||
using RabbitMQ.Client.Impl; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
using QueueDeclareOk = RabbitMQ.Client.QueueDeclareOk; | ||
|
||
namespace Test.Integration.ConnectionRecovery | ||
{ | ||
public class TestBasicAckAndBasicNack : TestConnectionRecoveryBase | ||
{ | ||
private readonly string _queueName; | ||
|
||
public TestBasicAckAndBasicNack(ITestOutputHelper output) : base(output) | ||
{ | ||
_queueName = $"{nameof(TestBasicAckAndBasicNack)}-{Guid.NewGuid()}"; | ||
} | ||
|
||
public override async Task DisposeAsync() | ||
{ | ||
ConnectionFactory cf = CreateConnectionFactory(); | ||
cf.ClientProvidedName += "-TearDown"; | ||
using (IConnection conn = await cf.CreateConnectionAsync()) | ||
{ | ||
using (IChannel ch = await conn.CreateChannelAsync()) | ||
{ | ||
await ch.QueueDeleteAsync(_queueName); | ||
await ch.CloseAsync(); | ||
} | ||
await conn.CloseAsync(); | ||
} | ||
|
||
await base.DisposeAsync(); | ||
} | ||
|
||
[Fact] | ||
public async Task TestBasicAckAfterChannelRecovery() | ||
{ | ||
var allMessagesSeenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
var cons = new AckingBasicConsumer(_channel, TotalMessageCount, allMessagesSeenTcs); | ||
|
||
QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); | ||
string queueName = q.QueueName; | ||
Assert.Equal(queueName, _queueName); | ||
|
||
await _channel.BasicQosAsync(0, 1, false); | ||
await _channel.BasicConsumeAsync(queueName, false, cons); | ||
|
||
TaskCompletionSource<bool> sl = PrepareForShutdown(_conn); | ||
TaskCompletionSource<bool> rl = PrepareForRecovery(_conn); | ||
|
||
await PublishMessagesWhileClosingConnAsync(queueName); | ||
|
||
await WaitAsync(sl, "connection shutdown"); | ||
await WaitAsync(rl, "connection recovery"); | ||
await WaitAsync(allMessagesSeenTcs, "all messages seen"); | ||
} | ||
|
||
[Fact] | ||
public async Task TestBasicNackAfterChannelRecovery() | ||
{ | ||
var allMessagesSeenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
var cons = new NackingBasicConsumer(_channel, TotalMessageCount, allMessagesSeenTcs); | ||
|
||
QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); | ||
string queueName = q.QueueName; | ||
Assert.Equal(queueName, _queueName); | ||
|
||
await _channel.BasicQosAsync(0, 1, false); | ||
await _channel.BasicConsumeAsync(queueName, false, cons); | ||
|
||
TaskCompletionSource<bool> sl = PrepareForShutdown(_conn); | ||
TaskCompletionSource<bool> rl = PrepareForRecovery(_conn); | ||
|
||
await PublishMessagesWhileClosingConnAsync(queueName); | ||
|
||
await WaitAsync(sl, "connection shutdown"); | ||
await WaitAsync(rl, "connection recovery"); | ||
await WaitAsync(allMessagesSeenTcs, "all messages seen"); | ||
} | ||
|
||
[Fact] | ||
public async Task TestBasicRejectAfterChannelRecovery() | ||
{ | ||
var allMessagesSeenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
var cons = new RejectingBasicConsumer(_channel, TotalMessageCount, allMessagesSeenTcs); | ||
|
||
string queueName = (await _channel.QueueDeclareAsync(_queueName, false, false, false)).QueueName; | ||
Assert.Equal(queueName, _queueName); | ||
|
||
await _channel.BasicQosAsync(0, 1, false); | ||
await _channel.BasicConsumeAsync(queueName, false, cons); | ||
|
||
TaskCompletionSource<bool> sl = PrepareForShutdown(_conn); | ||
TaskCompletionSource<bool> rl = PrepareForRecovery(_conn); | ||
|
||
await PublishMessagesWhileClosingConnAsync(queueName); | ||
|
||
await WaitAsync(sl, "connection shutdown"); | ||
await WaitAsync(rl, "connection recovery"); | ||
await WaitAsync(allMessagesSeenTcs, "all messages seen"); | ||
} | ||
|
||
[Fact] | ||
public async Task TestBasicAckAfterBasicGetAndChannelRecovery() | ||
{ | ||
string q = GenerateQueueName(); | ||
await _channel.QueueDeclareAsync(q, false, false, false); | ||
// create an offset | ||
await _channel.BasicPublishAsync("", q, _messageBody); | ||
await Task.Delay(50); | ||
BasicGetResult g = await _channel.BasicGetAsync(q, false); | ||
await CloseAndWaitForRecoveryAsync(); | ||
Assert.True(_conn.IsOpen); | ||
Assert.True(_channel.IsOpen); | ||
// ack the message after recovery - this should be out of range and ignored | ||
await _channel.BasicAckAsync(g.DeliveryTag, false); | ||
// do a sync operation to 'check' there is no channel exception | ||
await _channel.BasicGetAsync(q, false); | ||
} | ||
|
||
[Fact] | ||
public async Task TestBasicAckEventHandlerRecovery() | ||
{ | ||
await _channel.ConfirmSelectAsync(); | ||
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
((AutorecoveringChannel)_channel).BasicAcks += (m, args) => tcs.SetResult(true); | ||
((AutorecoveringChannel)_channel).BasicNacks += (m, args) => tcs.SetResult(true); | ||
|
||
await CloseAndWaitForRecoveryAsync(); | ||
await CloseAndWaitForRecoveryAsync(); | ||
Assert.True(_channel.IsOpen); | ||
|
||
await WithTemporaryNonExclusiveQueueAsync(_channel, (ch, q) => | ||
{ | ||
return ch.BasicPublishAsync("", q, _messageBody).AsTask(); | ||
}); | ||
|
||
await WaitAsync(tcs, "basic acks/nacks"); | ||
} | ||
} | ||
} |
84 changes: 84 additions & 0 deletions
84
projects/Test/Integration/ConnectionRecovery/TestBasicConnectionRecovery.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
// This source code is dual-licensed under the Apache License, version | ||
// 2.0, and the Mozilla Public License, version 2.0. | ||
// | ||
// The APL v2.0: | ||
// | ||
//--------------------------------------------------------------------------- | ||
// Copyright (c) 2007-2020 VMware, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
//--------------------------------------------------------------------------- | ||
// | ||
// The MPL v2.0: | ||
// | ||
//--------------------------------------------------------------------------- | ||
// This Source Code Form is subject to the terms of the Mozilla Public | ||
// License, v. 2.0. If a copy of the MPL was not distributed with this | ||
// file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
// | ||
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. | ||
//--------------------------------------------------------------------------- | ||
|
||
using System.Threading.Tasks; | ||
using RabbitMQ.Client; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
|
||
namespace Test.Integration.ConnectionRecovery | ||
{ | ||
public class TestBasicConnectionRecovery : TestConnectionRecoveryBase | ||
{ | ||
public TestBasicConnectionRecovery(ITestOutputHelper output) : base(output) | ||
{ | ||
} | ||
|
||
[Fact] | ||
public async Task TestBasicConnectionRecoveryTest() | ||
{ | ||
Assert.True(_conn.IsOpen); | ||
await CloseAndWaitForRecoveryAsync(); | ||
Assert.True(_conn.IsOpen); | ||
} | ||
|
||
[Fact] | ||
public async Task TestBasicChannelRecovery() | ||
{ | ||
Assert.True(_channel.IsOpen); | ||
await CloseAndWaitForRecoveryAsync(); | ||
Assert.True(_channel.IsOpen); | ||
} | ||
|
||
[Fact] | ||
public Task TestClientNamedQueueRecovery() | ||
{ | ||
string s = "dotnet-client.test.recovery.q1"; | ||
return WithTemporaryNonExclusiveQueueAsync(_channel, async (m, q) => | ||
{ | ||
await CloseAndWaitForRecoveryAsync(); | ||
await AssertQueueRecoveryAsync(m, q, false); | ||
await _channel.QueueDeleteAsync(q); | ||
}, s); | ||
} | ||
|
||
[Fact] | ||
public Task TestClientNamedQueueRecoveryNoWait() | ||
{ | ||
string s = "dotnet-client.test.recovery.q1-nowait"; | ||
return WithTemporaryExclusiveQueueNoWaitAsync(_channel, async (ch, q) => | ||
{ | ||
await CloseAndWaitForRecoveryAsync(); | ||
await AssertExclusiveQueueRecoveryAsync(ch, q); | ||
}, s); | ||
} | ||
} | ||
} |
113 changes: 113 additions & 0 deletions
113
projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
// This source code is dual-licensed under the Apache License, version | ||
// 2.0, and the Mozilla Public License, version 2.0. | ||
// | ||
// The APL v2.0: | ||
// | ||
//--------------------------------------------------------------------------- | ||
// Copyright (c) 2007-2020 VMware, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
//--------------------------------------------------------------------------- | ||
// | ||
// The MPL v2.0: | ||
// | ||
//--------------------------------------------------------------------------- | ||
// This Source Code Form is subject to the terms of the Mozilla Public | ||
// License, v. 2.0. If a copy of the MPL was not distributed with this | ||
// file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
// | ||
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. | ||
//--------------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using RabbitMQ.Client; | ||
using RabbitMQ.Client.Events; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
using QueueDeclareOk = RabbitMQ.Client.QueueDeclareOk; | ||
|
||
namespace Test.Integration.ConnectionRecovery | ||
{ | ||
public class TestConnectionRecovery : TestConnectionRecoveryBase | ||
{ | ||
public TestConnectionRecovery(ITestOutputHelper output) : base(output) | ||
{ | ||
} | ||
|
||
[Fact] | ||
public async Task TestBindingRecovery_GH1035() | ||
{ | ||
const string routingKey = "unused"; | ||
byte[] body = GetRandomBody(); | ||
|
||
var receivedMessageSemaphore = new SemaphoreSlim(0, 1); | ||
|
||
Task MessageReceived(object sender, BasicDeliverEventArgs e) | ||
{ | ||
receivedMessageSemaphore.Release(); | ||
return Task.CompletedTask; | ||
} | ||
|
||
string exchangeName = $"ex-gh-1035-{Guid.NewGuid()}"; | ||
string queueName = $"q-gh-1035-{Guid.NewGuid()}"; | ||
|
||
await _channel.ExchangeDeclareAsync(exchange: exchangeName, | ||
type: "fanout", durable: false, autoDelete: true, | ||
arguments: null); | ||
|
||
QueueDeclareOk q0 = await _channel.QueueDeclareAsync(queue: queueName, exclusive: true); | ||
Assert.Equal(queueName, q0); | ||
|
||
await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey); | ||
|
||
await _channel.CloseAsync(); | ||
_channel.Dispose(); | ||
_channel = null; | ||
|
||
_channel = await _conn.CreateChannelAsync(); | ||
|
||
await _channel.ExchangeDeclareAsync(exchange: exchangeName, | ||
type: "fanout", durable: false, autoDelete: true, | ||
arguments: null); | ||
|
||
QueueDeclareOk q1 = await _channel.QueueDeclareAsync(queue: queueName, exclusive: true); | ||
Assert.Equal(queueName, q1.QueueName); | ||
|
||
await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey); | ||
|
||
var c = new AsyncEventingBasicConsumer(_channel); | ||
c.Received += MessageReceived; | ||
await _channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer: c); | ||
|
||
using (IChannel pubCh = await _conn.CreateChannelAsync()) | ||
{ | ||
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body); | ||
await pubCh.CloseAsync(); | ||
} | ||
|
||
Assert.True(await receivedMessageSemaphore.WaitAsync(WaitSpan)); | ||
|
||
await CloseAndWaitForRecoveryAsync(); | ||
|
||
using (IChannel pubCh = await _conn.CreateChannelAsync()) | ||
{ | ||
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body); | ||
await pubCh.CloseAsync(); | ||
} | ||
|
||
Assert.True(await receivedMessageSemaphore.WaitAsync(WaitSpan)); | ||
} | ||
} | ||
} |
Oops, something went wrong.