diff --git a/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs b/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs index fa57a4930..148d5eec0 100644 --- a/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -49,46 +51,45 @@ public async Task Cleanup(CancellationToken cancellationToken = default) return; } - if (string.IsNullOrWhiteSpace(connectionString) == false) + if (string.IsNullOrWhiteSpace(connectionString)) + { + return; + } + + var queues = new List { errorQueueName, inputQueueName }; + + if (!postgreSqlTransport.DisableDelayedDelivery) { var delayedDeliveryQueueName = postgreSqlTransport.Testing.DelayedDeliveryQueue .Replace("\"public\".", string.Empty) .Replace("\"", string.Empty); - var queues = new[] - { - errorQueueName, - inputQueueName, - delayedDeliveryQueueName - }; + queues.Add(delayedDeliveryQueueName); + } - using var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(cancellationToken).ConfigureAwait(false); + using var conn = new NpgsqlConnection(connectionString); + await conn.OpenAsync(cancellationToken).ConfigureAwait(false); - foreach (var queue in queues) + foreach (var queue in queues.Where(q => !string.IsNullOrWhiteSpace(q))) + { + using (var comm = conn.CreateCommand()) { - if (string.IsNullOrWhiteSpace(queue) == false) - { - using (var comm = conn.CreateCommand()) - { - comm.CommandText = $"DROP TABLE IF EXISTS \"public\".\"{queue}\"; " + - $"DROP SEQUENCE IF EXISTS \"public\".\"{queue}_seq_seq\";"; - - await comm.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); - } - } + comm.CommandText = $"DROP TABLE IF EXISTS \"public\".\"{queue}\"; " + + $"DROP SEQUENCE IF EXISTS \"public\".\"{queue}_seq_seq\";"; + + await comm.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); } + } - var subscriptionTableName = postgreSqlTransport.Testing.SubscriptionTable; + var subscriptionTableName = postgreSqlTransport.Testing.SubscriptionTable; - if (!string.IsNullOrEmpty(subscriptionTableName)) + if (!string.IsNullOrEmpty(subscriptionTableName)) + { + using (var comm = conn.CreateCommand()) { - using (var comm = conn.CreateCommand()) - { - comm.CommandText = $"DROP TABLE IF EXISTS {subscriptionTableName};"; + comm.CommandText = $"DROP TABLE IF EXISTS {subscriptionTableName};"; - await comm.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); - } + await comm.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); } } } diff --git a/src/NServiceBus.Transport.PostgreSql.TransportTests/NServiceBusTransportTest.cs b/src/NServiceBus.Transport.PostgreSql.TransportTests/NServiceBusTransportTest.cs index 7f189a8b7..f5bdab242 100644 --- a/src/NServiceBus.Transport.PostgreSql.TransportTests/NServiceBusTransportTest.cs +++ b/src/NServiceBus.Transport.PostgreSql.TransportTests/NServiceBusTransportTest.cs @@ -35,6 +35,7 @@ public void SetUp() testCancellationTokenSource = Debugger.IsAttached ? new CancellationTokenSource() : new CancellationTokenSource(TestTimeout); receiver = null; registrations = []; + CustomizeTransportDefinition = _ => { }; } protected static IConfigureTransportInfrastructure CreateConfigurer() { @@ -114,18 +115,19 @@ protected async Task Initialize(OnMessage onMessage, OnError onError, TransportT }, true); - var transport = configurer.CreateTransportDefinition(); + var transportDefinition = configurer.CreateTransportDefinition(); - IgnoreUnsupportedTransactionModes(transport, transactionMode); + CustomizeTransportDefinition(transportDefinition); + IgnoreUnsupportedTransactionModes(transportDefinition, transactionMode); if (OperatingSystem.IsWindows() && transactionMode == TransportTransactionMode.TransactionScope) { TransactionManager.ImplicitDistributedTransactions = true; } - transport.TransportTransactionMode = transactionMode; + transportDefinition.TransportTransactionMode = transactionMode; - transportInfrastructure = await configurer.Configure(transport, hostSettings, new QueueAddress(InputQueueName), ErrorQueueName, cancellationToken); + transportInfrastructure = await configurer.Configure(transportDefinition, hostSettings, new QueueAddress(InputQueueName), ErrorQueueName, cancellationToken); receiver = transportInfrastructure.Receivers.Single().Value; @@ -301,6 +303,7 @@ public static uint CreateDeterministicHash(string input) protected string ErrorQueueName; protected static TransportTestLoggerFactory LogFactory; protected static TimeSpan TestTimeout = TimeSpan.FromSeconds(30); + protected Action CustomizeTransportDefinition; string testId; diff --git a/src/NServiceBus.Transport.PostgreSql.TransportTests/When_delayed_delivery_disabled.cs b/src/NServiceBus.Transport.PostgreSql.TransportTests/When_delayed_delivery_disabled.cs new file mode 100644 index 000000000..6cccd43d9 --- /dev/null +++ b/src/NServiceBus.Transport.PostgreSql.TransportTests/When_delayed_delivery_disabled.cs @@ -0,0 +1,38 @@ +namespace NServiceBus.TransportTests +{ + using System.Threading.Tasks; + using NUnit.Framework; + using Transport; + + public class When_delayed_delivery_disabled : NServiceBusTransportTest + { + [TestCase(TransportTransactionMode.None)] + [TestCase(TransportTransactionMode.ReceiveOnly)] + [TestCase(TransportTransactionMode.SendsAtomicWithReceive)] + [TestCase(TransportTransactionMode.TransactionScope)] + public async Task Should_work(TransportTransactionMode transactionMode) + { + CustomizeTransportDefinition = definition => + { + ((PostgreSqlTransport)definition).DisableDelayedDelivery = true; + }; + + var onReceived = CreateTaskCompletionSource(); + + await StartPump( + (context, _) => + { + onReceived.SetResult(context); + return Task.CompletedTask; + }, + (_, __) => Task.FromResult(ErrorHandleResult.Handled), + transactionMode); + + await SendMessage(InputQueueName); + + var ctx = await onReceived.Task; + + Assert.That(ctx, Is.Not.Null); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs b/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs index 351180f28..a09a84b3c 100644 --- a/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs @@ -151,7 +151,7 @@ async Task ConfigureReceiveInfrastructure(CancellationToken cancellationToken) //Create delayed delivery infrastructure CanonicalQueueAddress delayedQueueCanonicalAddress = null; - if (transport.DisableDelayedDelivery == false) + if (!transport.DisableDelayedDelivery) { var delayedDelivery = transport.DelayedDelivery; @@ -213,8 +213,7 @@ async Task ConfigureReceiveInfrastructure(CancellationToken cancellationToken) var queueNameExceedsLimit = queuesToCreate.Any(q => Encoding.UTF8.GetBytes(QueueAddress.Parse(q).Table).Length > TableQueueNameLimit); - var delayedQueueNameExceedsLimit = - Encoding.UTF8.GetBytes(delayedQueueCanonicalAddress.Table).Length > TableQueueNameLimit; + var delayedQueueNameExceedsLimit = delayedQueueCanonicalAddress != null && (Encoding.UTF8.GetBytes(delayedQueueCanonicalAddress.Table).Length > TableQueueNameLimit); if (queueNameExceedsLimit || delayedQueueNameExceedsLimit) {