diff --git a/projects/Directory.Packages.props b/projects/Directory.Packages.props
index bfb3b7869..2a5f3a6ce 100644
--- a/projects/Directory.Packages.props
+++ b/projects/Directory.Packages.props
@@ -16,7 +16,7 @@
-->
-
+
diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs
index 2ef059e74..107460fd8 100644
--- a/projects/Test/Integration/TestBasicPublish.cs
+++ b/projects/Test/Integration/TestBasicPublish.cs
@@ -322,73 +322,5 @@ public async Task TestPropertiesRoundtrip_Headers()
Assert.Equal(sendBody, consumeBody);
Assert.Equal("World", response);
}
-
- [Fact]
- public async Task TestPublisherConfirmationThrottling()
- {
- const int MaxOutstandingConfirms = 4;
-
- var channelOpts = new CreateChannelOptions
- {
- PublisherConfirmationsEnabled = true,
- PublisherConfirmationTrackingEnabled = true,
- OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
- };
-
- var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- var messagesPublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- Task publishTask = Task.Run(async () =>
- {
- ConnectionFactory cf = CreateConnectionFactory();
-
- await using (IConnection conn = await cf.CreateConnectionAsync())
- {
- await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
- {
- QueueDeclareOk q = await ch.QueueDeclareAsync();
-
- channelCreatedTcs.SetResult(true);
-
- int publishCount = 0;
- /*
- * Note: if batchSize equals MaxOutstandingConfirms,
- * a delay is added per-publish and this test takes much longer
- * to run. TODO figure out how the heck to test that
- */
- int batchSize = MaxOutstandingConfirms / 2;
- try
- {
- while (publishCount < 128)
- {
- var publishBatch = new List();
- for (int i = 0; i < batchSize; i++)
- {
- publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
- }
-
- foreach (ValueTask pt in publishBatch)
- {
- await pt;
- publishCount++;
- }
-
- publishBatch.Clear();
- publishBatch = null;
- }
-
- messagesPublishedTcs.SetResult(true);
- }
- catch (Exception ex)
- {
- messagesPublishedTcs.SetException(ex);
- }
- }
- }
- });
-
- await channelCreatedTcs.Task.WaitAsync(WaitSpan);
- await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
- await publishTask.WaitAsync(WaitSpan);
- }
}
}
diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs
index dc445147a..d6567ee1b 100644
--- a/projects/Test/Integration/TestToxiproxy.cs
+++ b/projects/Test/Integration/TestToxiproxy.cs
@@ -30,7 +30,9 @@
//---------------------------------------------------------------------------
using System;
+using System.Collections.Generic;
using System.Net;
+using System.Threading;
using System.Threading.Tasks;
using Integration;
using RabbitMQ.Client;
@@ -284,6 +286,108 @@ public async Task TestTcpReset_GH1464()
await recoveryTask;
}
+ [SkippableFact]
+ [Trait("Category", "Toxiproxy")]
+ public async Task TestPublisherConfirmationThrottling()
+ {
+ Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
+
+ const int TotalMessageCount = 64;
+ const int MaxOutstandingConfirms = 8;
+ const int BatchSize = MaxOutstandingConfirms * 2;
+
+ using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
+ await pm.InitializeAsync();
+
+ ConnectionFactory cf = CreateConnectionFactory();
+ cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort);
+ cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
+ cf.AutomaticRecoveryEnabled = true;
+
+ var channelOpts = new CreateChannelOptions
+ {
+ PublisherConfirmationsEnabled = true,
+ PublisherConfirmationTrackingEnabled = true,
+ OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
+ };
+
+ var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var messagesPublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ long publishCount = 0;
+ Task publishTask = Task.Run(async () =>
+ {
+ await using (IConnection conn = await cf.CreateConnectionAsync())
+ {
+ await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
+ {
+ QueueDeclareOk q = await ch.QueueDeclareAsync();
+
+ channelCreatedTcs.SetResult(true);
+
+ try
+ {
+ var publishBatch = new List();
+ while (publishCount < TotalMessageCount)
+ {
+ for (int i = 0; i < BatchSize; i++)
+ {
+ publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
+ }
+
+ foreach (ValueTask pt in publishBatch)
+ {
+ await pt;
+ Interlocked.Increment(ref publishCount);
+ }
+
+ publishBatch.Clear();
+ }
+
+ messagesPublishedTcs.SetResult(true);
+ }
+ catch (Exception ex)
+ {
+ messagesPublishedTcs.SetException(ex);
+ }
+ }
+ }
+ });
+
+ await channelCreatedTcs.Task;
+
+ const string toxicName = "rmq-localhost-bandwidth";
+ var bandwidthToxic = new BandwidthToxic();
+ bandwidthToxic.Name = toxicName;
+ bandwidthToxic.Attributes.Rate = 0;
+ bandwidthToxic.Toxicity = 1.0;
+ bandwidthToxic.Stream = ToxicDirection.DownStream;
+
+ await Task.Delay(TimeSpan.FromSeconds(1));
+
+ Task addToxicTask = pm.AddToxicAsync(bandwidthToxic);
+
+ while (true)
+ {
+ long publishCount0 = Interlocked.Read(ref publishCount);
+ await Task.Delay(TimeSpan.FromSeconds(5));
+ long publishCount1 = Interlocked.Read(ref publishCount);
+
+ if (publishCount0 == publishCount1)
+ {
+ // Publishing has "settled" due to being blocked
+ break;
+ }
+ }
+
+ await addToxicTask.WaitAsync(WaitSpan);
+ await pm.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan);
+
+ await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
+ await publishTask.WaitAsync(WaitSpan);
+
+ Assert.Equal(TotalMessageCount, publishCount);
+ }
+
private bool AreToxiproxyTestsEnabled
{
get