Skip to content

Commit

Permalink
Disabling delayed delivery throws exception on startup (#1423) (#1429)
Browse files Browse the repository at this point in the history
* fix variable can be null if disabledelayeddelivery is true

* Add transport test to verify that transport functions when delayed delivery is turned off

* Cleanup

* Simplify

---------

Co-authored-by: Phil Bastian <[email protected]>
  • Loading branch information
andreasohlund and PhilBastian authored Sep 25, 2024
1 parent ebc6e71 commit 3f053a3
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -49,46 +51,45 @@ public async Task Cleanup(CancellationToken cancellationToken = default)
return;
}

if (string.IsNullOrWhiteSpace(connectionString) == false)
if (string.IsNullOrWhiteSpace(connectionString))
{
return;
}

var queues = new List<string> { errorQueueName, inputQueueName };

if (!postgreSqlTransport.DisableDelayedDelivery)
{
var delayedDeliveryQueueName = postgreSqlTransport.Testing.DelayedDeliveryQueue
.Replace("\"public\".", string.Empty)
.Replace("\"", string.Empty);

var queues = new[]
{
errorQueueName,
inputQueueName,
delayedDeliveryQueueName
};
queues.Add(delayedDeliveryQueueName);
}

using var conn = new NpgsqlConnection(connectionString);
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
using var conn = new NpgsqlConnection(connectionString);
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);

foreach (var queue in queues)
foreach (var queue in queues.Where(q => !string.IsNullOrWhiteSpace(q)))
{
using (var comm = conn.CreateCommand())
{
if (string.IsNullOrWhiteSpace(queue) == false)
{
using (var comm = conn.CreateCommand())
{
comm.CommandText = $"DROP TABLE IF EXISTS \"public\".\"{queue}\"; " +
$"DROP SEQUENCE IF EXISTS \"public\".\"{queue}_seq_seq\";";

await comm.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}
comm.CommandText = $"DROP TABLE IF EXISTS \"public\".\"{queue}\"; " +
$"DROP SEQUENCE IF EXISTS \"public\".\"{queue}_seq_seq\";";

await comm.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}

var subscriptionTableName = postgreSqlTransport.Testing.SubscriptionTable;
var subscriptionTableName = postgreSqlTransport.Testing.SubscriptionTable;

if (!string.IsNullOrEmpty(subscriptionTableName))
if (!string.IsNullOrEmpty(subscriptionTableName))
{
using (var comm = conn.CreateCommand())
{
using (var comm = conn.CreateCommand())
{
comm.CommandText = $"DROP TABLE IF EXISTS {subscriptionTableName};";
comm.CommandText = $"DROP TABLE IF EXISTS {subscriptionTableName};";

await comm.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
await comm.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public void SetUp()
testCancellationTokenSource = Debugger.IsAttached ? new CancellationTokenSource() : new CancellationTokenSource(TestTimeout);
receiver = null;
registrations = [];
CustomizeTransportDefinition = _ => { };
}
protected static IConfigureTransportInfrastructure CreateConfigurer()
{
Expand Down Expand Up @@ -114,18 +115,19 @@ protected async Task Initialize(OnMessage onMessage, OnError onError, TransportT
},
true);

var transport = configurer.CreateTransportDefinition();
var transportDefinition = configurer.CreateTransportDefinition();

IgnoreUnsupportedTransactionModes(transport, transactionMode);
CustomizeTransportDefinition(transportDefinition);
IgnoreUnsupportedTransactionModes(transportDefinition, transactionMode);

if (OperatingSystem.IsWindows() && transactionMode == TransportTransactionMode.TransactionScope)
{
TransactionManager.ImplicitDistributedTransactions = true;
}

transport.TransportTransactionMode = transactionMode;
transportDefinition.TransportTransactionMode = transactionMode;

transportInfrastructure = await configurer.Configure(transport, hostSettings, new QueueAddress(InputQueueName), ErrorQueueName, cancellationToken);
transportInfrastructure = await configurer.Configure(transportDefinition, hostSettings, new QueueAddress(InputQueueName), ErrorQueueName, cancellationToken);

receiver = transportInfrastructure.Receivers.Single().Value;

Expand Down Expand Up @@ -301,6 +303,7 @@ public static uint CreateDeterministicHash(string input)
protected string ErrorQueueName;
protected static TransportTestLoggerFactory LogFactory;
protected static TimeSpan TestTimeout = TimeSpan.FromSeconds(30);
protected Action<TransportDefinition> CustomizeTransportDefinition;

string testId;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace NServiceBus.TransportTests
{
using System.Threading.Tasks;
using NUnit.Framework;
using Transport;

public class When_delayed_delivery_disabled : NServiceBusTransportTest
{
[TestCase(TransportTransactionMode.None)]
[TestCase(TransportTransactionMode.ReceiveOnly)]
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
[TestCase(TransportTransactionMode.TransactionScope)]
public async Task Should_work(TransportTransactionMode transactionMode)
{
CustomizeTransportDefinition = definition =>
{
((PostgreSqlTransport)definition).DisableDelayedDelivery = true;
};

var onReceived = CreateTaskCompletionSource<MessageContext>();

await StartPump(
(context, _) =>
{
onReceived.SetResult(context);
return Task.CompletedTask;
},
(_, __) => Task.FromResult(ErrorHandleResult.Handled),
transactionMode);

await SendMessage(InputQueueName);

var ctx = await onReceived.Task;

Assert.That(ctx, Is.Not.Null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async Task ConfigureReceiveInfrastructure(CancellationToken cancellationToken)

//Create delayed delivery infrastructure
CanonicalQueueAddress delayedQueueCanonicalAddress = null;
if (transport.DisableDelayedDelivery == false)
if (!transport.DisableDelayedDelivery)
{
var delayedDelivery = transport.DelayedDelivery;

Expand Down Expand Up @@ -213,8 +213,7 @@ async Task ConfigureReceiveInfrastructure(CancellationToken cancellationToken)

var queueNameExceedsLimit = queuesToCreate.Any(q => Encoding.UTF8.GetBytes(QueueAddress.Parse(q).Table).Length > TableQueueNameLimit);

var delayedQueueNameExceedsLimit =
Encoding.UTF8.GetBytes(delayedQueueCanonicalAddress.Table).Length > TableQueueNameLimit;
var delayedQueueNameExceedsLimit = delayedQueueCanonicalAddress != null && (Encoding.UTF8.GetBytes(delayedQueueCanonicalAddress.Table).Length > TableQueueNameLimit);

if (queueNameExceedsLimit || delayedQueueNameExceedsLimit)
{
Expand Down

0 comments on commit 3f053a3

Please sign in to comment.