Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web and EF Core Example #2002

Merged
merged 19 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 95 additions & 76 deletions .idea/.idea.Brighter/.idea/httpRequests/http-requests-log.http

Large diffs are not rendered by default.

22 changes: 20 additions & 2 deletions samples/WebAPI_EFCore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ We 'depend inwards' i.e. **GreetingsAdapters -> GreetingsPorts -> GreetingsEntit

The assemblies migrations: **Greetings_MySqlMigrations** and **Greetings_SqliteMigrations** hold generated code to configure the Db. Consider this adapter layer code - the use of separate modules allows us to switch migration per enviroment.

### GreetingsWatcher
### SalutationAnalytics

This just listens for a GreetingMade message and dumps it to the console. It demonstrates listening to a queue. In this case it is too trivial for a ports & adapters separation, but that approach could be used if there was an entity layer and domain logic.
This listens for a GreetingMade message and stores it. It demonstrates listening to a queue. It also demonstrates the use of scopes provided by Brighter's ServiceActivator, which work with EFCore. These support writing to an Outbox when this component raises a message in turn.

We don't listen to that message, and without any listeners the RMQ will drop the message we send, as it has no queues to give it to. We don't listen because we would just be repeating what we have shown here.

We also add an Inbox here. The Inbox can be used to de-duplicate messages. In messaging the guarantee is 'at least once' if you use a technique such as an Outbox to ensure sending. This means we may receive a message twice. We either need, as in this case, to use an Inbox to de-duplicate, or we need to be idempotent such that receiving the message multiple times would result in the same outcome.


## Build and Deploy
Expand All @@ -54,6 +58,20 @@ We provide a docker compose file to allow you to run a 'Production' environment

and so on

### Sqlite Database Read-Only Errors

A Sqlite database will only have permissions for the process that created it. This can result in you receiving read-only errors between invocations of the sample. You either need to alter the permissions on your Db, or delete it between runs.

Maintainers, please don't check the Sqlite files into source control.

### Queue Creation and Dropped Messages

Queues are created by consumers. This is because publishers don't know who consumes them, and thus don't create their queues. This means that if you run a producer, such as GreetingsWeb, and use tests.http to push in greetings, although a message will be published to RMQ, it won't have a queue to be delivered to and will be dropped, unless you have first run the SalutationAnalytics worker to create the queue.

Generally, the rule of thumb is: start the consumer and *then* start the producer.

You can spot this by looking in the [RMQ Management console](http://localhost:1567) and noting that no queue is bound to the routing key in th exchange.

## Tests

We provide a tests.http file (supported by both JetBrains Rider and VS Code with the REST Client plugin) to allow you to test operations.
Expand Down
5 changes: 4 additions & 1 deletion samples/WebAPI_EFCore/SalutationAnalytics/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
using Paramore.Brighter.MessagingGateway.RMQ;
using Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection;
using Paramore.Brighter.ServiceActivator.Extensions.Hosting;
using Polly.Registry;
using SalutationAnalytics.Database;
using SalutationPorts.EntityGateway;
using SalutationPorts.Policies;
using SalutationPorts.Requests;

namespace SalutationAnalytics
Expand Down Expand Up @@ -82,11 +84,12 @@ private static IHostBuilder CreateHostBuilder(string[] args) =>
options.HandlerLifetime = ServiceLifetime.Scoped;
options.MapperLifetime = ServiceLifetime.Singleton;
options.CommandProcessorLifetime = ServiceLifetime.Scoped;
options.PolicyRegistry = new SalutationPolicy();
})
.UseExternalBus(new RmqProducerRegistryFactory(
new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@rabbitmq:5672")),
AmpqUri = new AmqpUriSpecification(new Uri($"amqp://guest:guest@{host}:5672")),
Exchange = new Exchange("paramore.brighter.exchange"),
},
new RmqPublication[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Paramore.Brighter;
using Paramore.Brighter.Inbox.Attributes;
using Paramore.Brighter.Logging.Attributes;
using Paramore.Brighter.Policies.Attributes;
using SalutationEntities;
using SalutationPorts.EntityGateway;
using SalutationPorts.Requests;
Expand All @@ -22,8 +23,9 @@ public GreetingMadeHandlerAsync(SalutationsEntityGateway uow, IAmACommandProcess
_postBox = postBox;
}

[UseInboxAsync(step:0, contextKey: typeof(GreetingMadeHandlerAsync), onceOnly: true )]
//[UseInboxAsync(step:0, contextKey: typeof(GreetingMadeHandlerAsync), onceOnly: true )] -- we are using a global inbox, so need to be explicit!!
[RequestLoggingAsync(step: 1, timing: HandlerTiming.Before)]
[UsePolicyAsync(step:2, policy: Policies.Retry.EXPONENTIAL_RETRYPOLICYASYNC)]
public override async Task<GreetingMade> HandleAsync(GreetingMade @event, CancellationToken cancellationToken = default(CancellationToken))
{
var posts = new List<Guid>();
Expand All @@ -48,6 +50,8 @@ public GreetingMadeHandlerAsync(SalutationsEntityGateway uow, IAmACommandProcess
await tx.RollbackAsync(cancellationToken);

Console.WriteLine("Salutation analytical record not saved");

throw;
}

await _postBox.ClearOutboxAsync(posts, cancellationToken: cancellationToken);
Expand Down
27 changes: 27 additions & 0 deletions samples/WebAPI_EFCore/SalutationPorts/Policies/Retry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using Polly;
using Polly.Contrib.WaitAndRetry;
using Polly.Retry;

namespace SalutationPorts.Policies
{
public static class Retry
{
public const string RETRYPOLICYASYNC = "SalutationPorts.Policies.RetryPolicyAsync";
public const string EXPONENTIAL_RETRYPOLICYASYNC = "SalutationPorts.Policies.ExponenttialRetryPolicyAsync";

public static AsyncRetryPolicy GetSimpleHandlerRetryPolicy()
{
return Policy.Handle<Exception>().WaitAndRetryAsync(new[]
{
TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(150)
});
}

public static AsyncRetryPolicy GetExponentialHandlerRetryPolicy()
{
var delay = Backoff.ExponentialBackoff(TimeSpan.FromMilliseconds(100), retryCount: 5, fastFirst: true);
return Policy.Handle<Exception>().WaitAndRetryAsync(delay);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Paramore.Brighter;

namespace SalutationPorts.Policies
{
public class SalutationPolicy : DefaultPolicy
{
public SalutationPolicy()
{
AddSalutationPolicies();
}

private void AddSalutationPolicies()
{
Add(Retry.RETRYPOLICYASYNC, Retry.GetSimpleHandlerRetryPolicy());
Add(Retry.EXPONENTIAL_RETRYPOLICYASYNC, Retry.GetExponentialHandlerRetryPolicy());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.12" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.12" />
<PackageReference Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected virtual void ConnectToBroker(OnMissingChannel makeExchange)
connection.ConnectionBlocked += HandleBlocked;
connection.ConnectionUnblocked += HandleUnBlocked;

s_logger.LogDebug("RMQMessagingGateway: Opening channel to Rabbit MQ on subscription {URL}",
s_logger.LogDebug("RMQMessagingGateway: Opening channel to Rabbit MQ on {URL}",
Connection.AmpqUri.GetSanitizedUri());

Channel = connection.CreateModel();
Expand Down