Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UseContext to respond pipe #290

Open
wants to merge 3 commits into
base: 2.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using RawRabbit.Operations.Respond.Context;
using RawRabbit.Pipe;

namespace RawRabbit.Enrichers.MessageContext.Respond
{
public static class PipeContextExtensions
{
public const string PipebasedContextFunc = "Respond:MessageContext:PipebasedContext";
private const string MessageContextType = "Respond:MessageContext:Type";

public static IPipeContext AddMessageContextType<TMessageContext>(this IPipeContext context)
Expand All @@ -17,5 +19,16 @@ public static Type GetMessageContextType(this IPipeContext context)
{
return context.Get(MessageContextType, typeof(object));
}

public static IRespondContext UseMessageContext(this IRespondContext context, Func<IPipeContext, object> contextFunc)
{
context.Properties.TryAdd(PipebasedContextFunc, contextFunc);
return context;
}

public static Func<IPipeContext, object> GetMessageContextResolver(this IPipeContext context)
{
return context.Get<Func<IPipeContext, object>>(PipebasedContextFunc);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ public static class RespondMessageContextExtension
BodyTypeFunc = context => context.GetRequestMessageType()
})
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(RespondStage.MessageDeserialized))
.Use<HandlerInvocationMiddleware >(ResponseHandlerOptionFactory.Create(new HandlerInvocationOptions
.Use<HandlerInvocationMiddleware>(ResponseHandlerOptionFactory.Create(new HandlerInvocationOptions
{
HandlerArgsFunc = context => new[] { context.GetMessage(), context.GetMessageContext() }
HandlerArgsFunc = context => new[]
{
context.GetMessage(),
context.GetMessageContextResolver()?.Invoke(context) ?? context.GetMessageContext()
}
}))
})
.Use<HeaderDeserializationMiddleware>(new HeaderDeserializationOptions
Expand Down Expand Up @@ -67,7 +71,7 @@ public static Task<IPipeContext> RespondAsync<TRequest, TResponse, TMessageConte
return client
.InvokeAsync(RespondPipe, ctx =>
{
Func<object[], Task> genericHandler = args => (handler((TRequest) args[0], (TMessageContext) args[1])
Func<object[], Task> genericHandler = args => (handler((TRequest)args[0], (TMessageContext)args[1])
.ContinueWith(tResponse =>
{
if (tResponse.IsFaulted)
Expand All @@ -76,7 +80,10 @@ public static Task<IPipeContext> RespondAsync<TRequest, TResponse, TMessageConte
}, ct));
ctx.Properties.Add(RespondKey.IncomingMessageType, typeof(TRequest));
ctx.Properties.Add(RespondKey.OutgoingMessageType, typeof(TResponse));
ctx.AddMessageContextType<TMessageContext>();
if (!ctx.Properties.ContainsKey(Enrichers.MessageContext.Respond.PipeContextExtensions.PipebasedContextFunc))
{
ctx.AddMessageContextType<TMessageContext>();
}
ctx.Properties.Add(PipeKey.MessageHandler, genericHandler);
context?.Invoke(new RespondContext(ctx));
}, ct);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using RawRabbit.Common;
using RawRabbit.IntegrationTests.TestMessages;
using RawRabbit.Operations.Subscribe.Context;
using Xunit;

namespace RawRabbit.IntegrationTests.PublishAndSubscribe
{
public class ErrorOnExceptionTests
{
[Fact]
public async Task All_Good_In_Custom_Subscription()
{
using (var publisher = RawRabbitFactory.CreateTestClient())
using (var subscriber = RawRabbitFactory.CreateTestClient())
using (var deadletterSubscriber = RawRabbitFactory.CreateTestClient())
{
var messageId = Guid.NewGuid().ToString();

await subscriber
.CustomSubscribeAsync<BasicMessage>(
async message =>
{
throw new DivideByZeroException();
} )
.ConfigureAwait(false);

var errorExchange = new NamingConventions().ErrorExchangeNamingConvention();

var tcs = new TaskCompletionSource<BasicMessage>();

await deadletterSubscriber
.SubscribeAsync<BasicMessage>(
async message =>
{
if (message.Prop == messageId)
tcs.TrySetResult(message);
},
pipe => pipe
.UseSubscribeConfiguration(cfg => cfg
.FromDeclaredQueue(q => q.WithName("q"))
.OnDeclaredExchange(ex => ex.WithName(errorExchange))
)
);

await publisher
.PublishAsync(new BasicMessage { Prop = messageId })
.ConfigureAwait(false);

using (new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token.Register(() => tcs.TrySetCanceled()))
{
await tcs.Task.ConfigureAwait(false);
}
}
}

[Fact]
public async Task Continuation_Swallows_Exception_In_Existing()
{
using (var publisher = RawRabbitFactory.CreateTestClient())
using (var subscriber = RawRabbitFactory.CreateTestClient())
using (var deadletterSubscriber = RawRabbitFactory.CreateTestClient())
{
var messageId = Guid.NewGuid().ToString();

await subscriber
.SubscribeAsync<BasicMessage>(
async message =>
{
throw new DivideByZeroException();
})
.ConfigureAwait(false);

var errorExchange = new NamingConventions().ErrorExchangeNamingConvention();

var tcs = new TaskCompletionSource<BasicMessage>();

await deadletterSubscriber
.SubscribeAsync<BasicMessage>(
async message =>
{
if (message.Prop == messageId)
tcs.TrySetResult(message);
},
pipe => pipe
.UseSubscribeConfiguration(cfg => cfg
.FromDeclaredQueue(q => q.WithName("q"))
.OnDeclaredExchange(ex => ex.WithName(errorExchange))
)
);

await publisher
.PublishAsync(new BasicMessage { Prop = messageId })
.ConfigureAwait(false);

using (new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token.Register(() => tcs.TrySetCanceled()))
{
await Assert.ThrowsAsync<TaskCanceledException>(async () => await tcs.Task.ConfigureAwait(false)).ConfigureAwait(false);
}
}
}
}

public static class Ex
{
public static Task CustomSubscribeAsync<TMessage>(this IBusClient client, Func<TMessage, Task> subscribeMethod,
Action<ISubscribeContext> context = null, CancellationToken ct = default(CancellationToken))
{
async Task<Acknowledgement> Subscribe(TMessage message)
{
await subscribeMethod(message).ConfigureAwait(false);
return new Ack();
}

return client.SubscribeAsync<TMessage>(Subscribe, context, ct);
}
}
}