Skip to content

Commit

Permalink
Populate originator property on saga creation (#457) (#464)
Browse files Browse the repository at this point in the history
* add failing tests

* Set originator value when creating new saga
  • Loading branch information
timbussmann authored Dec 6, 2022
1 parent da5eaf8 commit e15fb26
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 8 deletions.
84 changes: 84 additions & 0 deletions src/NServiceBus.Testing.Tests/Sagas/ReplyToOriginator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
namespace NServiceBus.Testing.Tests.Sagas
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NUnit.Framework;

[TestFixture]
public class ReplyToOriginator
{
[Test]
public async Task ReplyToOriginatorShouldReplyToInitialOriginator()
{
const string originatorAddress = "expectedReplyAddress";

var saga = new TestableSaga<ReplyingSaga, ReplyingSagaData>();
// the Originator value is populated by the header value, not the context property
await saga.Handle(new StartSagaMessage() { CorrelationProperty = Guid.NewGuid() }, messageHeaders: new Dictionary<string, string>()
{
{Headers.ReplyToAddress, originatorAddress}
});
var result = await saga.HandleQueuedMessage();

var reply = result.Context.RepliedMessages.SingleOrDefault();
Assert.NotNull(reply);
Assert.AreEqual(originatorAddress, reply.Options.GetDestination());
Assert.AreEqual(originatorAddress, reply.Message<ReplyMessage>().OriginatorAddress);
}

[Test]
public async Task OriginatorShouldBeSetByDefault()
{
// ensure the testing API also works without explicitly defining a replyTo header value
var saga = new TestableSaga<ReplyingSaga, ReplyingSagaData>();

await saga.Handle(new StartSagaMessage() { CorrelationProperty = Guid.NewGuid() });
var result = await saga.HandleQueuedMessage();

var reply = result.Context.RepliedMessages.SingleOrDefault();
Assert.NotNull(reply);
string replyAddress = reply.Options.GetDestination();
Assert.NotNull(replyAddress);
Assert.AreEqual(replyAddress, reply.Message<ReplyMessage>().OriginatorAddress);
}

class ReplyingSaga : NServiceBus.Saga<ReplyingSagaData>, IAmStartedByMessages<StartSagaMessage>, IHandleMessages<SendReplyMessage>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ReplyingSagaData> mapper) => mapper
.ConfigureMapping<StartSagaMessage>(m => m.CorrelationProperty)
.ToSaga(s => s.CorrelationProperty);

public Task Handle(StartSagaMessage message, IMessageHandlerContext context)
{
return context.SendLocal(new SendReplyMessage());
}

public Task Handle(SendReplyMessage message, IMessageHandlerContext context)
{
return ReplyToOriginator(context, new ReplyMessage { OriginatorAddress = Data.Originator });
}
}

class ReplyingSagaData : ContainSagaData
{
public Guid CorrelationProperty { get; set; }
}

class StartSagaMessage : IMessage
{
public Guid CorrelationProperty { get; set; }
}

class SendReplyMessage : ICommand
{
public Guid CorrelationProperty { get; set; }
}

class ReplyMessage : IMessage
{
public string OriginatorAddress { get; set; }
}
}
}
17 changes: 9 additions & 8 deletions src/NServiceBus.Testing/Sagas/TestableSaga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async Task<HandleResult> InnerHandle(QueuedSagaMessage message, string handleMet
{
await session.Open(context.Extensions, context.CancellationToken).ConfigureAwait(false);

var (data, isNew, mappedValue) = await LoadSagaData(message, session, context.Extensions, context.CancellationToken).ConfigureAwait(false);
var (data, isNew, mappedValue) = await LoadSagaData(message, session, context).ConfigureAwait(false);
saga.Entity = data;

await sagaMapper.InvokeHandlerMethod(saga, handleMethodName, message, context).ConfigureAwait(false);
Expand Down Expand Up @@ -279,16 +279,14 @@ async Task<HandleResult> HandleTimeout(OutgoingMessage<object, SendOptions> time
}

async Task<(TSagaData Data, bool IsNew, object MappedValue)> LoadSagaData(
QueuedSagaMessage message, ISynchronizedStorageSession session, ContextBag contextBag, CancellationToken cancellationToken)
QueuedSagaMessage message, ISynchronizedStorageSession session, TestableMessageHandlerContext context)
{
var messageMetadata = sagaMapper.GetMessageMetadata(message.Type);
TSagaData sagaData;

if (message.Headers != null &&
message.Headers.TryGetValue(Headers.SagaId, out var sagaIdString) &&
Guid.TryParse(sagaIdString, out Guid sagaId))
if (message.Headers.TryGetValue(Headers.SagaId, out var sagaIdString) && Guid.TryParse(sagaIdString, out Guid sagaId))
{
sagaData = await persister.Get<TSagaData>(sagaId, session, contextBag, cancellationToken).ConfigureAwait(false);
sagaData = await persister.Get<TSagaData>(sagaId, session, context.Extensions, context.CancellationToken).ConfigureAwait(false);
if (sagaData != null)
{
return (sagaData, false, null);
Expand All @@ -297,7 +295,7 @@ async Task<HandleResult> HandleTimeout(OutgoingMessage<object, SendOptions> time

var messageMappedValue = sagaMapper.GetMessageMappedValue(message);

sagaData = await persister.Get<TSagaData>(sagaMapper.CorrelationPropertyName, messageMappedValue, session, contextBag, cancellationToken).ConfigureAwait(false);
sagaData = await persister.Get<TSagaData>(sagaMapper.CorrelationPropertyName, messageMappedValue, session, context.Extensions, context.CancellationToken).ConfigureAwait(false);

if (sagaData != null)
{
Expand All @@ -306,7 +304,10 @@ async Task<HandleResult> HandleTimeout(OutgoingMessage<object, SendOptions> time

if (messageMetadata.IsAllowedToStartSaga)
{
sagaData = new TSagaData { Id = Guid.NewGuid() };
var originatorAddress = message.Headers.TryGetValue(Headers.ReplyToAddress, out var replyAddress)
? replyAddress
: context.ReplyToAddress; // This property has a default value set even when the header isn't set to require less setup for testing
sagaData = new TSagaData { Id = Guid.NewGuid(), Originator = originatorAddress };
sagaMapper.SetCorrelationPropertyValue(sagaData, messageMappedValue);
return (sagaData, true, messageMappedValue);
}
Expand Down

0 comments on commit e15fb26

Please sign in to comment.