diff --git a/Brighter.sln b/Brighter.sln
index 1dc116a904..0e5341c19c 100644
--- a/Brighter.sln
+++ b/Brighter.sln
@@ -247,7 +247,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Greetings_MySqlMigrations",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GreetingsScopedReceiverConsole", "samples\ASBTaskQueue\GreetingsScopedReceiverConsole\GreetingsScopedReceiverConsole.csproj", "{9D9F08A9-66EE-4AA2-8F11-2FA662EAADE2}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GreetingsWorker", "samples\ASBTaskQueue\GreetingsWorker\GreetingsWorker.csproj", "{93589653-2B49-4818-BE98-FE6F16EC72EC}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GreetingsWorker", "samples\ASBTaskQueue\GreetingsWorker\GreetingsWorker.csproj", "{93589653-2B49-4818-BE98-FE6F16EC72EC}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Paramore.Brighter.PostgreSql", "src\Paramore.Brighter.PostgreSql\Paramore.Brighter.PostgreSql.csproj", "{08E6D0F8-B6CE-454F-8761-77731D99F743}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.PostgreSql.EntityFrameworkCore", "src\Paramore.Brighter.PostgreSql.EntityFrameworkCore\Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj", "{AA85493A-4120-4DA0-BAA5-CBF34D238A64}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -1327,6 +1331,30 @@ Global
{93589653-2B49-4818-BE98-FE6F16EC72EC}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{93589653-2B49-4818-BE98-FE6F16EC72EC}.Release|x86.ActiveCfg = Release|Any CPU
{93589653-2B49-4818-BE98-FE6F16EC72EC}.Release|x86.Build.0 = Release|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|x86.Build.0 = Debug|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Any CPU.Build.0 = Release|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|x86.ActiveCfg = Release|Any CPU
+ {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|x86.Build.0 = Release|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|x86.Build.0 = Debug|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Any CPU.Build.0 = Release|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|x86.ActiveCfg = Release|Any CPU
+ {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/Paramore.Brighter.Inbox.Postgres/Paramore.Brighter.Inbox.Postgres.csproj b/src/Paramore.Brighter.Inbox.Postgres/Paramore.Brighter.Inbox.Postgres.csproj
index f5743fbd06..3ae8575457 100644
--- a/src/Paramore.Brighter.Inbox.Postgres/Paramore.Brighter.Inbox.Postgres.csproj
+++ b/src/Paramore.Brighter.Inbox.Postgres/Paramore.Brighter.Inbox.Postgres.csproj
@@ -1,15 +1,18 @@
-
- netstandard2.0
-
+
+ This is an implementation of the inbox used for decoupled invocation of commands by Paramore.Brighter, using PostgreSql
+ netstandard2.0
+ MySql;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability
+
-
-
-
+
+
+
+
-
-
-
+
+
+
diff --git a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs
index 691d0afa11..565c24030a 100644
--- a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs
+++ b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs
@@ -34,12 +34,14 @@ THE SOFTWARE. */
using NpgsqlTypes;
using Paramore.Brighter.Inbox.Exceptions;
using Paramore.Brighter.Logging;
+using Paramore.Brighter.PostgreSql;
namespace Paramore.Brighter.Inbox.Postgres
{
public class PostgresSqlInbox : IAmAnInbox, IAmAnInboxAsync
{
private readonly PostgresSqlInboxConfiguration _configuration;
+ private readonly IPostgreSqlConnectionProvider _connectionProvider;
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger();
///
/// If false we the default thread synchronization context to run any continuation, if true we re-use the original
@@ -51,38 +53,44 @@ public class PostgresSqlInbox : IAmAnInbox, IAmAnInboxAsync
///
public bool ContinueOnCapturedContext { get; set; }
- public PostgresSqlInbox(PostgresSqlInboxConfiguration postgresSqlInboxConfiguration)
- {
- _configuration = postgresSqlInboxConfiguration;
- ContinueOnCapturedContext = false;
- }
-
+ public PostgresSqlInbox(PostgresSqlInboxConfiguration configuration, IPostgreSqlConnectionProvider connectionProvider = null)
+ {
+ _configuration = configuration;
+ _connectionProvider = connectionProvider;
+ ContinueOnCapturedContext = false;
+ }
public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest
{
- var parameters = InitAddDbParameters(command, contextKey);
-
- using (var connection = GetConnection())
- {
- connection.Open();
- var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds);
- try
- {
- sqlcmd.ExecuteNonQuery();
- }
- catch (PostgresException sqlException)
- {
- if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation)
- {
- s_logger.LogWarning(
- "PostgresSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing",
- command.Id);
- return;
- }
-
- throw;
- }
- }
+ var connectionProvider = GetConnectionProvider();
+ var parameters = InitAddDbParameters(command, contextKey);
+ var connection = GetOpenConnection(connectionProvider);
+
+ try
+ {
+ using (var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds))
+ {
+ sqlcmd.ExecuteNonQuery();
+ }
+ }
+ catch (PostgresException sqlException)
+ {
+ if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation)
+ {
+ s_logger.LogWarning(
+ "PostgresSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing",
+ command.Id);
+ return;
+ }
+ throw;
+ }
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ connection.Dispose();
+ else if (!connectionProvider.HasOpenTransaction)
+ connection.Close();
+ }
}
public T Get(Guid id, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest
@@ -90,8 +98,8 @@ public T Get(Guid id, string contextKey, int timeoutInMilliseconds = -1) wher
var sql = $"SELECT * FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey";
var parameters = new[]
{
- CreateNpgsqlParameter("CommandId", id),
- CreateNpgsqlParameter("ContextKey", contextKey)
+ InitNpgsqlParameter("CommandId", id),
+ InitNpgsqlParameter("ContextKey", contextKey)
};
return ExecuteCommand(command => ReadCommand(command.ExecuteReader(), id), sql, timeoutInMilliseconds, parameters);
@@ -102,38 +110,45 @@ public bool Exists(Guid id, string contextKey, int timeoutInMilliseconds = -1
var sql = $"SELECT DISTINCT CommandId FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey FETCH FIRST 1 ROWS ONLY";
var parameters = new[]
{
- CreateNpgsqlParameter("CommandId", id),
- CreateNpgsqlParameter("ContextKey", contextKey)
+ InitNpgsqlParameter("CommandId", id),
+ InitNpgsqlParameter("ContextKey", contextKey)
};
return ExecuteCommand(command => command.ExecuteReader().HasRows, sql, timeoutInMilliseconds, parameters);
}
public async Task AddAsync(T command, string contextKey, int timeoutInMilliseconds = -1,
- CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest
+ CancellationToken cancellationToken = default) where T : class, IRequest
{
+ var connectionProvider = GetConnectionProvider();
var parameters = InitAddDbParameters(command, contextKey);
+ var connection = await GetOpenConnectionAsync(connectionProvider, cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
- using (var connection = GetConnection())
+ try
{
- await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
- var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds);
- try
+ using (var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds))
{
await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
}
- catch (PostgresException sqlException)
+ }
+ catch (PostgresException sqlException)
+ {
+ if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation)
{
- if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation)
- {
- s_logger.LogWarning(
- "PostgresSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing",
- command.Id);
- return;
- }
-
- throw;
+ s_logger.LogWarning(
+ "PostgresSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing",
+ command.Id);
+ return;
}
+
+ throw;
+ }
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ await connection.DisposeAsync().ConfigureAwait(ContinueOnCapturedContext);
+ else if (!connectionProvider.HasOpenTransaction)
+ await connection.CloseAsync().ConfigureAwait(ContinueOnCapturedContext);
}
}
@@ -143,8 +158,8 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise
var parameters = new[]
{
- CreateNpgsqlParameter("CommandId", id),
- CreateNpgsqlParameter("ContextKey", contextKey)
+ InitNpgsqlParameter("CommandId", id),
+ InitNpgsqlParameter("ContextKey", contextKey)
};
return await ExecuteCommandAsync(
@@ -161,8 +176,8 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise
var sql = $"SELECT DISTINCT CommandId FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey FETCH FIRST 1 ROWS ONLY";
var parameters = new[]
{
- CreateNpgsqlParameter("CommandId", id),
- CreateNpgsqlParameter("ContextKey", contextKey)
+ InitNpgsqlParameter("CommandId", id),
+ InitNpgsqlParameter("ContextKey", contextKey)
};
return await ExecuteCommandAsync(
@@ -178,12 +193,42 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise
.ConfigureAwait(ContinueOnCapturedContext);
}
- private NpgsqlConnection GetConnection()
+ private IPostgreSqlConnectionProvider GetConnectionProvider(IAmABoxTransactionConnectionProvider transactionConnectionProvider = null)
{
- return new NpgsqlConnection(_configuration.ConnectionString);
+ var connectionProvider = _connectionProvider ?? new PostgreSqlNpgsqlConnectionProvider(_configuration);
+
+ if (transactionConnectionProvider != null)
+ {
+ if (transactionConnectionProvider is IPostgreSqlTransactionConnectionProvider provider)
+ connectionProvider = provider;
+ else
+ throw new Exception($"{nameof(transactionConnectionProvider)} does not implement interface {nameof(IPostgreSqlTransactionConnectionProvider)}.");
+ }
+
+ return connectionProvider;
}
- private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value)
+ private NpgsqlConnection GetOpenConnection(IPostgreSqlConnectionProvider connectionProvider)
+ {
+ NpgsqlConnection connection = connectionProvider.GetConnection();
+
+ if (connection.State != ConnectionState.Open)
+ connection.Open();
+
+ return connection;
+ }
+
+ private async Task GetOpenConnectionAsync(IPostgreSqlConnectionProvider connectionProvider, CancellationToken cancellationToken = default)
+ {
+ NpgsqlConnection connection = await connectionProvider.GetConnectionAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
+
+ if (connection.State != ConnectionState.Open)
+ await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
+
+ return connection;
+ }
+
+ private NpgsqlParameter InitNpgsqlParameter(string parametername, object value)
{
if (value != null)
return new NpgsqlParameter(parametername, value);
@@ -194,10 +239,9 @@ private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value
private DbCommand InitAddDbCommand(DbConnection connection, DbParameter[] parameters, int timeoutInMilliseconds)
{
var command = connection.CreateCommand();
- var sql = string.Format(
+ command.CommandText = string.Format(
"INSERT INTO {0} (CommandID, CommandType, CommandBody, Timestamp, ContextKey) VALUES (@CommandID, @CommandType, @CommandBody, @Timestamp, @ContextKey)",
_configuration.InBoxTableName);
- command.CommandText = sql;
command.Parameters.AddRange(parameters);
return command;
}
@@ -207,11 +251,11 @@ private DbParameter[] InitAddDbParameters(T command, string contextKey) where
var commandJson = JsonSerializer.Serialize(command, JsonSerialisationOptions.Options);
var parameters = new[]
{
- CreateNpgsqlParameter("CommandID", command.Id),
- CreateNpgsqlParameter("CommandType", typeof (T).Name),
- CreateNpgsqlParameter("CommandBody", commandJson),
+ InitNpgsqlParameter("CommandID", command.Id),
+ InitNpgsqlParameter("CommandType", typeof (T).Name),
+ InitNpgsqlParameter("CommandBody", commandJson),
new NpgsqlParameter("Timestamp", NpgsqlDbType.TimestampTz) {Value = DateTimeOffset.UtcNow},
- CreateNpgsqlParameter("ContextKey", contextKey)
+ InitNpgsqlParameter("ContextKey", contextKey)
};
return parameters;
}
@@ -219,16 +263,28 @@ private DbParameter[] InitAddDbParameters(T command, string contextKey) where
private T ExecuteCommand(Func execute, string sql, int timeoutInMilliseconds,
params DbParameter[] parameters)
{
- using (var connection = GetConnection())
- using (var command = connection.CreateCommand())
+ var connectionProvider = GetConnectionProvider();
+ var connection = GetOpenConnection(connectionProvider);
+
+ try
{
- if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds;
- command.CommandText = sql;
- command.Parameters.AddRange(parameters);
+ using (var command = connection.CreateCommand())
+ {
+ if (timeoutInMilliseconds != -1)
+ command.CommandTimeout = timeoutInMilliseconds;
- connection.Open();
- var item = execute(command);
- return item;
+ command.CommandText = sql;
+ command.Parameters.AddRange(parameters);
+
+ return execute(command);
+ }
+ }
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ connection.Dispose();
+ else if (!connectionProvider.HasOpenTransaction)
+ connection.Close();
}
}
@@ -236,19 +292,31 @@ private async Task ExecuteCommandAsync(
Func> execute,
string sql,
int timeoutInMilliseconds,
- CancellationToken cancellationToken = default(CancellationToken),
+ CancellationToken cancellationToken = default,
params DbParameter[] parameters)
{
- using (var connection = GetConnection())
- using (var command = connection.CreateCommand())
+ var connectionProvider = GetConnectionProvider();
+ var connection = await GetOpenConnectionAsync(connectionProvider, cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
+
+ try
{
- if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds;
- command.CommandText = sql;
- command.Parameters.AddRange(parameters);
+ using (var command = connection.CreateCommand())
+ {
+ if (timeoutInMilliseconds != -1)
+ command.CommandTimeout = timeoutInMilliseconds;
- await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
- var item = await execute(command).ConfigureAwait(ContinueOnCapturedContext);
- return item;
+ command.CommandText = sql;
+ command.Parameters.AddRange(parameters);
+
+ return await execute(command).ConfigureAwait(ContinueOnCapturedContext);
+ }
+ }
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ await connection.DisposeAsync().ConfigureAwait(ContinueOnCapturedContext);
+ else if (!connectionProvider.HasOpenTransaction)
+ await connection.CloseAsync().ConfigureAwait(ContinueOnCapturedContext);
}
}
diff --git a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInboxConfiguration.cs b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInboxConfiguration.cs
index 7d9194c192..31ff311cd9 100644
--- a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInboxConfiguration.cs
+++ b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInboxConfiguration.cs
@@ -23,28 +23,26 @@ THE SOFTWARE. */
#endregion
+using Paramore.Brighter.PostgreSql;
namespace Paramore.Brighter.Inbox.Postgres
{
- public class PostgresSqlInboxConfiguration
+ public class PostgresSqlInboxConfiguration : PostgreSqlConfiguration
{
- public PostgresSqlInboxConfiguration(string connectionString, string tableName)
+ public PostgresSqlInboxConfiguration(string connectionString, string tableName) : base(connectionString)
{
- ConnectionString = connectionString;
InBoxTableName = tableName;
}
- ///
- /// Gets the connection string.
- ///
- /// The connection string.
- public string ConnectionString { get; private set; }
+ public PostgresSqlInboxConfiguration(string tableName) : base(null)
+ {
+ InBoxTableName = tableName;
+ }
///
/// Gets the name of the outbox table.
///
/// The name of the outbox table.
- public string InBoxTableName { get; private set; }
-
+ public string InBoxTableName { get; }
}
}
diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/Paramore.Brighter.Outbox.PostgreSql.csproj b/src/Paramore.Brighter.Outbox.PostgreSql/Paramore.Brighter.Outbox.PostgreSql.csproj
index 6c78d8a9b6..636ba175fd 100644
--- a/src/Paramore.Brighter.Outbox.PostgreSql/Paramore.Brighter.Outbox.PostgreSql.csproj
+++ b/src/Paramore.Brighter.Outbox.PostgreSql/Paramore.Brighter.Outbox.PostgreSql.csproj
@@ -7,6 +7,7 @@
+
diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxConfiguration.cs b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxConfiguration.cs
index b593c5ccc3..73a7a44c3c 100644
--- a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxConfiguration.cs
+++ b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxConfiguration.cs
@@ -22,36 +22,35 @@ THE SOFTWARE. */
#endregion
-using System;
-using System.Collections.Generic;
-using System.Text;
+using Paramore.Brighter.PostgreSql;
namespace Paramore.Brighter.Outbox.PostgreSql
{
- public class PostgreSqlOutboxConfiguration
+ public class PostgreSqlOutboxConfiguration : PostgreSqlConfiguration
{
///
/// Initialises a new instance of
///
/// The Subscription String
/// Name of the OutBox table
- public PostgreSqlOutboxConfiguration(string connectionstring,string outBoxTablename)
+ public PostgreSqlOutboxConfiguration(string connectionstring, string outBoxTablename) : base(connectionstring)
{
- ConnectionString = connectionstring;
OutboxTableName = outBoxTablename;
}
///
- /// Gets the subscription string.
+ /// Initialises a new instance of
///
- /// The subscription string.
- public string ConnectionString { get; private set; }
+ /// Name of the OutBox table
+ public PostgreSqlOutboxConfiguration(string outBoxTablename) : base(null)
+ {
+ OutboxTableName = outBoxTablename;
+ }
///
/// Gets the name of the outbox table.
///
/// The name of the outbox table.
- public string OutboxTableName { get; private set; }
-
+ public string OutboxTableName { get; }
}
}
diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs
index 8bf59bfaa7..2ec70def27 100644
--- a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs
+++ b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs
@@ -31,13 +31,15 @@ THE SOFTWARE. */
using Npgsql;
using NpgsqlTypes;
using Paramore.Brighter.Logging;
+using Paramore.Brighter.PostgreSql;
namespace Paramore.Brighter.Outbox.PostgreSql
{
public class PostgreSqlOutboxSync : IAmAnOutboxSync, IAmAnOutboxViewer
{
- private static readonly ILogger s_logger = ApplicationLogging.CreateLogger();
private readonly PostgreSqlOutboxConfiguration _configuration;
+ private readonly IPostgreSqlConnectionProvider _connectionProvider;
+ private static readonly ILogger s_logger = ApplicationLogging.CreateLogger();
public bool ContinueOnCapturedContext
{
@@ -48,10 +50,11 @@ public bool ContinueOnCapturedContext
///
/// Initialises a new instance of class.
///
- /// PostgreSql Configuration.
- public PostgreSqlOutboxSync(PostgreSqlOutboxConfiguration configuration)
+ /// PostgreSql Outbox Configuration.
+ public PostgreSqlOutboxSync(PostgreSqlOutboxConfiguration configuration, IPostgreSqlConnectionProvider connectionProvider = null)
{
_configuration = configuration;
+ _connectionProvider = connectionProvider;
}
///
@@ -61,30 +64,36 @@ public PostgreSqlOutboxSync(PostgreSqlOutboxConfiguration configuration)
/// The time allowed for the write in milliseconds; on a -1 default
public void Add(Message message, int outBoxTimeout = -1, IAmABoxTransactionConnectionProvider transactionConnectionProvider = null)
{
+ var connectionProvider = GetConnectionProvider(transactionConnectionProvider);
var parameters = InitAddDbParameters(message);
- using (var connection = GetConnection())
+ var connection = GetOpenConnection(connectionProvider);
+
+ try
{
- connection.Open();
using (var command = InitAddDbCommand(connection, parameters))
{
- try
- {
- command.ExecuteNonQuery();
- }
- catch (PostgresException sqlException)
- {
- if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation)
- {
- s_logger.LogWarning(
- "PostgresSQLOutbox: A duplicate Message with the MessageId {Id} was inserted into the Outbox, ignoring and continuing",
- message.Id);
- return;
- }
-
- throw;
- }
+ command.ExecuteNonQuery();
}
}
+ catch (PostgresException sqlException)
+ {
+ if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation)
+ {
+ s_logger.LogWarning(
+ "PostgresSQLOutbox: A duplicate Message with the MessageId {Id} was inserted into the Outbox, ignoring and continuing",
+ message.Id);
+ return;
+ }
+
+ throw;
+ }
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ connection.Dispose();
+ else if (!connectionProvider.HasOpenTransaction)
+ connection.Close();
+ }
}
///
@@ -103,22 +112,30 @@ public IEnumerable DispatchedMessages(
int outboxTimeout = -1,
Dictionary args = null)
{
- using (var connection = GetConnection())
- using (var command = connection.CreateCommand())
- {
- CreatePagedDispatchedCommand(command, millisecondsDispatchedSince, pageSize, pageNumber);
+ var connectionProvider = GetConnectionProvider();
+ var connection = GetOpenConnection(connectionProvider);
- connection.Open();
+ try
+ {
+ using (var command = InitPagedDispatchedCommand(connection, millisecondsDispatchedSince, pageSize, pageNumber))
+ {
+ var messages = new List();
- var dbDataReader = command.ExecuteReader();
+ using (var dbDataReader = command.ExecuteReader())
+ {
+ while (dbDataReader.Read())
+ messages.Add(MapAMessage(dbDataReader));
+ }
- var messages = new List();
- while (dbDataReader.Read())
- {
- messages.Add(MapAMessage(dbDataReader));
+ return messages;
}
-
- return messages;
+ }
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ connection.Dispose();
+ else if (!connectionProvider.HasOpenTransaction)
+ connection.Close();
}
}
@@ -131,26 +148,35 @@ public IEnumerable DispatchedMessages(
/// A list of messages
public IList Get(int pageSize = 100, int pageNumber = 1, Dictionary args = null)
{
- using (var connection = GetConnection())
- using (var command = connection.CreateCommand())
- {
- CreatePagedReadCommand(command, _configuration, pageSize, pageNumber);
+ var connectionProvider = GetConnectionProvider();
+ var connection = GetOpenConnection(connectionProvider);
- connection.Open();
+ try
+ {
+ using (var command = InitPagedReadCommand(connection, pageSize, pageNumber))
+ {
+ var messages = new List();
- var dbDataReader = command.ExecuteReader();
+ using (var dbDataReader = command.ExecuteReader())
+ {
+ while (dbDataReader.Read())
+ {
+ messages.Add(MapAMessage(dbDataReader));
+ }
+ }
- var messages = new List();
- while (dbDataReader.Read())
- {
- messages.Add(MapAMessage(dbDataReader));
+ return messages;
}
-
- return messages;
+ }
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ connection.Dispose();
+ else if (!connectionProvider.HasOpenTransaction)
+ connection.Close();
}
}
-
///
/// Gets the specified message identifier.
///
@@ -162,7 +188,7 @@ public Message Get(Guid messageId, int outBoxTimeout = -1)
var sql = string.Format(
"SELECT Id, MessageId, Topic, MessageType, Timestamp, Correlationid, ReplyTo, ContentType, HeaderBag, Body FROM {0} WHERE MessageId = @MessageId",
_configuration.OutboxTableName);
- var parameters = new[] {CreateNpgsqlParameter("MessageId", messageId)};
+ var parameters = new[] { InitNpgsqlParameter("MessageId", messageId) };
return ExecuteCommand(command => MapFunction(command.ExecuteReader()), sql, outBoxTimeout, parameters);
}
@@ -174,14 +200,23 @@ public Message Get(Guid messageId, int outBoxTimeout = -1)
/// When was the message dispatched, defaults to UTC now
public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary args = null)
{
- using (var connection = GetConnection())
+ var connectionProvider = GetConnectionProvider();
+ var connection = GetOpenConnection(connectionProvider);
+
+ try
{
- connection.Open();
using (var command = InitMarkDispatchedCommand(connection, id, dispatchedAt))
{
command.ExecuteNonQuery();
}
}
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ connection.Dispose();
+ else if (!connectionProvider.HasOpenTransaction)
+ connection.Close();
+ }
}
///
@@ -190,34 +225,69 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, DictionaryHow long ago as the message sent?
/// How many messages to return at once?
/// Which page number of messages
- /// Additional parameters required for search, if any
- /// A list of messages that are outstanding for dispatch
- public IEnumerable OutstandingMessages(
- double millSecondsSinceSent,
- int pageSize = 100,
- int pageNumber = 1,
- Dictionary args = null)
+ /// Additional parameters required for search, if any
+ /// A list of messages that are outstanding for dispatch
+ public IEnumerable OutstandingMessages(
+ double millSecondsSinceSent,
+ int pageSize = 100,
+ int pageNumber = 1,
+ Dictionary args = null)
{
- using (var connection = GetConnection())
- using (var command = connection.CreateCommand())
- {
- CreatePagedOutstandingCommand(command, millSecondsSinceSent, pageSize, pageNumber);
+ var connectionProvider = GetConnectionProvider();
+ var connection = GetOpenConnection(connectionProvider);
- connection.Open();
+ try
+ {
+ using (var command = InitPagedOutstandingCommand(connection, millSecondsSinceSent, pageSize, pageNumber))
+ {
+ var messages = new List();
- var dbDataReader = command.ExecuteReader();
+ using (var dbDataReader = command.ExecuteReader())
+ {
+ while (dbDataReader.Read())
+ {
+ messages.Add(MapAMessage(dbDataReader));
+ }
+ }
- var messages = new List();
- while (dbDataReader.Read())
- {
- messages.Add(MapAMessage(dbDataReader));
+ return messages;
}
+ }
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ connection.Dispose();
+ else if (!connectionProvider.HasOpenTransaction)
+ connection.Close();
+ }
+ }
- return messages;
+ private IPostgreSqlConnectionProvider GetConnectionProvider(IAmABoxTransactionConnectionProvider transactionConnectionProvider = null)
+ {
+ var connectionProvider = _connectionProvider ?? new PostgreSqlNpgsqlConnectionProvider(_configuration);
+
+ if (transactionConnectionProvider != null)
+ {
+ if (transactionConnectionProvider is IPostgreSqlTransactionConnectionProvider provider)
+ connectionProvider = provider;
+ else
+ throw new Exception($"{nameof(transactionConnectionProvider)} does not implement interface {nameof(IPostgreSqlTransactionConnectionProvider)}.");
}
+
+ return connectionProvider;
+ }
+
+ private NpgsqlConnection GetOpenConnection(IPostgreSqlConnectionProvider connectionProvider)
+ {
+ NpgsqlConnection connection = connectionProvider.GetConnection();
+
+ if (connection.State != ConnectionState.Open)
+ connection.Open();
+
+ return connection;
}
- private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value)
+ private NpgsqlParameter InitNpgsqlParameter(string parametername, object value)
{
if (value != null)
return new NpgsqlParameter(parametername, value);
@@ -225,121 +295,133 @@ private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value
return new NpgsqlParameter(parametername, DBNull.Value);
}
- private void CreatePagedDispatchedCommand(NpgsqlCommand command, double millisecondsDispatchedSince,
+ private NpgsqlCommand InitPagedDispatchedCommand(NpgsqlConnection connection, double millisecondsDispatchedSince,
int pageSize, int pageNumber)
{
- var pagingSqlFormat =
- "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
+ var command = connection.CreateCommand();
+
var parameters = new[]
{
- CreateNpgsqlParameter("PageNumber", pageNumber),
- CreateNpgsqlParameter("PageSize", pageSize),
- CreateNpgsqlParameter("OutstandingSince", -1 * millisecondsDispatchedSince)
+ InitNpgsqlParameter("PageNumber", pageNumber),
+ InitNpgsqlParameter("PageSize", pageSize),
+ InitNpgsqlParameter("OutstandingSince", -1 * millisecondsDispatchedSince)
};
- var sql = string.Format(pagingSqlFormat, _configuration.OutboxTableName);
+ var pagingSqlFormat =
+ "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
- command.CommandText = sql;
+ command.CommandText = string.Format(pagingSqlFormat, _configuration.OutboxTableName);
command.Parameters.AddRange(parameters);
+
+ return command;
}
- private void CreatePagedReadCommand(NpgsqlCommand command, PostgreSqlOutboxConfiguration configuration,
- int pageSize, int pageNumber)
+ private NpgsqlCommand InitPagedReadCommand(NpgsqlConnection connection, int pageSize, int pageNumber)
{
- var pagingSqlFormat =
- "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
+ var command = connection.CreateCommand();
+
var parameters = new[]
{
- CreateNpgsqlParameter("PageNumber", pageNumber),
- CreateNpgsqlParameter("PageSize", pageSize)
+ InitNpgsqlParameter("PageNumber", pageNumber),
+ InitNpgsqlParameter("PageSize", pageSize)
};
- var sql = string.Format(pagingSqlFormat, _configuration.OutboxTableName);
+ var pagingSqlFormat =
+ "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
- command.CommandText = sql;
+ command.CommandText = string.Format(pagingSqlFormat, _configuration.OutboxTableName);
command.Parameters.AddRange(parameters);
+
+ return command;
}
- private void CreatePagedOutstandingCommand(NpgsqlCommand command, double milliSecondsSinceAdded, int pageSize,
+ private NpgsqlCommand InitPagedOutstandingCommand(NpgsqlConnection connection, double milliSecondsSinceAdded, int pageSize,
int pageNumber)
{
- var pagingSqlFormat =
- "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC";
+ var command = connection.CreateCommand();
+
var parameters = new[]
{
- CreateNpgsqlParameter("PageNumber", pageNumber),
- CreateNpgsqlParameter("PageSize", pageSize),
- CreateNpgsqlParameter("OutstandingSince", milliSecondsSinceAdded)
+ InitNpgsqlParameter("PageNumber", pageNumber),
+ InitNpgsqlParameter("PageSize", pageSize),
+ InitNpgsqlParameter("OutstandingSince", milliSecondsSinceAdded)
};
- var sql = string.Format(pagingSqlFormat, _configuration.OutboxTableName);
+ var pagingSqlFormat =
+ "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC";
- command.CommandText = sql;
+ command.CommandText = string.Format(pagingSqlFormat, _configuration.OutboxTableName);
command.Parameters.AddRange(parameters);
- }
-
- private T ExecuteCommand(Func execute, string sql, int messageStoreTimeout,
- NpgsqlParameter[] parameters)
- {
- using (var connection = GetConnection())
- using (var command = connection.CreateCommand())
- {
- command.CommandText = sql;
- command.Parameters.AddRange(parameters);
-
- if (messageStoreTimeout != -1) command.CommandTimeout = messageStoreTimeout;
- connection.Open();
- return execute(command);
- }
- }
-
- private NpgsqlConnection GetConnection()
- {
- return new NpgsqlConnection(_configuration.ConnectionString);
- }
-
- private NpgsqlCommand InitAddDbCommand(NpgsqlConnection connection, NpgsqlParameter[] parameters)
- {
- var command = connection.CreateCommand();
- var sql = string.Format(
- "INSERT INTO {0} (MessageId, MessageType, Topic, Timestamp, CorrelationId, ReplyTo, ContentType, HeaderBag, Body) VALUES (@MessageId, @MessageType, @Topic, @Timestamp::timestamptz, @CorrelationId, @ReplyTo, @ContentType, @HeaderBag, @Body)",
- _configuration.OutboxTableName);
- command.CommandText = sql;
- command.Parameters.AddRange(parameters);
return command;
}
private NpgsqlParameter[] InitAddDbParameters(Message message)
{
var bagjson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
- var parameters = new NpgsqlParameter[]
+ return new NpgsqlParameter[]
{
- CreateNpgsqlParameter("MessageId", message.Id),
- CreateNpgsqlParameter("MessageType", message.Header.MessageType.ToString()),
- CreateNpgsqlParameter("Topic", message.Header.Topic),
+ InitNpgsqlParameter("MessageId", message.Id),
+ InitNpgsqlParameter("MessageType", message.Header.MessageType.ToString()),
+ InitNpgsqlParameter("Topic", message.Header.Topic),
new NpgsqlParameter("Timestamp", NpgsqlDbType.TimestampTz) {Value = message.Header.TimeStamp},
- CreateNpgsqlParameter("CorrelationId", message.Header.CorrelationId),
- CreateNpgsqlParameter("ReplyTo", message.Header.ReplyTo),
- CreateNpgsqlParameter("ContentType", message.Header.ContentType),
- CreateNpgsqlParameter("HeaderBag", bagjson),
- CreateNpgsqlParameter("Body", message.Body.Value)
+ InitNpgsqlParameter("CorrelationId", message.Header.CorrelationId),
+ InitNpgsqlParameter("ReplyTo", message.Header.ReplyTo),
+ InitNpgsqlParameter("ContentType", message.Header.ContentType),
+ InitNpgsqlParameter("HeaderBag", bagjson),
+ InitNpgsqlParameter("Body", message.Body.Value)
};
- return parameters;
}
private NpgsqlCommand InitMarkDispatchedCommand(NpgsqlConnection connection, Guid messageId,
DateTime? dispatchedAt)
{
var command = connection.CreateCommand();
- var sql =
- $"UPDATE {_configuration.OutboxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @MessageId";
- command.CommandText = sql;
- command.Parameters.Add(CreateNpgsqlParameter("MessageId", messageId));
- command.Parameters.Add(CreateNpgsqlParameter("DispatchedAt", dispatchedAt));
+ command.CommandText = $"UPDATE {_configuration.OutboxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @MessageId";
+ command.Parameters.Add(InitNpgsqlParameter("MessageId", messageId));
+ command.Parameters.Add(InitNpgsqlParameter("DispatchedAt", dispatchedAt));
return command;
}
+ private T ExecuteCommand(Func execute, string sql, int messageStoreTimeout,
+ NpgsqlParameter[] parameters)
+ {
+ var connectionProvider = GetConnectionProvider();
+ var connection = GetOpenConnection(connectionProvider);
+
+ try
+ {
+ using (var command = connection.CreateCommand())
+ {
+ command.CommandText = sql;
+ command.Parameters.AddRange(parameters);
+
+ if (messageStoreTimeout != -1)
+ command.CommandTimeout = messageStoreTimeout;
+
+ return execute(command);
+ }
+ }
+ finally
+ {
+ if (!connectionProvider.IsSharedConnection)
+ connection.Dispose();
+ else if (!connectionProvider.HasOpenTransaction)
+ connection.Close();
+ }
+ }
+
+ private NpgsqlCommand InitAddDbCommand(NpgsqlConnection connection, NpgsqlParameter[] parameters)
+ {
+ var command = connection.CreateCommand();
+
+ var addSqlFormat = "INSERT INTO {0} (MessageId, MessageType, Topic, Timestamp, CorrelationId, ReplyTo, ContentType, HeaderBag, Body) VALUES (@MessageId, @MessageType, @Topic, @Timestamp::timestamptz, @CorrelationId, @ReplyTo, @ContentType, @HeaderBag, @Body)";
+
+ command.CommandText = string.Format(addSqlFormat, _configuration.OutboxTableName);
+ command.Parameters.AddRange(parameters);
+
+ return command;
+ }
private Message MapFunction(IDataReader reader)
{
@@ -363,11 +445,11 @@ private Message MapAMessage(IDataReader dr)
var contentType = GetContentType(dr);
var header = new MessageHeader(
- messageId:id,
- topic:topic,
- messageType:messageType,
- timeStamp:timeStamp,
- handledCount:0,
+ messageId: id,
+ topic: topic,
+ messageType: messageType,
+ timeStamp: timeStamp,
+ handledCount: 0,
delayedMilliseconds: 0,
correlationId: correlationId,
replyTo: replyTo,
@@ -405,7 +487,8 @@ private static Guid GetMessageId(IDataReader dr)
private string GetContentType(IDataReader dr)
{
var ordinal = dr.GetOrdinal("ContentType");
- if (dr.IsDBNull(ordinal)) return null;
+ if (dr.IsDBNull(ordinal))
+ return null;
var replyTo = dr.GetString(ordinal);
return replyTo;
@@ -413,11 +496,12 @@ private string GetContentType(IDataReader dr)
private string GetReplyTo(IDataReader dr)
{
- var ordinal = dr.GetOrdinal("ReplyTo");
- if (dr.IsDBNull(ordinal)) return null;
+ var ordinal = dr.GetOrdinal("ReplyTo");
+ if (dr.IsDBNull(ordinal))
+ return null;
- var replyTo = dr.GetString(ordinal);
- return replyTo;
+ var replyTo = dr.GetString(ordinal);
+ return replyTo;
}
private static Dictionary GetContextBag(IDataReader dr)
@@ -431,7 +515,8 @@ private static Dictionary GetContextBag(IDataReader dr)
private Guid? GetCorrelationId(IDataReader dr)
{
var ordinal = dr.GetOrdinal("CorrelationId");
- if (dr.IsDBNull(ordinal)) return null;
+ if (dr.IsDBNull(ordinal))
+ return null;
var correlationId = dr.GetGuid(ordinal);
return correlationId;
@@ -447,7 +532,4 @@ private static DateTime GetTimeStamp(IDataReader dr)
}
}
-
-
}
-
diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs b/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs
index f8ff53093e..16637e5e10 100644
--- a/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs
+++ b/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs
@@ -1,26 +1,70 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Paramore.Brighter.Extensions.DependencyInjection;
+using Paramore.Brighter.PostgreSql;
namespace Paramore.Brighter.Outbox.PostgreSql
{
public static class ServiceCollectionExtensions
{
public static IBrighterBuilder UsePostgreSqlOutbox(
- this IBrighterBuilder brighterBuilder, PostgreSqlOutboxConfiguration configuration, ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
+ this IBrighterBuilder brighterBuilder, PostgreSqlOutboxConfiguration configuration, Type connectionProvider = null, ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
{
+ if (brighterBuilder is null)
+ throw new ArgumentNullException($"{nameof(brighterBuilder)} cannot be null.", nameof(brighterBuilder));
+
+ if (configuration is null)
+ throw new ArgumentNullException($"{nameof(configuration)} cannot be null.", nameof(configuration));
+
brighterBuilder.Services.AddSingleton(configuration);
+ if (connectionProvider is object)
+ {
+ if (!typeof(IPostgreSqlConnectionProvider).IsAssignableFrom(connectionProvider))
+ throw new Exception($"Unable to register provider of type {connectionProvider.GetType().Name}. Class does not implement interface {nameof(IPostgreSqlConnectionProvider)}.");
+
+ brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IPostgreSqlConnectionProvider), connectionProvider, serviceLifetime));
+ }
+
brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxSync), BuildPostgreSqlOutboxSync, serviceLifetime));
return brighterBuilder;
}
+ ///
+ /// Use this transaction provider to ensure that the Outbox and the Entity Store are correct
+ ///
+ /// Allows extension method
+ /// What is the type of the connection provider. Must implement interface IPostgreSqlTransactionConnectionProvider
+ /// What is the lifetime of registered interfaces
+ /// Allows fluent syntax
+ /// This is paired with Use Outbox (above) when required
+ /// Registers the following
+ /// -- IAmABoxTransactionConnectionProvider: the provider of a connection for any existing transaction
+ public static IBrighterBuilder UsePostgreSqlTransactionConnectionProvider(
+ this IBrighterBuilder brighterBuilder, Type connectionProvider,
+ ServiceLifetime serviceLifetime = ServiceLifetime.Scoped)
+ {
+ if (brighterBuilder is null)
+ throw new ArgumentNullException($"{nameof(brighterBuilder)} cannot be null.", nameof(brighterBuilder));
+
+ if (connectionProvider is null)
+ throw new ArgumentNullException($"{nameof(connectionProvider)} cannot be null.", nameof(connectionProvider));
+
+ if (!typeof(IPostgreSqlTransactionConnectionProvider).IsAssignableFrom(connectionProvider))
+ throw new Exception($"Unable to register provider of type {connectionProvider.GetType().Name}. Class does not implement interface {nameof(IPostgreSqlTransactionConnectionProvider)}.");
+
+ brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmABoxTransactionConnectionProvider), connectionProvider, serviceLifetime));
+
+ return brighterBuilder;
+ }
+
private static PostgreSqlOutboxSync BuildPostgreSqlOutboxSync(IServiceProvider provider)
{
var config = provider.GetService();
+ var connectionProvider = provider.GetService();
- return new PostgreSqlOutboxSync(config);
+ return new PostgreSqlOutboxSync(config, connectionProvider);
}
}
}
diff --git a/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj b/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj
new file mode 100644
index 0000000000..6736d47c44
--- /dev/null
+++ b/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj
@@ -0,0 +1,22 @@
+
+
+
+ Sam Rumley
+ Common components required to get a PostgreSql connection from Entity Framework Core.
+ netstandard2.1;net6.0
+ RabbitMQ;AMQP;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/PostgreSqlEntityFrameworkConnectionProvider.cs b/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/PostgreSqlEntityFrameworkConnectionProvider.cs
new file mode 100644
index 0000000000..9f14c6d1e8
--- /dev/null
+++ b/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/PostgreSqlEntityFrameworkConnectionProvider.cs
@@ -0,0 +1,60 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Storage;
+using Npgsql;
+
+namespace Paramore.Brighter.PostgreSql.EntityFrameworkCore
+{
+ ///
+ /// A connection provider that uses the same connection as EF Core
+ ///
+ /// The Db Context to take the connection from
+ public class PostgreSqlEntityFrameworkConnectionProvider : IPostgreSqlTransactionConnectionProvider where T : DbContext
+ {
+ private readonly T _context;
+
+ ///
+ /// Constructs and instance from a database context
+ ///
+ /// The database context to use
+ public PostgreSqlEntityFrameworkConnectionProvider(T context)
+ {
+ _context = context;
+ }
+
+ ///
+ /// Get the current connection of the database context
+ ///
+ /// The NpgsqlConnection that is in use
+ public NpgsqlConnection GetConnection()
+ {
+ return (NpgsqlConnection)_context.Database.GetDbConnection();
+ }
+
+ ///
+ /// Get the current connection of the database context
+ ///
+ /// A cancellation token
+ ///
+ public Task GetConnectionAsync(CancellationToken cancellationToken = default)
+ {
+ var tcs = new TaskCompletionSource();
+ tcs.SetResult((NpgsqlConnection)_context.Database.GetDbConnection());
+ return tcs.Task;
+ }
+
+ ///
+ /// Get the ambient Transaction
+ ///
+ /// The NpgsqlTransaction
+ public NpgsqlTransaction GetTransaction()
+ {
+ return (NpgsqlTransaction)_context.Database.CurrentTransaction?.GetDbTransaction();
+ }
+
+ public bool HasOpenTransaction { get => _context.Database.CurrentTransaction != null; }
+
+ public bool IsSharedConnection { get => true; }
+ }
+}
diff --git a/src/Paramore.Brighter.PostgreSql/IPostgreSqlConnectionProvider.cs b/src/Paramore.Brighter.PostgreSql/IPostgreSqlConnectionProvider.cs
new file mode 100644
index 0000000000..c0df3cdf5f
--- /dev/null
+++ b/src/Paramore.Brighter.PostgreSql/IPostgreSqlConnectionProvider.cs
@@ -0,0 +1,19 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Npgsql;
+
+namespace Paramore.Brighter.PostgreSql
+{
+ public interface IPostgreSqlConnectionProvider
+ {
+ NpgsqlConnection GetConnection();
+
+ Task GetConnectionAsync(CancellationToken cancellationToken = default);
+
+ NpgsqlTransaction GetTransaction();
+
+ bool HasOpenTransaction { get; }
+
+ bool IsSharedConnection { get; }
+ }
+}
diff --git a/src/Paramore.Brighter.PostgreSql/IPostgreSqlTransactionConnectionProvider.cs b/src/Paramore.Brighter.PostgreSql/IPostgreSqlTransactionConnectionProvider.cs
new file mode 100644
index 0000000000..82df857aca
--- /dev/null
+++ b/src/Paramore.Brighter.PostgreSql/IPostgreSqlTransactionConnectionProvider.cs
@@ -0,0 +1,4 @@
+namespace Paramore.Brighter.PostgreSql
+{
+ public interface IPostgreSqlTransactionConnectionProvider : IPostgreSqlConnectionProvider, IAmABoxTransactionConnectionProvider { }
+}
diff --git a/src/Paramore.Brighter.PostgreSql/Paramore.Brighter.PostgreSql.csproj b/src/Paramore.Brighter.PostgreSql/Paramore.Brighter.PostgreSql.csproj
new file mode 100644
index 0000000000..8f2e074ee9
--- /dev/null
+++ b/src/Paramore.Brighter.PostgreSql/Paramore.Brighter.PostgreSql.csproj
@@ -0,0 +1,18 @@
+
+
+
+ netstandard2.0
+ Sam Rumley
+ Common components required to connect to PostgreSql for inbox and outbox.
+ RabbitMQ;AMQP;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Paramore.Brighter.PostgreSql/PostgreSqlConfiguration.cs b/src/Paramore.Brighter.PostgreSql/PostgreSqlConfiguration.cs
new file mode 100644
index 0000000000..649f613e04
--- /dev/null
+++ b/src/Paramore.Brighter.PostgreSql/PostgreSqlConfiguration.cs
@@ -0,0 +1,12 @@
+namespace Paramore.Brighter.PostgreSql
+{
+ public abstract class PostgreSqlConfiguration
+ {
+ protected PostgreSqlConfiguration(string connectionString)
+ {
+ ConnectionString = connectionString;
+ }
+
+ public string ConnectionString { get; }
+ }
+}
diff --git a/src/Paramore.Brighter.PostgreSql/PostgreSqlNpgsqlConnectionProvider.cs b/src/Paramore.Brighter.PostgreSql/PostgreSqlNpgsqlConnectionProvider.cs
new file mode 100644
index 0000000000..0eb773f027
--- /dev/null
+++ b/src/Paramore.Brighter.PostgreSql/PostgreSqlNpgsqlConnectionProvider.cs
@@ -0,0 +1,42 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Npgsql;
+
+namespace Paramore.Brighter.PostgreSql
+{
+ public class PostgreSqlNpgsqlConnectionProvider : IPostgreSqlConnectionProvider
+ {
+ private readonly string _connectionString;
+
+ public PostgreSqlNpgsqlConnectionProvider(PostgreSqlConfiguration configuration)
+ {
+ if (string.IsNullOrWhiteSpace(configuration?.ConnectionString))
+ throw new ArgumentNullException(nameof(configuration.ConnectionString));
+
+ _connectionString = configuration.ConnectionString;
+ }
+
+ public NpgsqlConnection GetConnection()
+ {
+ return new NpgsqlConnection(_connectionString);
+ }
+
+ public async Task GetConnectionAsync(CancellationToken cancellationToken = default)
+ {
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ tcs.SetResult(GetConnection());
+ return await tcs.Task;
+ }
+
+ public NpgsqlTransaction GetTransaction()
+ {
+ //This connection factory does not support transactions
+ return null;
+ }
+
+ public bool HasOpenTransaction { get => false; }
+
+ public bool IsSharedConnection { get => false; }
+ }
+}