Skip to content

Commit

Permalink
Added OnSaveChanges callback for implementing UoW (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega authored Aug 24, 2017
1 parent 82f4e66 commit 54bb250
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/SqlPersistence.Tests/APIApprovals.Approve.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
{
System.Data.Common.DbConnection Connection { get; }
System.Data.Common.DbTransaction Transaction { get; }
void OnSaveChanges(System.Func<NServiceBus.Persistence.Sql.ISqlStorageSession, System.Threading.Tasks.Task> callback);
}
[System.ObsoleteAttribute("Not for public use")]
public class static OutboxCommandBuilder
Expand Down
74 changes: 74 additions & 0 deletions src/SqlPersistence.Tests/Saga/SagaPersisterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,80 @@ void DropAndCreate(SagaDefinition definition, string endpointName)
}
}

[Test]
public async Task CallbackIsInvoked()
{
var callbackInvoked = false;
using (var connection = dbConnection())
using (var transaction = connection.BeginTransaction())
using (var storageSession = new StorageSession(connection, transaction, true, null))
{
storageSession.OnSaveChanges(s =>
{
callbackInvoked = true;
return Task.FromResult(0);
});
await storageSession.CompleteAsync();
}
Assert.IsTrue(callbackInvoked);
}

[Test]
public async Task CallbackThrows()
{
var exceptionThrown = false;
var id = Guid.NewGuid();
var sagaData = new SagaWithCorrelation.SagaData
{
Id = id,
OriginalMessageId = "theOriginalMessageId",
Originator = "theOriginator",
SimpleProperty = "PropertyValue",
CorrelationProperty = "theCorrelationProperty"
};

var persister = SetUp(nameof(CallbackThrows));
var definition = new SagaDefinition(
tableSuffix: "SagaWithCorrelation",
name: "SagaWithCorrelation",
correlationProperty: new CorrelationProperty
(
name: "CorrelationProperty",
type: CorrelationPropertyType.String
)
);
DropAndCreate(definition, nameof(CallbackThrows));

using (var connection = dbConnection())
using (var transaction = connection.BeginTransaction())
using (var storageSession = new StorageSession(connection, transaction, true, null))
{
await persister.Save(sagaData, storageSession, "theProperty").ConfigureAwait(false);
storageSession.OnSaveChanges(s =>
{
throw new Exception("Simulated");
});
try
{
await storageSession.CompleteAsync();
}
catch (Exception)
{
exceptionThrown = true;
}
}

Assert.IsTrue(exceptionThrown);

using (var connection = dbConnection())
using (var transaction = connection.BeginTransaction())
using (var storageSession = new StorageSession(connection, transaction, true, null))
{
var savedEntity = await persister.Get<SagaWithCorrelation.SagaData>(id, storageSession).ConfigureAwait(false);
Assert.IsNull(savedEntity.Data);
}
}

async Task<SagaWithCorrelation.SagaData> SaveAsync(Guid id, string endpointName)
{
var sagaData = new SagaWithCorrelation.SagaData
Expand Down
8 changes: 8 additions & 0 deletions src/SqlPersistence/SynchronizedStorage/ISqlStorageSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

namespace NServiceBus.Persistence.Sql
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Exposes the current <see cref="DbTransaction"/> and <see cref="DbConnection"/>.
/// <seealso cref="SqlPersistenceStorageSessionExtensions.SqlPersistenceSession"/>
Expand All @@ -17,5 +20,10 @@ public interface ISqlStorageSession
/// The current <see cref="DbConnection"/>.
/// </summary>
DbConnection Connection { get; }

/// <summary>
/// Registers a callback to be called before completing the session.
/// </summary>
void OnSaveChanges(Func<ISqlStorageSession, Task> callback);
}
}
20 changes: 17 additions & 3 deletions src/SqlPersistence/SynchronizedStorage/StorageSession.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using System.Data.Common;
using System;
using System.Data.Common;
using System.Threading.Tasks;
using NServiceBus.Persistence;
using NServiceBus.Persistence.Sql;

class StorageSession : CompletableSynchronizedStorageSession, ISqlStorageSession
{
bool ownsTransaction;
Func<ISqlStorageSession, Task> onSaveChangesCallback;

public StorageSession(DbConnection connection, DbTransaction transaction, bool ownsTransaction, SagaInfoCache infoCache)
{
Expand All @@ -20,16 +22,28 @@ public StorageSession(DbConnection connection, DbTransaction transaction, bool o
internal SagaInfoCache InfoCache { get; }
public DbTransaction Transaction { get; }
public DbConnection Connection { get; }
public void OnSaveChanges(Func<ISqlStorageSession, Task> callback)
{
Guard.AgainstNull(nameof(callback), callback);
if (onSaveChangesCallback != null)
{
throw new Exception("Save changes callback for this session has already been registered.");
}
onSaveChangesCallback = callback;
}

public Task CompleteAsync()
public async Task CompleteAsync()
{
if (onSaveChangesCallback != null)
{
await onSaveChangesCallback(this).ConfigureAwait(false);
}
if (ownsTransaction)
{
Transaction?.Commit();
Transaction?.Dispose();
Connection.Dispose();
}
return Task.FromResult(0);
}

public void Dispose()
Expand Down

0 comments on commit 54bb250

Please sign in to comment.