Skip to content

Commit

Permalink
Update MsSql Outbox Get and GetAsync (#1762)
Browse files Browse the repository at this point in the history
Updated the MsSql outbox to return null if message is not found in the outbox #1758

renamed MsSqlOutboxSync to MsSqlOutbox as it implements both Sync and Async

Co-authored-by: Ian Cooper <[email protected]>
  • Loading branch information
preardon and iancooper authored Oct 17, 2021
1 parent ff3ccd0 commit 1c2d24b
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ namespace Paramore.Brighter.Outbox.MsSql
/// <summary>
/// Class MsSqlOutbox.
/// </summary>
public class MsSqlOutboxSync :
public class MsSqlOutbox :
IAmAnOutboxSync<Message>,
IAmAnOutboxAsync<Message>,
IAmAnOutboxViewer<Message>,
IAmAnOutboxViewerAsync<Message>
{
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MsSqlOutboxSync>();
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MsSqlOutbox>();

private const int MsSqlDuplicateKeyError_UniqueIndexViolation = 2601;
private const int MsSqlDuplicateKeyError_UniqueConstraintViolation = 2627;
Expand All @@ -62,22 +62,22 @@ public class MsSqlOutboxSync :
public bool ContinueOnCapturedContext { get; set; }

/// <summary>
/// Initializes a new instance of the <see cref="MsSqlOutboxSync" /> class.
/// Initializes a new instance of the <see cref="MsSqlOutbox" /> class.
/// </summary>
/// <param name="configuration">The configuration.</param>
/// <param name="connectionProvider">The connection factory.</param>
public MsSqlOutboxSync(MsSqlConfiguration configuration, IMsSqlConnectionProvider connectionProvider)
public MsSqlOutbox(MsSqlConfiguration configuration, IMsSqlConnectionProvider connectionProvider)
{
_configuration = configuration;
ContinueOnCapturedContext = false;
_connectionProvider = connectionProvider;
}

/// <summary>
/// Initializes a new instance of the <see cref="MsSqlOutboxSync" /> class.
/// Initializes a new instance of the <see cref="MsSqlOutbox" /> class.
/// </summary>
/// <param name="configuration">The configuration.</param>
public MsSqlOutboxSync(MsSqlConfiguration configuration) : this(configuration, new MsSqlSqlAuthConnectionProvider(configuration))
public MsSqlOutbox(MsSqlConfiguration configuration) : this(configuration, new MsSqlSqlAuthConnectionProvider(configuration))
{
}

Expand Down Expand Up @@ -690,7 +690,7 @@ private Message MapFunction(SqlDataReader dr)
}
dr.Close();

return message ?? new Message();
return message;
}

private async Task<Message> MapFunctionAsync(SqlDataReader dr)
Expand All @@ -702,7 +702,7 @@ private async Task<Message> MapFunctionAsync(SqlDataReader dr)
}
dr.Close();

return message ?? new Message();
return message;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ public static IBrighterBuilder UseMsSqlTransactionConnectionProvider(
return brighterBuilder;
}

private static MsSqlOutboxSync BuildMsSqlOutbox(IServiceProvider provider)
private static MsSqlOutbox BuildMsSqlOutbox(IServiceProvider provider)
{
var connectionProvider = provider.GetService<IMsSqlConnectionProvider>();
var config = provider.GetService<MsSqlConfiguration>();

return new MsSqlOutboxSync(config, connectionProvider);
return new MsSqlOutbox(config, connectionProvider);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,24 @@ public class MsSqlOutboxMessageAlreadyExistsAsyncTests : IDisposable
{
private Exception _exception;
private readonly Message _messageEarliest;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;
private readonly MsSqlTestHelper _msSqlTestHelper;

public MsSqlOutboxMessageAlreadyExistsAsyncTests()
{
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
_messageEarliest = new Message(new MessageHeader(Guid.NewGuid(), "test_topic", MessageType.MT_DOCUMENT), new MessageBody("message body"));
}

[Fact]
public async Task When_The_Message_Is_Already_In_The_Outbox_Async()
{
await _sqlOutboxSync.AddAsync(_messageEarliest);
await _sqlOutbox.AddAsync(_messageEarliest);

_exception = await Catch.ExceptionAsync(() => _sqlOutboxSync.AddAsync(_messageEarliest));
_exception = await Catch.ExceptionAsync(() => _sqlOutbox.AddAsync(_messageEarliest));

//should ignore the duplcate key and still succeed
_exception.Should().BeNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,23 @@ public class MsSqlOutboxMessageAlreadyExistsTests : IDisposable
{
private Exception _exception;
private readonly Message _messageEarliest;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;
private readonly MsSqlTestHelper _msSqlTestHelper;

public MsSqlOutboxMessageAlreadyExistsTests()
{
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
_messageEarliest = new Message(new MessageHeader(Guid.NewGuid(), "test_topic", MessageType.MT_DOCUMENT), new MessageBody("message body"));
_sqlOutboxSync.Add(_messageEarliest);
_sqlOutbox.Add(_messageEarliest);
}

[Fact]
public void When_The_Message_Is_Already_In_The_Outbox()
{
_exception = Catch.Exception(() => _sqlOutboxSync.Add(_messageEarliest));
_exception = Catch.Exception(() => _sqlOutbox.Add(_messageEarliest));

//should ignore the duplcate key and still succeed
_exception.Should().BeNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,28 @@ public class MsSqlOutboxRangeRequestTests : IDisposable
private readonly string _TopicFirstMessage = "test_topic";
private readonly string _TopicLastMessage = "test_topic3";
private IEnumerable<Message> _messages;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;

public MsSqlOutboxRangeRequestTests()
{
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
var messageEarliest = new Message(new MessageHeader(Guid.NewGuid(), _TopicFirstMessage, MessageType.MT_DOCUMENT), new MessageBody("message body"));
var message1 = new Message(new MessageHeader(Guid.NewGuid(), "test_topic2", MessageType.MT_DOCUMENT), new MessageBody("message body2"));
var message2 = new Message(new MessageHeader(Guid.NewGuid(), _TopicLastMessage, MessageType.MT_DOCUMENT), new MessageBody("message body3"));
_sqlOutboxSync.Add(messageEarliest);
_sqlOutbox.Add(messageEarliest);
Task.Delay(100);
_sqlOutboxSync.Add(message1);
_sqlOutbox.Add(message1);
Task.Delay(100);
_sqlOutboxSync.Add(message2);
_sqlOutbox.Add(message2);
}

[Fact]
public void When_There_Are_Multiple_Messages_In_The_Outbox_And_A_Range_Is_Fetched()
{
_messages = _sqlOutboxSync.Get(1, 3);
_messages = _sqlOutbox.Get(1, 3);

//should fetch 1 message
_messages.Should().HaveCount(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ public class MsSqlOutboxRangeRequestAsyncTests : IDisposable
private readonly Message _message1;
private readonly Message _message2;
private readonly Message _messageEarliest;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;

public MsSqlOutboxRangeRequestAsyncTests()
{
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
_messageEarliest = new Message(new MessageHeader(Guid.NewGuid(), _TopicFirstMessage, MessageType.MT_DOCUMENT), new MessageBody("message body"));
_message1 = new Message(new MessageHeader(Guid.NewGuid(), "test_topic2", MessageType.MT_DOCUMENT), new MessageBody("message body2"));
_message2 = new Message(new MessageHeader(Guid.NewGuid(), _TopicLastMessage, MessageType.MT_DOCUMENT), new MessageBody("message body3"));
Expand All @@ -59,13 +59,13 @@ public MsSqlOutboxRangeRequestAsyncTests()
[Fact]
public async Task When_There_Are_Multiple_Messages_In_The_Outbox_And_A_Range_Is_Fetched_Async()
{
await _sqlOutboxSync.AddAsync(_messageEarliest);
await _sqlOutbox.AddAsync(_messageEarliest);
await Task.Delay(100);
await _sqlOutboxSync.AddAsync(_message1);
await _sqlOutbox.AddAsync(_message1);
await Task.Delay(100);
await _sqlOutboxSync.AddAsync(_message2);
await _sqlOutbox.AddAsync(_message2);

_messages = await _sqlOutboxSync.GetAsync(1, 3);
_messages = await _sqlOutbox.GetAsync(1, 3);

//should fetch 1 message
_messages.Should().HaveCount(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ namespace Paramore.Brighter.MSSQL.Tests.Outbox
public class OutstandingMessagesTests
{
private readonly Message _dispatchedMessage;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;
private readonly MsSqlTestHelper _msSqlTestHelper;

public OutstandingMessagesTests()
{
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
_dispatchedMessage = new Message(new MessageHeader(Guid.NewGuid(), "test_topic", MessageType.MT_DOCUMENT), new MessageBody("message body"));
_sqlOutboxSync.Add(_dispatchedMessage);
_sqlOutbox.Add(_dispatchedMessage);

//wait to create an oustanding period
Task.Delay(1000).Wait();
Expand All @@ -31,7 +31,7 @@ public OutstandingMessagesTests()
[Fact]
public void When_there_is_an_outstanding_message_in_the_outbox()
{
var outstandingMessage = _sqlOutboxSync.OutstandingMessages(100).SingleOrDefault();
var outstandingMessage = _sqlOutbox.OutstandingMessages(100).SingleOrDefault();

outstandingMessage.Should().NotBeNull();
outstandingMessage.Id.Should().Be(_dispatchedMessage.Id);
Expand All @@ -40,7 +40,7 @@ public void When_there_is_an_outstanding_message_in_the_outbox()
[Fact]
public async Task When_there_is_an_outstanding_message_in_the_outbox_async()
{
var outstandingMessage = (await _sqlOutboxSync.OutstandingMessagesAsync(100)).SingleOrDefault();
var outstandingMessage = (await _sqlOutbox.OutstandingMessagesAsync(100)).SingleOrDefault();

outstandingMessage.Should().NotBeNull();
outstandingMessage.Id.Should().Be(_dispatchedMessage.Id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ public class MsSqlOutboxEmptyStoreTests : IDisposable
{
private readonly MsSqlTestHelper _msSqlTestHelper;
private readonly Message _messageEarliest;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;
private Message _storedMessage;

public MsSqlOutboxEmptyStoreTests()
{
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
_messageEarliest = new Message(new MessageHeader(Guid.NewGuid(), "test_topic", MessageType.MT_DOCUMENT), new MessageBody("message body"));
}

[Fact]
public void When_There_Is_No_Message_In_The_Sql_Outbox()
{
_storedMessage = _sqlOutboxSync.Get(_messageEarliest.Id);
_storedMessage = _sqlOutbox.Get(_messageEarliest.Id);

//should return an empty message
_storedMessage.Header.MessageType.Should().Be(MessageType.MT_NONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ public class MsSqlOutboxEmptyStoreAsyncTests : IDisposable
{
private readonly MsSqlTestHelper _msSqlTestHelper;
private readonly Message _messageEarliest;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;
private Message _storedMessage;

public MsSqlOutboxEmptyStoreAsyncTests()
{
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
_messageEarliest = new Message(new MessageHeader(Guid.NewGuid(), "test_topic", MessageType.MT_DOCUMENT), new MessageBody("message body"));
}

[Fact]
public async Task When_There_Is_No_Message_In_The_Sql_Outbox_Async()
{
_storedMessage = await _sqlOutboxSync.GetAsync(_messageEarliest.Id);
_storedMessage = await _sqlOutbox.GetAsync(_messageEarliest.Id);

//should return a empty message
_storedMessage.Header.MessageType.Should().Be(MessageType.MT_NONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SqlOutboxWritingMessageTests : IDisposable
private readonly string _key4 = "name4";
private readonly string _key5 = "name5";
private readonly Message _message;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;
private Message _storedMessage;
private readonly string _value1 = "value1";
private readonly string _value2 = "value2";
Expand All @@ -53,7 +53,7 @@ public SqlOutboxWritingMessageTests()
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
var messageHeader = new MessageHeader(
messageId:Guid.NewGuid(),
topic: "test_topic",
Expand All @@ -71,13 +71,13 @@ public SqlOutboxWritingMessageTests()
messageHeader.Bag.Add(_key5, _value5);

_message = new Message(messageHeader, new MessageBody("message body"));
_sqlOutboxSync.Add(_message);
_sqlOutbox.Add(_message);
}

[Fact]
public void When_Writing_A_Message_To_The_MSSQL_Outbox()
{
_storedMessage = _sqlOutboxSync.Get(_message.Id);
_storedMessage = _sqlOutbox.Get(_message.Id);

//should read the message from the sql outbox
_storedMessage.Body.Value.Should().Be(_message.Body.Value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SqlOutboxWritingMessageAsyncTests : IDisposable
private readonly string _key4 = "name4";
private readonly string _key5 = "name5";
private readonly Message _message;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;
private Message _storedMessage;
private readonly string _value1 = "value1";
private readonly string _value2 = "value2";
Expand All @@ -53,7 +53,7 @@ public SqlOutboxWritingMessageAsyncTests ()
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
var messageHeader = new MessageHeader(
messageId:Guid.NewGuid(),
topic: "test_topic",
Expand All @@ -71,15 +71,15 @@ public SqlOutboxWritingMessageAsyncTests ()
messageHeader.Bag.Add(_key5, _value5);

_message = new Message(messageHeader, new MessageBody("message body"));
_sqlOutboxSync.Add(_message);
_sqlOutbox.Add(_message);
}

[Fact]
public async Task When_Writing_A_Message_To_The_Outbox_Async()
{
await _sqlOutboxSync.AddAsync(_message);
await _sqlOutbox.AddAsync(_message);

_storedMessage = await _sqlOutboxSync.GetAsync(_message.Id);
_storedMessage = await _sqlOutbox.GetAsync(_message.Id);
//should read the message from the sql outbox
_storedMessage.Body.Value.Should().Be(_message.Body.Value);
//should read the header from the sql outbox
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,28 @@ public class SqlOutboxWritingMessagesTests : IDisposable
private readonly Message _messageEarliest;
private readonly Message _messageLatest;
private IEnumerable<Message> _retrievedMessages;
private readonly MsSqlOutboxSync _sqlOutboxSync;
private readonly MsSqlOutbox _sqlOutbox;

public SqlOutboxWritingMessagesTests()
{
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutboxSync = new MsSqlOutboxSync(_msSqlTestHelper.OutboxConfiguration);
_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
_messageEarliest = new Message(new MessageHeader(Guid.NewGuid(), "Test", MessageType.MT_COMMAND, DateTime.UtcNow.AddHours(-3)), new MessageBody("Body"));
_sqlOutboxSync.Add(_messageEarliest);
_sqlOutbox.Add(_messageEarliest);

var message2 = new Message(new MessageHeader(Guid.NewGuid(), "Test2", MessageType.MT_COMMAND, DateTime.UtcNow.AddHours(-2)), new MessageBody("Body2"));
_sqlOutboxSync.Add(message2);
_sqlOutbox.Add(message2);

_messageLatest = new Message(new MessageHeader(Guid.NewGuid(), "Test3", MessageType.MT_COMMAND, DateTime.UtcNow.AddHours(-1)), new MessageBody("Body3"));
_sqlOutboxSync.Add(_messageLatest);
_sqlOutbox.Add(_messageLatest);
}

[Fact]
public void When_Writing_Messages_To_The_Outbox()
{
_retrievedMessages = _sqlOutboxSync.Get();
_retrievedMessages = _sqlOutbox.Get();

//should read first message last from the outbox
_retrievedMessages.Last().Id.Should().Be(_messageEarliest.Id);
Expand Down
Loading

0 comments on commit 1c2d24b

Please sign in to comment.