Skip to content

Commit

Permalink
Merge pull request #1483 from Particular/integrate-postgres-transport
Browse files Browse the repository at this point in the history
  • Loading branch information
dvdstelt authored Jun 19, 2024
2 parents 5811fcd + 5397ac2 commit f434fb0
Show file tree
Hide file tree
Showing 19 changed files with 882 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.0.0" GeneratePathProperty="true" />
<PackageReference Include="NServiceBus.Transport.SqlServer" Version="8.0.0" />
<PackageReference Include="NServiceBus.Transport.SqlServer" Version="8.1.2" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
</ItemGroup>
Expand Down
7 changes: 7 additions & 0 deletions src/NServiceBus.Persistence.Sql.sln
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TransactionalSession.MsSqlS
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AuroraSetup", "AuroraSetup\AuroraSetup.csproj", "{B1B4FF6C-354C-4B73-B383-CD1D4F76400C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PostgreSqlTransportAcceptanceTests", "PostgreSqlTransportAcceptanceTests\PostgreSqlTransportAcceptanceTests.csproj", "{4078B14A-565B-4A53-AF51-B4995E30466A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -156,6 +158,10 @@ Global
{B1B4FF6C-354C-4B73-B383-CD1D4F76400C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B1B4FF6C-354C-4B73-B383-CD1D4F76400C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B1B4FF6C-354C-4B73-B383-CD1D4F76400C}.Release|Any CPU.Build.0 = Release|Any CPU
{4078B14A-565B-4A53-AF51-B4995E30466A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4078B14A-565B-4A53-AF51-B4995E30466A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4078B14A-565B-4A53-AF51-B4995E30466A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4078B14A-565B-4A53-AF51-B4995E30466A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -171,6 +177,7 @@ Global
{20DA1B6B-651B-480F-BEA9-3E92673C5793} = {8E942C0C-AAAB-4962-87F6-3D0F584003F2}
{AB33F7C1-FF29-4958-B176-81AC227451A5} = {8E942C0C-AAAB-4962-87F6-3D0F584003F2}
{B1B4FF6C-354C-4B73-B383-CD1D4F76400C} = {8E942C0C-AAAB-4962-87F6-3D0F584003F2}
{4078B14A-565B-4A53-AF51-B4995E30466A} = {8E942C0C-AAAB-4962-87F6-3D0F584003F2}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {90D6A734-35FA-4702-BDFD-74EADABDE821}
Expand Down
27 changes: 27 additions & 0 deletions src/PostgreSqlTransportAcceptanceTests/.editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[*.cs]

# Justification: Test project
dotnet_diagnostic.CA2007.severity = none

# may be enabled in future
dotnet_diagnostic.PS0018.severity = none # A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext

# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
dotnet_diagnostic.NSB0002.severity = none

# Justification: No saga analyzers here
dotnet_diagnostic.NSB0003.severity = none
dotnet_diagnostic.NSB0004.severity = none
dotnet_diagnostic.NSB0005.severity = none
dotnet_diagnostic.NSB0006.severity = none
dotnet_diagnostic.NSB0007.severity = none
dotnet_diagnostic.NSB0008.severity = none
dotnet_diagnostic.NSB0009.severity = none
dotnet_diagnostic.NSB0010.severity = none
dotnet_diagnostic.NSB0011.severity = none
dotnet_diagnostic.NSB0012.severity = none
dotnet_diagnostic.NSB0013.severity = none
dotnet_diagnostic.NSB0014.severity = none
dotnet_diagnostic.NSB0015.severity = none
dotnet_diagnostic.NSB0016.severity = none
dotnet_diagnostic.NSB0017.severity = none
2 changes: 2 additions & 0 deletions src/PostgreSqlTransportAcceptanceTests/.globalconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# may be enabled in future
dotnet_diagnostic.PS0018.severity = suggestion # A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Npgsql;
using NServiceBus;
using NServiceBus.AcceptanceTesting.Support;

public class ConfigureEndpointPostgreSqlTransport : IConfigureEndpointTestExecution
{
public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
{
transport = new PostgreSqlTransport(async cancellationToken =>
{
var conn = PostgreSqlConnectionBuilder.Build();
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
return conn;
});

configuration.UseTransport(transport);

return Task.CompletedTask;
}

public async Task Cleanup()
{
using (var conn = PostgreSqlConnectionBuilder.Build())
{
conn.Open();

var testingData = transport.GetType().GetProperty("Testing", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(transport);


var commandTextBuilder = new StringBuilder();

//No clean-up for send-only endpoints
if (testingData.GetType().GetProperty("ReceiveAddresses", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(testingData) is string[] queueAddresses)
{
foreach (var address in queueAddresses)
{
commandTextBuilder.AppendLine($"DROP TABLE IF EXISTS {address};");
}
}

//Null-check because if an exception is thrown before startup these fields might be empty
if (testingData.GetType().GetProperty("DelayedDeliveryQueue", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(testingData) is string delayedQueueAddress)
{
commandTextBuilder.AppendLine($"DROP TABLE IF EXISTS {delayedQueueAddress};");
}

var subscriptionTableName = testingData.GetType().GetProperty("SubscriptionTable", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(testingData) as string;

if (!string.IsNullOrEmpty(subscriptionTableName))
{
commandTextBuilder.AppendLine($"DROP TABLE IF EXISTS {subscriptionTableName};");
}

var commandText = commandTextBuilder.ToString();
if (!string.IsNullOrEmpty(commandText))
{
await TryDeleteTables(conn, commandText);
}
}
}

static async Task TryDeleteTables(NpgsqlConnection conn, string commandText)
{
try
{
using (var comm = conn.CreateCommand())
{
comm.CommandText = commandText;
await comm.ExecuteNonQueryAsync().ConfigureAwait(false);
}
}
catch (Exception e)
{
if (!e.Message.Contains("it does not exist or you do not have permission"))
{
throw;
}
}
}

PostgreSqlTransport transport;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Npgsql;
using NpgsqlTypes;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Persistence.Sql.ScriptBuilder;
using NServiceBus.Settings;
using NUnit.Framework;

public class ConfigureEndpointSqlPersistence : IConfigureEndpointTestExecution
{
SetupAndTeardownDatabase setupFeature;

public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
{
if (configuration.IsSendOnly())
{
return Task.CompletedTask;
}

//Why is it 19? Answer: because we constrain the tablePrefix in PostgreSQL to 20 and we add '_' to the prefix later on
var tablePrefix = TestTableNameCleaner.Clean(endpointName, 19);
Console.WriteLine($"Using EndpointName='{endpointName}', TablePrefix='{tablePrefix}'");

configuration.RegisterStartupTask(sp =>
{
setupFeature = new SetupAndTeardownDatabase(
TestContext.CurrentContext.Test.ID,
sp.GetRequiredService<IReadOnlySettings>(),
tablePrefix,
PostgreSqlConnectionBuilder.Build,
BuildSqlDialect.PostgreSql,
e => e.Message.Contains("duplicate key value violates unique constraint"));

return setupFeature;
});

var persistence = configuration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(PostgreSqlConnectionBuilder.Build);
var sqlDialect = persistence.SqlDialect<SqlDialect.PostgreSql>();
persistence.TablePrefix($"{tablePrefix}_");
sqlDialect.JsonBParameterModifier(parameter =>
{
var npgsqlParameter = (NpgsqlParameter)parameter;
npgsqlParameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
});

var subscriptions = persistence.SubscriptionSettings();
subscriptions.DisableCache();
persistence.DisableInstaller();
return Task.CompletedTask;
}

public Task Cleanup() => setupFeature != null ? setupFeature.ManualStop(CancellationToken.None) : Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
namespace NServiceBus.AcceptanceTests;

using System;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using NUnit.Framework;
using NUnit.Framework.Interfaces;
using NUnit.Framework.Internal;

/// <summary>
/// Base class for all the NSB test that sets up our conventions
/// </summary>
[TestFixture]
public abstract partial class NServiceBusAcceptanceTest
{
[SetUp]
public void SetUp()
{
Conventions.EndpointNamingConvention = t =>
{
var classAndEndpoint = t.FullName.Split('.').Last();

var testName = classAndEndpoint.Split('+').First();

testName = testName.Replace("When_", "");

var endpointBuilder = classAndEndpoint.Split('+').Last();

testName = Thread.CurrentThread.CurrentCulture.TextInfo.ToTitleCase(testName);

testName = testName.Replace("_", "");

var fullTestName = testName + "#" + endpointBuilder;

// Max length for table name is 63. We need to reserve space for the:
// - ".delayed" - suffix (8)
// - "_Seq_seq" suffix for auto-created sequence backing up the Seq column (8)
// - hashcode (8)
// In summary, we can use 63-8-8-8=39
var charactersToConsider = int.Min(fullTestName.Length, 39);

return $"{fullTestName.Substring(0, charactersToConsider)}{CreateDeterministicHash(fullTestName):X8}";
};
}

public static uint CreateDeterministicHash(string input)
{
using (var provider = MD5.Create())
{
var inputBytes = Encoding.Default.GetBytes(input);
var hashBytes = provider.ComputeHash(inputBytes);
// generate a guid from the hash:
return BitConverter.ToUInt32(hashBytes, 0) % 1000000;
}
}

[TearDown]
public void TearDown()
{
if (!TestExecutionContext.CurrentContext.TryGetRunDescriptor(out var runDescriptor))
{
return;
}

var scenarioContext = runDescriptor.ScenarioContext;

if (Environment.GetEnvironmentVariable("CI") != "true" || Environment.GetEnvironmentVariable("VERBOSE_TEST_LOGGING")?.ToLower() == "true")
{
TestContext.WriteLine($@"Test settings:
{string.Join(Environment.NewLine, runDescriptor.Settings.Select(setting => $" {setting.Key}: {setting.Value}"))}");

TestContext.WriteLine($@"Context:
{string.Join(Environment.NewLine, scenarioContext.GetType().GetProperties().Select(p => $"{p.Name} = {p.GetValue(scenarioContext, null)}"))}");
}

if (TestExecutionContext.CurrentContext.CurrentResult.ResultState == ResultState.Failure || TestExecutionContext.CurrentContext.CurrentResult.ResultState == ResultState.Error)
{
TestContext.WriteLine(string.Empty);
TestContext.WriteLine($"Log entries (log level: {scenarioContext.LogLevel}):");
TestContext.WriteLine("--- Start log entries ---------------------------------------------------");
foreach (var logEntry in scenarioContext.Logs)
{
TestContext.WriteLine($"{logEntry.Timestamp:T} {logEntry.Level} {logEntry.Endpoint ?? TestContext.CurrentContext.Test.Name}: {logEntry.Message}");
}
TestContext.WriteLine("--- End log entries ---------------------------------------------------");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>..\NServiceBusTests.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\AcceptanceTestHelper\AcceptanceTestHelper.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.0.0" GeneratePathProperty="true" />
<PackageReference Include="NServiceBus.Transport.PostgreSql" Version="8.1.2" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
</ItemGroup>

<ItemGroup Condition="'$(PkgNServiceBus_AcceptanceTests_Sources)' != ''">
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\*.cs" />
<Compile Include="$(PkgNServiceBus_AcceptanceTests_Sources)\contentFiles\cs\$(TargetFramework)\**\EndpointTemplates\*.cs" />
<Compile Include="$(PkgNServiceBus_AcceptanceTests_Sources)\contentFiles\cs\$(TargetFramework)\**\ScenarioDescriptors\*.cs" />
</ItemGroup>

</Project>
25 changes: 25 additions & 0 deletions src/PostgreSqlTransportAcceptanceTests/TestSuiteConstraints.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[assembly: PostgreSqlTest]

namespace NServiceBus.AcceptanceTests
{
using AcceptanceTesting.Support;

public partial class TestSuiteConstraints
{
public bool SupportsDtc => false;
public bool SupportsCrossQueueTransactions => true;
public bool SupportsNativePubSub => true;
public bool SupportsDelayedDelivery => true;
public bool SupportsOutbox => true;
public bool SupportsPurgeOnStartup => true;
public IConfigureEndpointTestExecution CreateTransportConfiguration()
{
return new ConfigureEndpointPostgreSqlTransport();
}

public IConfigureEndpointTestExecution CreatePersistenceConfiguration()
{
return new ConfigureEndpointSqlPersistence();
}
}
}
Loading

0 comments on commit f434fb0

Please sign in to comment.