How to know which queue a message is received on? #1086
-
In my program, I register multiple queues using services.AddRebus(... ,key="<name>"). They all use the same message handler. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
Unfortunately no – at least not at the moment. Being able to host multiple Rebus instances in the same container instance is a fairly new thing, so it has never really been relevant to provide this information anywhere, but I can definitely see how it could be useful in some cases. To fix your issue right away, you can extend your Rebus instances with a small plugin that makes the queue name available. It can be done like this – first, create a nice extension method that adds the queue name to the incoming step context in the form of an public static class NiftyRebusConfigurationExtensions
{
public static void AddInputQueueNameToStepContext(this OptionsConfigurer configurer)
{
configurer.Decorate<IPipeline>(c =>
{
var pipeline = c.Get<IPipeline>();
var inputQueueName = c.Get<ITransport>().Address;
var step = new AddInputQueueNameToPipelineStep(new(inputQueueName));
return new PipelineStepInjector(pipeline)
.OnReceive(step, PipelineRelativePosition.After, typeof(DefaultRetryStep));
});
}
[StepDocumentation("Adds the bus' own input queue name to the incoming step context in the form of an InputQueueName object containing the queue name.")]
class AddInputQueueNameToPipelineStep : IIncomingStep
{
readonly InputQueueName _inputQueueName;
public AddInputQueueNameToPipelineStep(InputQueueName inputQueueName) => _inputQueueName = inputQueueName;
public async Task Process(IncomingStepContext context, Func<Task> next)
{
context.Save(_inputQueueName);
await next();
}
}
} and then be sure to enable it on your Rebus instances: services.AddRebus(
configure => configure
.(...)
.Options(o => o.AddInputQueueNameToStepContext())
); With that done, you can retrieve the input queue name via public class MyMessageHandler : IHandleMessages<MyMessage>
{
readonly IMessageContext _messageContext;
public MyMessageHandler(IMessageContext messageContext) => _messageContext = messageContext;
public async Task Handle(MyMessage message)
{
var incomingStepContext = _messageContext.IncomingStepContext;
var queueName = incomingStepContext.Load<InputQueueName>()?.QueueName
?? throw new ApplicationException("Did not find the expected queue name object in the step context");
}
} You can see a working (but slightly different) example here: https://github.com/rebus-org/Rebus/blob/master/Rebus.Tests/Examples/AddInputQueueNameToIncomingStepContext.cs |
Beta Was this translation helpful? Give feedback.
Unfortunately no – at least not at the moment.
Being able to host multiple Rebus instances in the same container instance is a fairly new thing, so it has never really been relevant to provide this information anywhere, but I can definitely see how it could be useful in some cases.
To fix your issue right away, you can extend your Rebus instances with a small plugin that makes the queue name available. It can be done like this – first, create a nice extension method that adds the queue name to the incoming step context in the form of an
InputQueueName
object: