-
-
Notifications
You must be signed in to change notification settings - Fork 363
/
Copy pathAddInputQueueNameToIncomingStepContext.cs
84 lines (67 loc) · 2.67 KB
/
AddInputQueueNameToIncomingStepContext.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using NUnit.Framework;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Pipeline;
using Rebus.Retry.Simple;
using Rebus.Tests.Contracts;
using Rebus.Tests.Contracts.Extensions;
using Rebus.Transport;
using Rebus.Transport.InMem;
using static Rebus.Tests.Examples.AddInputQueueNameToIncomingStepContext;
#pragma warning disable CS4014
#pragma warning disable CS1998
namespace Rebus.Tests.Examples;
/// <summary>
/// Example fixture showing how a message handler can get hold of the name of the queue
/// that its bus receives messages from.
/// </summary>
[TestFixture]
public class AddInputQueueNameToIncomingStepContext : FixtureBase
{
    /// <summary>
    /// Small carrier object for a queue name; this is the type that the handler loads from the incoming step context.
    /// </summary>
    public record InputQueueName(string QueueName);

    /// <summary>
    /// Starts an in-memory bus with <c>AddInputQueueNameToStepContext</c> enabled, sends a message to
    /// the bus itself, and verifies that the handler could load an <see cref="InputQueueName"/> whose
    /// queue name equals the one the transport was configured with.
    /// </summary>
    [Test]
    public async Task DemonstratesHowTheInputQueueNameCanBeRetrieved()
    {
        // A freshly generated name, so the final assertion can only succeed if the handler
        // actually saw the configured queue name.
        var queueName = Guid.NewGuid().ToString();
        var receivedQueueNames = new ConcurrentQueue<string>();

        using var activator = new BuiltinHandlerActivator();

        activator.Handle<string>(async (_, messageContext, _) =>
        {
            // Enqueues null when nothing has been saved under the InputQueueName type.
            receivedQueueNames.Enqueue(messageContext.IncomingStepContext.Load<InputQueueName>()?.QueueName);
        });

        var configurer = Configure.With(activator)
            .Transport(transport => transport.UseInMemoryTransport(new InMemNetwork(), queueName))
            .Options(options => options.AddInputQueueNameToStepContext());

        configurer.Start();

        await activator.Bus.SendLocal("HEJ");

        // Block until the handler has recorded exactly one value.
        await receivedQueueNames.WaitUntil(queue => queue.Count == 1);

        var received = receivedQueueNames.First();

        Assert.That(received, Is.EqualTo(queueName));
    }
}
/// <summary>
/// Configuration extensions that expose the bus' input queue name to code running in the incoming pipeline.
/// </summary>
static class NiftyRebusConfigurationExtensions
{
    /// <summary>
    /// Decorates the <see cref="IPipeline"/> so that an extra incoming step is injected right after
    /// <see cref="DefaultRetryStep"/>. The step saves an <see cref="InputQueueName"/>, built from
    /// the value of <see cref="ITransport.Address"/>, into the incoming step context.
    /// </summary>
    /// <param name="configurer">The options configurer on which the pipeline decorator is registered.</param>
    public static void AddInputQueueNameToStepContext(this OptionsConfigurer configurer)
    {
        configurer.Decorate<IPipeline>(context =>
        {
            var innerPipeline = context.Get<IPipeline>();
            var address = context.Get<ITransport>().Address;

            // The name is read once here, when the decorator runs; the same InputQueueName
            // instance is then saved into the context of every incoming message.
            var queueNameStep = new AddInputQueueNameToPipelineStep(new InputQueueName(address));
            var injector = new PipelineStepInjector(innerPipeline);

            return injector.OnReceive(queueNameStep, PipelineRelativePosition.After, typeof(DefaultRetryStep));
        });
    }

    /// <summary>
    /// Incoming step that saves a fixed <see cref="InputQueueName"/> into the step context
    /// before invoking the rest of the pipeline.
    /// </summary>
    [StepDocumentation("Adds the bus' own input queue name to the incoming step context in the form of an InputQueueName object containing the queue name.")]
    class AddInputQueueNameToPipelineStep : IIncomingStep
    {
        readonly InputQueueName _inputQueueName;

        /// <summary>
        /// Creates the step with the <see cref="InputQueueName"/> to save for each incoming message.
        /// </summary>
        public AddInputQueueNameToPipelineStep(InputQueueName inputQueueName)
        {
            _inputQueueName = inputQueueName;
        }

        /// <summary>
        /// Saves the queue name object into <paramref name="context"/> and then awaits <paramref name="next"/>.
        /// </summary>
        public async Task Process(IncomingStepContext context, Func<Task> next)
        {
            context.Save(_inputQueueName);

            await next();
        }
    }
}