Skip to content

Commit

Permalink
Merge pull request #89 from mizrael/outbox
Browse files Browse the repository at this point in the history
improved outbox management
  • Loading branch information
mizrael authored Aug 4, 2023
2 parents ed03cf9 + 0420ed1 commit c53c052
Show file tree
Hide file tree
Showing 52 changed files with 357 additions and 240 deletions.
3 changes: 1 addition & 2 deletions samples/OpenSleigh.Samples.Sample1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ static IHostBuilder CreateHostBuilder(string[] args) =>
cfg.UseInMemoryTransport()
.UseInMemoryPersistence()
.AddSaga<SagaWithoutState>()
.AddSaga<SagaWithState, MySagaState>()
;
.AddSaga<SagaWithState, MySagaState>();
});
});

Expand Down
4 changes: 0 additions & 4 deletions samples/OpenSleigh.Samples.Sample1/SagaWithState.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh;
using OpenSleigh.Transport;
using OpenSleigh.Utils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
using System.Threading;
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Samples.Sample2.Common.Messages;
using OpenSleigh.Transport;

namespace OpenSleigh.Samples.Sample2.API.Controllers
{
Expand All @@ -24,11 +24,11 @@ public async Task<IActionResult> StartSimpleSaga(bool isSimple = true, Cancellat
IMessage message = isSimple ? new StartSimpleSaga(Guid.NewGuid(), Guid.NewGuid()) :
new StartParentSaga(Guid.NewGuid(), Guid.NewGuid());

await _bus.PublishAsync(message, cancellationToken);
var receipt = await _bus.PublishAsync(message, cancellationToken);

return Accepted(new
{
SagaId = message.CorrelationId
SagaId = receipt.CorrelationId
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public void ConfigureServices(IServiceCollection services)
mongoSection["DbName"]);

var rabbitSection = Configuration.GetSection("Rabbit");
var rabbitCfg = new RabbitConfiguration(rabbitSection["HostName"],
var rabbitCfg = new RabbitConfiguration(
rabbitSection["HostName"],
rabbitSection["VirtualHost"],
rabbitSection["UserName"],
rabbitSection["Password"]);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{
"Mongo": {
"ConnectionString": "mongodb://root:password@127.0.0.1:27017",
"ConnectionString": "mongodb://127.0.0.1:27017",
"DbName": "OpenSleighSample2_API"
},
"Rabbit": {
"Hostname": "127.0.0.1",
"VirtualHost": "/opensleigh-sample2",
"UserName": "guest",
"Password": "guest"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.DependencyInjection;
using OpenSleigh.Persistence.Mongo;
using OpenSleigh.Samples.Sample2.Common.Messages;
using OpenSleigh.Samples.Sample2.Worker.Sagas;
using OpenSleigh.Transport.RabbitMQ;
using System.Threading.Tasks;

namespace OpenSleigh.Samples.Sample2.Worker
{
Expand All @@ -31,7 +30,9 @@ static IHostBuilder CreateHostBuilder(string[] args) =>
.AddOpenSleigh(cfg =>
{
var rabbitSection = hostContext.Configuration.GetSection("Rabbit");
var rabbitCfg = new RabbitConfiguration(rabbitSection["HostName"],
var rabbitCfg = new RabbitConfiguration(
rabbitSection["HostName"],
rabbitSection["VirtualHost"],
rabbitSection["UserName"],
rabbitSection["Password"]);

Expand All @@ -41,27 +42,12 @@ static IHostBuilder CreateHostBuilder(string[] args) =>
MongoSagaStateRepositoryOptions.Default,
MongoOutboxRepositoryOptions.Default);

cfg.UseRabbitMQTransport(rabbitCfg, builder =>
{
// OPTIONAL
// using a custom naming policy allows us to define the names for exchanges, queues and routing keys.
// for example, we could have a single exchange bound to multiple queues.
builder.UseMessageNamingPolicy<StartChildSaga>(() => new QueueReferences("child", "child.start", "child.start", "child.dead", "child.dead.start"));
builder.UseMessageNamingPolicy<ProcessChildSaga>(() => new QueueReferences("child", "child.process", "child.process", "child.dead", "child.dead.process"));
})
.UseMongoPersistence(mongoCfg);
cfg.UseRabbitMQTransport(rabbitCfg)
.UseMongoPersistence(mongoCfg);

cfg.AddSaga<SimpleSaga, SimpleSagaState>()
.UseStateFactory<StartSimpleSaga>(msg => new SimpleSagaState(msg.CorrelationId))
.UseRabbitMQTransport();

cfg.AddSaga<ParentSaga, ParentSagaState>()
.UseStateFactory<StartParentSaga>(msg => new ParentSagaState(msg.CorrelationId))
.UseRabbitMQTransport();

cfg.AddSaga<ChildSaga, ChildSagaState>()
.UseStateFactory<StartChildSaga>(msg => new ChildSagaState(msg.CorrelationId))
.UseRabbitMQTransport();
.AddSaga<ParentSaga, ParentSagaState>()
.AddSaga<ChildSaga, ChildSagaState>();
});
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"profiles": {
"OpenSleigh.Samples.Console": {
"OpenSleigh.Samples.Sample2.Worker": {
"commandName": "Project",
"environmentVariables": {
"DOTNET_ENVIRONMENT": "Development"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
using Microsoft.Extensions.Logging;
using OpenSleigh.Transport;
using OpenSleigh.Utils;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Samples.Sample2.Worker.Sagas
{
public record ChildSagaState : SagaState{
public ChildSagaState(Guid id) : base(id){}
}
public record ChildSagaState;

public record StartChildSaga(Guid Id, Guid CorrelationId) : ICommand { }
public record StartChildSaga(Guid Id, Guid CorrelationId) : IMessage { }

public record ProcessChildSaga(Guid Id, Guid CorrelationId) : ICommand { }
public record ProcessChildSaga(Guid Id, Guid CorrelationId) : IMessage { }

public record ChildSagaCompleted(Guid Id, Guid CorrelationId) : IEvent { }
public record ChildSagaCompleted(Guid Id, Guid CorrelationId) : IMessage { }

public class ChildSaga :
Saga<ChildSagaState>,
Expand All @@ -26,12 +24,15 @@ public class ChildSaga :

private readonly Random _random = new Random();

public ChildSaga(ILogger<ChildSaga> logger, ChildSagaState state) : base(state)
public ChildSaga(
ILogger<ChildSaga> logger,
ISagaExecutionContext<ChildSagaState> context,
ISerializer serializer) : base(context, serializer)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task HandleAsync(IMessageContext<StartChildSaga> context, CancellationToken cancellationToken = default)
public async ValueTask HandleAsync(IMessageContext<StartChildSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"starting child saga '{context.Message.CorrelationId}'...");

Expand All @@ -41,7 +42,7 @@ public async Task HandleAsync(IMessageContext<StartChildSaga> context, Cancellat
this.Publish(message);
}

public async Task HandleAsync(IMessageContext<ProcessChildSaga> context, CancellationToken cancellationToken = default)
public async ValueTask HandleAsync(IMessageContext<ProcessChildSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"processing child saga '{context.Message.CorrelationId}'...");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
using Microsoft.Extensions.Logging;
using OpenSleigh.Samples.Sample2.Common.Messages;
using OpenSleigh.Transport;
using OpenSleigh.Utils;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Samples.Sample2.Common.Messages;

namespace OpenSleigh.Samples.Sample2.Worker.Sagas
{
public record ParentSagaState : SagaState{
public ParentSagaState(Guid id) : base(id){}
}
public record ParentSagaState;

public record ProcessParentSaga(Guid Id, Guid CorrelationId) : ICommand { }
public record ProcessParentSaga(Guid Id, Guid CorrelationId) : IMessage { }

public record ParentSagaCompleted(Guid Id, Guid CorrelationId) : IEvent { }
public record ParentSagaCompleted(Guid Id, Guid CorrelationId) : IMessage { }

public class ParentSaga :
Saga<ParentSagaState>,
Expand All @@ -27,28 +25,30 @@ public class ParentSaga :

private readonly Random _random = new Random();

public ParentSaga(ILogger<ParentSaga> logger, ParentSagaState state) : base(state)
public ParentSaga(ILogger<ParentSaga> logger,
ISagaExecutionContext<ParentSagaState> context,
ISerializer serializer) : base(context, serializer)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task HandleAsync(IMessageContext<StartParentSaga> context, CancellationToken cancellationToken = default)
public async ValueTask HandleAsync(IMessageContext<StartParentSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"starting parent saga '{context.Message.CorrelationId}'...");

var message = new ProcessParentSaga(Guid.NewGuid(), context.Message.CorrelationId);
this.Publish(message);
}

public async Task HandleAsync(IMessageContext<ProcessParentSaga> context, CancellationToken cancellationToken = default)
public async ValueTask HandleAsync(IMessageContext<ProcessParentSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"starting child saga from parent saga '{context.Message.CorrelationId}'...");

var message = new StartChildSaga(Guid.NewGuid(), context.Message.CorrelationId);
this.Publish(message);
}

public async Task HandleAsync(IMessageContext<ChildSagaCompleted> context, CancellationToken cancellationToken = default)
public async ValueTask HandleAsync(IMessageContext<ChildSagaCompleted> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"child saga completed, finalizing parent saga '{context.Message.CorrelationId}'...");

Expand All @@ -58,11 +58,11 @@ public async Task HandleAsync(IMessageContext<ChildSagaCompleted> context, Cance
this.Publish(message);
}

public Task HandleAsync(IMessageContext<ParentSagaCompleted> context, CancellationToken cancellationToken = default)
public ValueTask HandleAsync(IMessageContext<ParentSagaCompleted> context, CancellationToken cancellationToken = default)
{
this.State.MarkAsCompleted();
this.Context.MarkAsCompleted();
_logger.LogInformation($"parent saga '{context.Message.CorrelationId}' completed!");
return Task.CompletedTask;
return ValueTask.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Samples.Sample2.Common.Messages;
using OpenSleigh.Transport;
using OpenSleigh.Utils;

namespace OpenSleigh.Samples.Sample2.Worker.Sagas
{
public record SimpleSagaState : SagaState{
public SimpleSagaState(Guid id) : base(id){}
}
public record SimpleSagaState;

public record ProcessSimpleSaga(Guid Id, Guid CorrelationId) : ICommand { }
public record ProcessSimpleSaga(Guid Id, Guid CorrelationId) : IMessage { }

public record SimpleSagaCompleted(Guid Id, Guid CorrelationId) : IEvent { }
public record SimpleSagaCompleted(Guid Id, Guid CorrelationId) : IMessage { }

public class SimpleSaga :
Saga<SimpleSagaState>,
Expand All @@ -24,32 +22,35 @@ public class SimpleSaga :
{
private readonly ILogger<SimpleSaga> _logger;

public SimpleSaga(ILogger<SimpleSaga> logger, SimpleSagaState state): base(state)
public SimpleSaga(
ILogger<SimpleSaga> logger,
ISagaExecutionContext<SimpleSagaState> context,
ISerializer serializer) : base(context, serializer)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task HandleAsync(IMessageContext<StartSimpleSaga> context, CancellationToken cancellationToken = default)
public async ValueTask HandleAsync(IMessageContext<StartSimpleSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"starting saga '{context.Message.CorrelationId}'...");

var message = new ProcessSimpleSaga(Guid.NewGuid(), context.Message.CorrelationId);
this.Publish(message);
}

public async Task HandleAsync(IMessageContext<ProcessSimpleSaga> context, CancellationToken cancellationToken = default)
public async ValueTask HandleAsync(IMessageContext<ProcessSimpleSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"processing saga '{context.Message.CorrelationId}'...");

var message = new SimpleSagaCompleted(Guid.NewGuid(), context.Message.CorrelationId);
this.Publish(message);
}

public Task HandleAsync(IMessageContext<SimpleSagaCompleted> context, CancellationToken cancellationToken = default)
public ValueTask HandleAsync(IMessageContext<SimpleSagaCompleted> context, CancellationToken cancellationToken = default)
{
this.State.MarkAsCompleted();
this.Context.MarkAsCompleted();
_logger.LogInformation($"saga '{context.Message.CorrelationId}' completed!");
return Task.CompletedTask;
return ValueTask.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{
"Mongo": {
"ConnectionString": "mongodb://root:password@127.0.0.1:27017",
"ConnectionString": "mongodb://127.0.0.1:27017",
"DbName": "OpenSleighSample2_Worker"
},
"Rabbit": {
"Hostname": "localhost",
"Hostname": "127.0.0.1",
"VirtualHost": "/opensleigh-sample2",
"UserName": "guest",
"Password": "guest"
},
Expand Down
12 changes: 2 additions & 10 deletions samples/OpenSleigh.Samples.Sample2/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,15 @@ services:
image: mongo:latest
container_name: openSleigh.sample2.infrastructure.mongodb
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: password
MONGO_REPLICA_SET_NAME: opensleigh
healthcheck:
test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u root -p password --quiet) -eq 1
interval: 10s
start_period: 30s
ports:
- 27017:27017
- 27017:27017

openSleigh.sample2.infrastructure.rabbitmq:
image: rabbitmq:3-management-alpine
container_name: openSleigh.sample2.infrastructure.rabbitmq
restart: always
environment:
RABBITMQ_DEFAULT_VHOST: "/"
RABBITMQ_DEFAULT_VHOST: "/opensleigh-sample2"
ports:
- "15671:15671"
- "15672:15672"
Expand Down
Loading

0 comments on commit c53c052

Please sign in to comment.