Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added E2E in-memory tests #14

Merged
merged 9 commits into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Changelog

All notable changes to this project will be documented in this file.

## [2021-01-11](https://github.com/mizrael/OpenSleigh/pull/14)
### Added
- added in-memory E2E tests

### Fixed
- fixed in-memory state repository, added support for multiple types sharing correlation id
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reflection;
using System.Threading.Channels;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Persistence.InMemory.Messaging;

namespace OpenSleigh.Persistence.InMemory
{
[ExcludeFromCodeCoverage]
public static class InMemorySagaConfiguratorExtensions
{
private static readonly MethodInfo RawRegisterMessageMethod = typeof(InMemorySagaConfiguratorExtensions)
.GetMethod("RegisterMessage", BindingFlags.Static | BindingFlags.NonPublic);

public static ISagaConfigurator<TS, TD> UseInMemoryTransport<TS, TD>(this ISagaConfigurator<TS, TD> sagaConfigurator)
where TS : Saga<TD>
where TD : SagaState
Expand All @@ -30,21 +35,20 @@ public static ISagaConfigurator<TS, TD> UseInMemoryTransport<TS, TD>(this ISagaC

var messageType = i.GetGenericArguments().First();

var rawMethod = typeof(Channel).GetMethod(nameof(Channel.CreateUnbounded), Array.Empty<Type>());
var method = rawMethod.MakeGenericMethod(messageType);
dynamic channel = method.Invoke(null, null);

sagaConfigurator.Services.AddSingleton(typeof(Channel<>).MakeGenericType(messageType), (object)channel);

sagaConfigurator.Services.AddSingleton(typeof(ChannelWriter<>).MakeGenericType(messageType), (object)channel.Writer);

sagaConfigurator.Services.AddSingleton(typeof(ChannelReader<>).MakeGenericType(messageType), (object)channel.Reader);

sagaConfigurator.Services.AddSingleton(typeof(ISubscriber),
typeof(InMemorySubscriber<>).MakeGenericType(messageType));
//TODO: ensure the message was not already registered
var registerMessageMethod = RawRegisterMessageMethod.MakeGenericMethod(messageType);
registerMessageMethod.Invoke(null, new[] {sagaConfigurator.Services});
}

return sagaConfigurator;
}

private static void RegisterMessage<TM>(IServiceCollection services) where TM : IMessage
{
var channel = Channel.CreateUnbounded<TM>();
services.AddSingleton<ChannelReader<TM>>(channel.Reader)
.AddSingleton<ChannelWriter<TM>>(channel.Writer)
.AddSingleton<ISubscriber, InMemorySubscriber<TM>>();
}
}
}
32 changes: 19 additions & 13 deletions src/OpenSleigh.Persistence.InMemory/InMemorySagaStateRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,23 @@
using OpenSleigh.Core.Persistence;
using System;
using System.Collections.Concurrent;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;

namespace OpenSleigh.Persistence.InMemory
{
public class InMemorySagaStateRepository : ISagaStateRepository
{
private readonly ConcurrentDictionary<Guid, (SagaState state, Guid? lockId)> _items;
private static readonly SemaphoreSlim _semaphore = new (1, 1);

public InMemorySagaStateRepository()
{
_items = new ConcurrentDictionary<Guid, (SagaState state, Guid? lockId)>();
}
private readonly ConcurrentDictionary<string, (SagaState state, Guid? lockId)> _items = new ();
private static readonly SemaphoreSlim ReleaseSemaphore = new (1, 1);

public Task<(TD state, Guid lockId)> LockAsync<TD>(Guid correlationId, TD newState = default, CancellationToken cancellationToken = default)
where TD : SagaState
{
var (state, lockId) = _items.AddOrUpdate(correlationId,
var key = BuildKey<TD>(correlationId);

var (state, lockId) = _items.AddOrUpdate(key,
k => (newState, Guid.NewGuid()),
(k, v) =>
{
Expand All @@ -45,21 +43,29 @@ public Task ReleaseLockAsync<TD>(TD state, Guid lockId, ITransaction transaction
private async Task ReleaseLockAsyncCore<TD>(TD state, Guid lockId, CancellationToken cancellationToken)
where TD : SagaState
{
await _semaphore.WaitAsync(cancellationToken);
await ReleaseSemaphore.WaitAsync(cancellationToken);

var key = BuildKey<TD>(state.Id);

try
{
if (!_items.ContainsKey(state.Id))
if (!_items.ContainsKey(key))
throw new ArgumentOutOfRangeException(nameof(state), $"invalid state correlationId '{state.Id}'");
var stored = _items[state.Id];
var stored = _items[key];
if (stored.lockId != lockId)
throw new LockException($"unable to release lock on saga state '{state.Id}'");
_items[state.Id] = (state, null);
_items[key] = (state, null);
}
finally
{
_semaphore.Release();
ReleaseSemaphore.Release();
}
}

private static string BuildKey<TD>(Guid correlationId)
{
var stateType = typeof(TD);
return $"{correlationId}|{stateType.FullName}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ public InMemoryPublisher(IChannelFactory channelFactory)
{
_channelFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory));
}



public Task PublishAsync(IMessage message, CancellationToken cancellationToken = default)
{
if (message == null)
Expand Down
4 changes: 2 additions & 2 deletions tests/OpenSleigh.Core.Tests/OpenSleigh.Core.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="coverlet.msbuild" Version="2.9.0">
<PackageReference Include="coverlet.msbuild" Version="3.0.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand All @@ -20,7 +20,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="1.3.0">
<PackageReference Include="coverlet.collector" Version="3.0.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Messaging;
using OpenSleigh.Persistence.InMemory.Tests.E2E.Sagas;
using Xunit;

namespace OpenSleigh.Persistence.InMemory.Tests.E2E
{
[Category("E2E")]
[Trait("Category", "E2E")]
public class ParentChildScenario
{
[Fact]
public async Task run_parent_child_scenario()
{
var hostBuilder = CreateHostBuilder();

var message = new StartParentSaga(Guid.NewGuid(), Guid.NewGuid());

var received = false;
var tokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(2));

Action<ParentSagaCompleted> onMessage = msg =>
{
received = true;
tokenSource.Cancel();
};

hostBuilder.ConfigureServices((ctx, services) =>
{
services.AddSingleton(onMessage);
});

var host = hostBuilder.Build();

using var scope = host.Services.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();

await Task.WhenAll(new[]
{
host.RunAsync(token: tokenSource.Token),
bus.PublishAsync(message, tokenSource.Token)
});

received.Should().BeTrue();
}

static IHostBuilder CreateHostBuilder() =>
Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddOpenSleigh(cfg =>
{
cfg.UseInMemoryTransport()
.UseInMemoryPersistence();

cfg.AddSaga<ParentSaga, ParentSagaState>()
.UseStateFactory(msg => new ParentSagaState(msg.CorrelationId))
.UseInMemoryTransport();

cfg.AddSaga<ChildSaga, ChildSagaState>()
.UseStateFactory(msg => new ChildSagaState(msg.CorrelationId))
.UseInMemoryTransport();
});
});
}

}
38 changes: 38 additions & 0 deletions tests/OpenSleigh.Persistence.InMemory.Tests/E2E/Sagas/ChildSaga.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Persistence.InMemory.Tests.E2E.Sagas
{
public class ChildSagaState : SagaState
{
public ChildSagaState(Guid id) : base(id) { }
}

public record StartChildSaga(Guid Id, Guid CorrelationId) : ICommand { }

public record ProcessChildSaga(Guid Id, Guid CorrelationId) : ICommand { }

public record ChildSagaCompleted(Guid Id, Guid CorrelationId) : IEvent { }

public class ChildSaga :
Saga<ChildSagaState>,
IStartedBy<StartChildSaga>,
IHandleMessage<ProcessChildSaga>
{

public async Task HandleAsync(IMessageContext<StartChildSaga> context, CancellationToken cancellationToken = default)
{
var message = new ProcessChildSaga(Guid.NewGuid(), context.Message.CorrelationId);
await this.Bus.PublishAsync(message, cancellationToken);
}

public async Task HandleAsync(IMessageContext<ProcessChildSaga> context, CancellationToken cancellationToken = default)
{
var completedEvent = new ChildSagaCompleted(Guid.NewGuid(), context.Message.CorrelationId);
await this.Bus.PublishAsync(completedEvent, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Persistence.InMemory.Tests.E2E.Sagas
{
public class ParentSagaState : SagaState
{
public ParentSagaState(Guid id) : base(id) { }
}

public record StartParentSaga(Guid Id, Guid CorrelationId) : ICommand { }

public record ProcessParentSaga(Guid Id, Guid CorrelationId) : ICommand { }

public record ParentSagaCompleted(Guid Id, Guid CorrelationId) : IEvent { }

public class ParentSaga :
Saga<ParentSagaState>,
IStartedBy<StartParentSaga>,
IHandleMessage<ProcessParentSaga>,
IHandleMessage<ChildSagaCompleted>,
IHandleMessage<ParentSagaCompleted>
{
private readonly Action<ParentSagaCompleted> _onCompleted;


public ParentSaga(Action<ParentSagaCompleted> onCompleted)
{
_onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted));
}

public async Task HandleAsync(IMessageContext<StartParentSaga> context, CancellationToken cancellationToken = default)
{
var message = new ProcessParentSaga(Guid.NewGuid(), context.Message.CorrelationId);
await this.Bus.PublishAsync(message, cancellationToken);
}

public async Task HandleAsync(IMessageContext<ProcessParentSaga> context, CancellationToken cancellationToken = default)
{
var message = new StartChildSaga(Guid.NewGuid(), context.Message.CorrelationId);
await this.Bus.PublishAsync(message, cancellationToken);
}

public async Task HandleAsync(IMessageContext<ChildSagaCompleted> context, CancellationToken cancellationToken = default)
{
var message = new ParentSagaCompleted(Guid.NewGuid(), context.Message.CorrelationId);
await this.Bus.PublishAsync(message, cancellationToken);
}

public Task HandleAsync(IMessageContext<ParentSagaCompleted> context, CancellationToken cancellationToken = default)
{
this.State.MarkAsCompleted();

_onCompleted?.Invoke(context.Message);

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Persistence.InMemory.Tests.E2E.Sagas
{
public class SimpleSagaState : SagaState
{
public SimpleSagaState(Guid id) : base(id)
{
}
}

public record StartSimpleSaga(Guid Id, Guid CorrelationId) : ICommand;

public class SimpleSaga : Saga<SimpleSagaState>, IStartedBy<StartSimpleSaga>
{
private readonly Action<StartSimpleSaga> _onStart;

public SimpleSaga(Action<StartSimpleSaga> onStart)
{
_onStart = onStart;
}

public Task HandleAsync(IMessageContext<StartSimpleSaga> context, CancellationToken cancellationToken = default)
{
_onStart?.Invoke(context.Message);
return Task.CompletedTask;
}
}
}
Loading