Skip to content

Commit

Permalink
Modify TestPublisherConfirmationThrottling to use toxiproxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Oct 22, 2024
1 parent e02734d commit 35f2757
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 69 deletions.
2 changes: 1 addition & 1 deletion projects/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
-->
<PackageVersion Include="System.IO.Pipelines" Version="8.0.0" />
<PackageVersion Include="System.Net.Http" Version="4.3.4" />
<PackageVersion Include="System.Threading.RateLimiting" Version="7.0.1" />
<PackageVersion Include="System.Threading.RateLimiting" Version="8.0.0" />
<PackageVersion Include="WireMock.Net" Version="1.5.62" />
<PackageVersion Include="xunit" Version="2.9.0" />
<PackageVersion Include="xunit.abstractions" Version="2.0.3" />
Expand Down
68 changes: 0 additions & 68 deletions projects/Test/Integration/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,73 +322,5 @@ public async Task TestPropertiesRoundtrip_Headers()
Assert.Equal(sendBody, consumeBody);
Assert.Equal("World", response);
}

[Fact]
public async Task TestPublisherConfirmationThrottling()
{
const int MaxOutstandingConfirms = 4;

var channelOpts = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
};

var channelCreatedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var messagesPublishedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Task publishTask = Task.Run(async () =>
{
ConnectionFactory cf = CreateConnectionFactory();

await using (IConnection conn = await cf.CreateConnectionAsync())
{
await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
{
QueueDeclareOk q = await ch.QueueDeclareAsync();

channelCreatedTcs.SetResult(true);

int publishCount = 0;
/*
* Note: if batchSize equals MaxOutstandingConfirms,
* a delay is added per-publish and this test takes much longer
* to run. TODO figure out how the heck to test that
*/
int batchSize = MaxOutstandingConfirms / 2;
try
{
while (publishCount < 128)
{
var publishBatch = new List<ValueTask>();
for (int i = 0; i < batchSize; i++)
{
publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
}

foreach (ValueTask pt in publishBatch)
{
await pt;
publishCount++;
}

publishBatch.Clear();
publishBatch = null;
}

messagesPublishedTcs.SetResult(true);
}
catch (Exception ex)
{
messagesPublishedTcs.SetException(ex);
}
}
}
});

await channelCreatedTcs.Task.WaitAsync(WaitSpan);
await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
await publishTask.WaitAsync(WaitSpan);
}
}
}
104 changes: 104 additions & 0 deletions projects/Test/Integration/TestToxiproxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Integration;
using RabbitMQ.Client;
Expand Down Expand Up @@ -284,6 +286,108 @@ public async Task TestTcpReset_GH1464()
await recoveryTask;
}

[SkippableFact]
[Trait("Category", "Toxiproxy")]
public async Task TestPublisherConfirmationThrottling()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");

const int TotalMessageCount = 64;
const int MaxOutstandingConfirms = 8;
const int BatchSize = MaxOutstandingConfirms * 2;

using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
await pm.InitializeAsync();

ConnectionFactory cf = CreateConnectionFactory();
cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort);
cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
cf.AutomaticRecoveryEnabled = true;

var channelOpts = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
};

var channelCreatedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var messagesPublishedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
long publishCount = 0;
Task publishTask = Task.Run(async () =>
{
await using (IConnection conn = await cf.CreateConnectionAsync())
{
await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
{
QueueDeclareOk q = await ch.QueueDeclareAsync();

channelCreatedTcs.SetResult(true);

try
{
var publishBatch = new List<ValueTask>();
while (publishCount < TotalMessageCount)
{
for (int i = 0; i < BatchSize; i++)
{
publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
}

foreach (ValueTask pt in publishBatch)
{
await pt;
Interlocked.Increment(ref publishCount);
}

publishBatch.Clear();
}

messagesPublishedTcs.SetResult(true);
}
catch (Exception ex)
{
messagesPublishedTcs.SetException(ex);
}
}
}
});

await channelCreatedTcs.Task;

const string toxicName = "rmq-localhost-bandwidth";
var bandwidthToxic = new BandwidthToxic();
bandwidthToxic.Name = toxicName;
bandwidthToxic.Attributes.Rate = 0;
bandwidthToxic.Toxicity = 1.0;
bandwidthToxic.Stream = ToxicDirection.DownStream;

await Task.Delay(TimeSpan.FromSeconds(1));

Task<BandwidthToxic> addToxicTask = pm.AddToxicAsync(bandwidthToxic);

while (true)
{
long publishCount0 = Interlocked.Read(ref publishCount);
await Task.Delay(TimeSpan.FromSeconds(5));
long publishCount1 = Interlocked.Read(ref publishCount);

if (publishCount0 == publishCount1)
{
// Publishing has "settled" due to being blocked
break;
}
}

await addToxicTask.WaitAsync(WaitSpan);
await pm.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan);

await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
await publishTask.WaitAsync(WaitSpan);

Assert.Equal(TotalMessageCount, publishCount);
}

private bool AreToxiproxyTestsEnabled
{
get
Expand Down

0 comments on commit 35f2757

Please sign in to comment.