SQL Server Transport Native is a shim providing low-level access to the NServiceBus SQL Server Transport with no NServiceBus or SQL Server Transport reference required.
Already a Patron? skip past this section
It is expected that all developers either become a Patron or have a Tidelift Subscription to use NServiceBusExtensions. Go to licensing FAQ
Support this project by becoming a Sponsor. The company avatar will show up here with a website link. The avatar will also be added to all GitHub repositories under the NServiceBusExtensions organization.
Thanks to all the backing developers. Support this project by becoming a patron.
Support is available via a Tidelift Subscription.
- Usage scenarios
- Main Queue
- Delayed Queue
- Headers
- Subscriptions
- Deduplication
- ConnectionHelpers
- SqlServer.HttpPassthrough
- Security contact information
- https://nuget.org/packages/NServiceBus.SqlServer.Native/
- https://nuget.org/packages/NServiceBus.SqlServer.HttpPassthrough/
- https://nuget.org/packages/NServiceBus.SqlServer.Deduplication/
- Error or Audit queue handling: Allows to consume messages from error and audit queues, for example to move them to a long-term archive. NServiceBus expects to have a queue per message type, so NServiceBus endpoints are not suitable for processing error or audit queues. SQL Native allows manipulation or consumption of queues containing multiple types of messages.
- Corrupted or malformed messages: Allows to process poison messages which can't be deserialized by NServiceBus. In SQL Native message headers and body are treated as a raw string and byte array, so corrupted or malformed messages can be read and manipulated in code to correct any problems.
- Deployment or decommission: Allows to perform common operational activities, similar to operations scripts. Running installers requires starting a full endpoint. This is not always ideal during the execution of a deployment or decommission. SQL Native allows creating or deleting of queues with no running endpoint, and with significantly less code. This also makes it a better candidate for usage in deployment scripting languages like PowerShell.
- Bulk operations: SQL Native supports sending and receiving of multiple messages within a single
SQLConnection
andSQLTransaction
. - Explicit connection and transaction management: NServiceBus abstracts the
SQLConnection
andSQLTransaction
creation and management. SQL Native allows any consuming code to manage the scope and settings of both theSQLConnection
andSQLTransaction
. - Message pass through: SQL Native reduces the amount of boilerplate code and simplifies development.
Queue management for the native delayed delivery functionality.
See also SQL Server Transport - SQL statements.
The queue can be created using the following:
var manager = new QueueManager("endpointTable", sqlConnection);
await manager.Create();
The queue can be deleted using the following:
var manager = new QueueManager("endpointTable", sqlConnection);
await manager.Drop();
Sending to the main transport queue.
Sending a single message.
var manager = new QueueManager("endpointTable", sqlConnection);
var message = new OutgoingMessage(
id: Guid.NewGuid(),
headers: headers,
bodyBytes: body);
await manager.Send(message);
Sending a batch of messages.
var manager = new QueueManager("endpointTable", sqlConnection);
var messages = new List<OutgoingMessage>
{
new(
id: Guid.NewGuid(),
headers: headers1,
bodyBytes: body1),
new(
id: Guid.NewGuid(),
headers: headers2,
bodyBytes: body2),
};
await manager.Send(messages);
"Reading" a message returns the data from the database without deleting it.
Reading a single message.
var manager = new QueueManager("endpointTable", sqlConnection);
var message = await manager.Read(rowVersion: 10);
if (message != null)
{
Console.WriteLine(message.Headers);
if (message.Body != null)
{
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine(bodyText);
}
}
Reading a batch of messages.
var manager = new QueueManager("endpointTable", sqlConnection);
var result = await manager.Read(
size: 5,
startRowVersion: 10,
action: async message =>
{
Console.WriteLine(message.Headers);
if (message.Body == null)
{
return;
}
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine(bodyText);
});
Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);
For many scenarios, it is likely to be necessary to keep track of the last message RowVersion
that was read. A lightweight implementation of the functionality is provided by RowVersionTracker
. RowVersionTracker
stores the current RowVersion
in a table containing a single column and row.
var versionTracker = new RowVersionTracker();
// create table
await versionTracker.CreateTable(sqlConnection);
// save row version
await versionTracker.Save(sqlConnection, newRowVersion);
// get row version
var startingRow = await versionTracker.Get(sqlConnection);
Note that this is only one possible implementation of storing the current RowVersion
.
For scenarios where continual processing (reading and executing some code with the result) of incoming messages is required, MessageProcessingLoop
can be used.
An example use case is monitoring an error queue. Some action should be taken when a message appears in the error queue, but it should remain in that queue in case it needs to be retried.
Note that in the below snippet, the above RowVersionTracker
is used for tracking the current RowVersion
.
var rowVersionTracker = new RowVersionTracker();
var startingRow = await rowVersionTracker.Get(sqlConnection);
static async Task Callback(
DbTransaction transaction,
IncomingMessage message,
CancellationToken cancellation)
{
if (message.Body == null)
{
return;
}
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine($"Message received in error message:\r\n{bodyText}");
}
static void ErrorCallback(Exception exception)
{
Environment.FailFast("Message processing loop failed", exception);
}
Task<DbTransaction> TransactionBuilder(CancellationToken cancellation)
{
return ConnectionHelpers.BeginTransaction(connectionString, cancellation);
}
Task PersistRowVersion(
DbTransaction transaction,
long rowVersion,
CancellationToken token)
{
return rowVersionTracker.Save(sqlConnection, rowVersion, token);
}
var processingLoop = new MessageProcessingLoop(
table: "error",
delay: TimeSpan.FromSeconds(1),
transactionBuilder: TransactionBuilder,
callback: Callback,
errorCallback: ErrorCallback,
startingRow: startingRow,
persistRowVersion: PersistRowVersion);
processingLoop.Start();
Console.ReadKey();
await processingLoop.Stop();
"Consuming" a message returns the data from the database and also deletes that message.
Consume a single message.
var manager = new QueueManager("endpointTable", sqlConnection);
var message = await manager.Consume();
if (message != null)
{
Console.WriteLine(message.Headers);
if (message.Body != null)
{
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine(bodyText);
}
}
Consuming a batch of messages.
var manager = new QueueManager("endpointTable", sqlConnection);
var result = await manager.Consume(
size: 5,
action: async message =>
{
Console.WriteLine(message.Headers);
if (message.Body == null)
{
return;
}
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine(bodyText);
});
Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);
For scenarios where continual consumption (consuming and executing some code with the result) of incoming messages is required, MessageConsumingLoop
can be used.
An example use case is monitoring an audit queue. Some action should be taken when a message appears in the audit queue, and it should be purged from the queue to free up the storage space.
static async Task Callback(
DbTransaction transaction,
IncomingMessage message,
CancellationToken cancellation)
{
if (message.Body != null)
{
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine($"Reply received:\r\n{bodyText}");
}
}
Task<DbTransaction> TransactionBuilder(CancellationToken cancellation)
{
return ConnectionHelpers.BeginTransaction(connectionString, cancellation);
}
static void ErrorCallback(Exception exception)
{
Environment.FailFast("Message consuming loop failed", exception);
}
// start consuming
var consumingLoop = new MessageConsumingLoop(
table: "endpointTable",
delay: TimeSpan.FromSeconds(1),
transactionBuilder: TransactionBuilder,
callback: Callback,
errorCallback: ErrorCallback);
consumingLoop.Start();
// stop consuming
await consumingLoop.Stop();
Queue management for the native delayed delivery functionality.
See also SQL Server Transport - SQL statements.
The queue can be created using the following:
var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
await manager.Create();
The queue can be deleted using the following:
var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
await manager.Drop();
Sending a single message.
var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var message = new OutgoingDelayedMessage(
due: DateTime.UtcNow.AddDays(1),
headers: headers,
bodyBytes: body);
await manager.Send(message);
Sending a batch of messages.
var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var messages = new List<OutgoingDelayedMessage>
{
new(
due: DateTime.UtcNow.AddDays(1),
headers: headers1,
bodyBytes: body1),
new(
due: DateTime.UtcNow.AddDays(1),
headers: headers2,
bodyBytes: body2),
};
await manager.Send(messages);
"Reading" a message returns the data from the database without deleting it.
Reading a single message.
var manager = new DelayedQueueManager("endpointTable", sqlConnection);
var message = await manager.Read(rowVersion: 10);
if (message != null)
{
Console.WriteLine(message.Headers);
if (message.Body != null)
{
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine(bodyText);
}
}
Reading a batch of messages.
var manager = new DelayedQueueManager("endpointTable", sqlConnection);
var result = await manager.Read(
size: 5,
startRowVersion: 10,
action: async message =>
{
Console.WriteLine(message.Headers);
if (message.Body == null)
{
return;
}
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine(bodyText);
});
Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);
"Consuming" a message returns the data from the database and also deletes that message.
Consume a single message.
var manager = new DelayedQueueManager("endpointTable", sqlConnection);
var message = await manager.Consume();
if (message != null)
{
Console.WriteLine(message.Headers);
if (message.Body != null)
{
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine(bodyText);
}
}
Consuming a batch of messages.
var manager = new DelayedQueueManager("endpointTable", sqlConnection);
var result = await manager.Consume(
size: 5,
action: async message =>
{
Console.WriteLine(message.Headers);
if (message.Body == null)
{
return;
}
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync();
Console.WriteLine(bodyText);
});
Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);
There is a headers helpers class NServiceBus.Transport.SqlServerNative.Headers
.
It contains several header related utilities.
Queue management for the native publish subscribe functionality.
The table can be created using the following:
var manager = new SubscriptionManager("SubscriptionRouting", sqlConnection);
await manager.Create();
The table can be deleted using the following:
var manager = new SubscriptionManager("SubscriptionRouting", sqlConnection);
await manager.Drop();
Some scenarios, such as HTTP message pass through, require message deduplication.
The table can be created using the following:
var manager = new DedupeManager(sqlConnection, "DeduplicationTable");
await manager.Create();
The table can be deleted using the following:
var manager = new DedupeManager(sqlConnection, "DeduplicationTable");
await manager.Drop();
Sending to the main transport queue with deduplication.
Sending a single message with deduplication.
var manager = new QueueManager(
"endpointTable",
sqlConnection,
"DeduplicationTable");
var message = new OutgoingMessage(
id: Guid.NewGuid(),
headers: headers,
bodyBytes: body);
await manager.Send(message);
Sending a batch of messages with deduplication.
var manager = new QueueManager(
"endpointTable",
sqlConnection,
"DeduplicationTable");
var messages = new List<OutgoingMessage>
{
new(
id: Guid.NewGuid(),
headers: headers1,
bodyBytes: body1),
new(
id: Guid.NewGuid(),
headers: headers2,
bodyBytes: body2),
};
await manager.Send(messages);
Deduplication records need to live for a period of time after the initial corresponding message has been send. In this way an subsequent message, with the same message id, can be ignored. This necessitates a periodic cleanup process of deduplication records. This is achieved by using DeduplicationCleanerJob
:
At application startup, start an instance of DeduplicationCleanerJob
.
var cleaner = new DedupeCleanerJob(
table: "Deduplication",
connectionBuilder: cancellation =>
{
return ConnectionHelpers.OpenConnection(connectionString, cancellation);
},
criticalError: _ => { },
expireWindow: TimeSpan.FromHours(1),
frequencyToRunCleanup: TimeSpan.FromMinutes(10));
cleaner.Start();
Then at application shutdown stop the instance.
await cleaner.Stop();
Serialize a Dictionary<string, string>
to a JSON string.
var headers = new Dictionary<string, string>
{
{Headers.EnclosedMessageTypes, "SendMessage"}
};
var serialized = Headers.Serialize(headers);
Deserialize a JSON string to a Dictionary<string, string>
.
var headers = Headers.DeSerialize(headersString);
Contains all the string constants copied from NServiceBus.Headers
.
A copy of the timestamp format methods ToWireFormattedString
and ToUtcDateTime
.
The APIs of this extension target either a SQLConnection
and SQLTransaction
. Given that in configuration those values are often expressed as a connection string, ConnectionHelpers
supports converting that string to a SQLConnection
or SQLTransaction
. It provides two methods OpenConnection
and BeginTransaction
with the effective implementation of those methods being:
public static async Task<DbConnection> OpenConnection(
string connectionString,
CancellationToken cancellation)
{
var connection = new SqlConnection(connectionString);
try
{
await connection.OpenAsync(cancellation);
return connection;
}
catch
{
connection.Dispose();
throw;
}
}
public static async Task<DbTransaction> BeginTransaction(
string connectionString,
CancellationToken cancellation)
{
var connection = await OpenConnection(connectionString, cancellation);
return connection.BeginTransaction();
}
SQL HTTP Passthrough provides a bridge between an HTTP stream (via JavaScript on a web page) and the SQL Server transport.
To report a security vulnerability, use the Tidelift security contact. Tidelift will coordinate the fix and disclosure.
Spear designed by Aldric Rodríguez from The Noun Project.