Skip to content

Commit

Permalink
Tidy ES rolling snapshots and builder (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Nov 26, 2018
1 parent 838e332 commit 7c1e64d
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 120 deletions.
14 changes: 6 additions & 8 deletions cli/Equinox.Cli/Equinox.Cli.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
<IsTestProject>false</IsTestProject>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>

<!-- workaround for not being able to make Backend and Domain as inlined in a complete way https://github.com/nuget/home/issues/3891#issuecomment-377319939 -->
<TargetsForTfmSpecificBuildOutput>$(TargetsForTfmSpecificBuildOutput);CopyProjectReferencesToPackage</TargetsForTfmSpecificBuildOutput>
</PropertyGroup>

<ItemGroup>
Expand All @@ -21,11 +24,11 @@
</ItemGroup>

<ItemGroup>
<!-- workaround for not being able to make Backend and Domain as inlined in a complete way https://github.com/nuget/home/issues/3891#issuecomment-377319939 -->
<ProjectReference Include="..\..\src\Equinox.MemoryStore\Equinox.MemoryStore.fsproj" PrivateAssets="all" />
<ProjectReference Include="..\..\samples\Store\Backend\Backend.fsproj" PrivateAssets="all" />
<ProjectReference Include="..\..\samples\Store\Domain\Domain.fsproj" PrivateAssets="all" />
<ProjectReference Include="..\..\src\Equinox.EventStore\Equinox.EventStore.fsproj" />
<ProjectReference Include="..\..\src\Equinox.Codec\Equinox.Codec.fsproj" />
</ItemGroup>

<ItemGroup>
Expand All @@ -44,14 +47,9 @@
</ItemGroup>

<!-- workaround for not being able to make Backend and Domain as inlined in a complete way https://github.com/nuget/home/issues/3891#issuecomment-377319939 -->
<PropertyGroup>
<TargetsForTfmSpecificBuildOutput>$(TargetsForTfmSpecificBuildOutput);IncludeNonPackagedDlls</TargetsForTfmSpecificBuildOutput>
</PropertyGroup>
<Target Name="IncludeNonPackagedDlls">
<Target Name="CopyProjectReferencesToPackage" DependsOnTargets="ResolveReferences">
<ItemGroup>
<BuildOutputInPackage Include="$(MSBuildProjectDirectory)/$(OutputPath)Backend.dll" />
<BuildOutputInPackage Include="$(MSBuildProjectDirectory)/$(OutputPath)Domain.dll" />
<BuildOutputInPackage Include="$(MSBuildProjectDirectory)/$(OutputPath)Equinox.MemoryStore.dll" />
<BuildOutputInPackage Include="@(ReferenceCopyLocalPaths->WithMetadataValue('ReferenceSourceTarget', 'ProjectReference'))" />
</ItemGroup>
</Target>

Expand Down
8 changes: 4 additions & 4 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module Test =
clients.[clientIndex % clients.Length]
let selectClient = async { return async { return selectClient() } }
Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest
let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact
let fold, initial, snapshot = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.snapshot
let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect()
let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings)
let codec = genCodec<Domain.Favorites.Events.Event>()
Expand All @@ -102,12 +102,12 @@ module Test =
let c = Caching.Cache("Cli", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
let resolveStream streamName =
let resolveStream =
match store with
| Store.Mem store ->
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName)
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create(streamName)
GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot, ?caching = esCache).Create
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
let sku = Guid.NewGuid() |> SkuId
Expand Down
8 changes: 3 additions & 5 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ module Events =
type ItemWaiveReturnsInfo = { context: ContextInfo; skuId: SkuId; waived: bool }

module Compaction =
let [<Literal>] EventType = "compact/1"
type StateItemInfo = { skuId: SkuId; quantity: int; returnsWaived: bool }
type State = { items: StateItemInfo[] }

type Event =
| [<System.Runtime.Serialization.DataMember(Name = Compaction.EventType)>]
Compacted of Compaction.State
| Compacted of Compaction.State
| ItemAdded of ItemAddInfo
| ItemRemoved of ItemRemoveInfo
| ItemQuantityChanged of ItemQuantityChangeInfo
Expand All @@ -42,8 +40,8 @@ module Folds =
| Events.ItemQuantityChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with quantity = e.quantity } | i -> i))
| Events.ItemWaiveReturnsChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with returnsWaived = e.waived } | i -> i))
let fold state = Seq.fold evolve state
let compact = Events.Compaction.EventType, fun state -> Events.Compacted (State.toSnapshot state)

let isOrigin = function Events.Compacted _ -> true | _ -> false
let snapshot = isOrigin, State.toSnapshot >> Events.Compacted
type Context = { time: System.DateTime; requestId : RequestId }
type Command =
| AddItem of Context * SkuId * quantity: int
Expand Down
7 changes: 3 additions & 4 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ module Events =
type Favorited = { date: System.DateTimeOffset; skuId: SkuId }
type Unfavorited = { skuId: SkuId }
module Compaction =
let [<Literal>] EventType = "compacted/1"
type Compacted = { net: Favorited[] }

type Event =
| [<System.Runtime.Serialization.DataMember(Name = Compaction.EventType)>]
Compacted of Compaction.Compacted
| Compacted of Compaction.Compacted
| Favorited of Favorited
| Unfavorited of Unfavorited
interface TypeShape.UnionContract.IUnionContract
Expand All @@ -37,7 +35,8 @@ module Folds =
let s = InternalState state
for e in events do evolve s e
s.AsState()
let compact = Events.Compaction.EventType, fun state -> Events.Compacted { net = state }
let isOrigin = function Events.Compacted _ -> true | _ -> false
let snapshot = isOrigin, fun state -> Events.Compacted { net = state }

type Command =
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
Expand Down
16 changes: 8 additions & 8 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ open Swensen.Unquote

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

let fold, initial, compact = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.compact
let fold, initial, snapshot = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.snapshot

let createMemoryStore () =
new VolatileStore ()
Expand All @@ -15,10 +15,10 @@ let createServiceMem log store =

let codec = Equinox.EventStore.Integration.EventStoreIntegration.genCodec<Domain.Cart.Events.Event>()

let resolveGesStreamWithCompactionEventType gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create(streamName)
let resolveGesStreamWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial).Create(streamName)
let resolveGesStreamWithRollingSnapshots gateway =
GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Create
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesStreamBuilder(gateway, codec, fold, initial).Create

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
Expand Down Expand Up @@ -53,12 +53,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCompactionSemantics
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithCompactionEventType
let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots
do! act service args
}
12 changes: 6 additions & 6 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ let createServiceMem log store =
Backend.ContactPreferences.Service(log, MemoryStreamBuilder(store, fold, initial).Create)

let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let resolveStreamGesWithCompactionSemantics gateway streamName =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(streamName)
let resolveStreamGesWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(streamName)
let resolveStreamGesWithOptimizedStorageSemantics gateway =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create
let resolveStreamGesWithoutAccessStrategy gateway =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand All @@ -44,12 +44,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithoutCompactionSemantics
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithoutAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithCompactionSemantics
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithOptimizedStorageSemantics
do! act service args
}
5 changes: 3 additions & 2 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ open Swensen.Unquote

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact
let fold, initial, snapshot = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.snapshot

let createMemoryStore () =
new VolatileStore()
Expand All @@ -15,7 +15,8 @@ let createServiceMem log store =

let codec = genCodec<Domain.Favorites.Events.Event>()
let createServiceGes gateway log =
Backend.Favorites.Service(log, GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create)
let resolveStream = GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Create
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down
31 changes: 17 additions & 14 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
module Samples.Store.Integration.LogIntegration

open Domain
open Equinox.Store
open Swensen.Unquote
open System
open System.Collections.Concurrent

module EquinoxEsInterop =
open Equinox.EventStore
Expand Down Expand Up @@ -58,31 +61,31 @@ type SerilogMetricsExtractor(emit : string -> unit) =

let createLoggerWithMetricsExtraction emit =
let capture = SerilogMetricsExtractor emit
createLogger capture, capture
createLogger capture

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

type Tests() =
let act buffer capture (service : Backend.Cart.Service) itemCount context cartId skuId resultTag = async {
let act buffer (service : Backend.Cart.Service) itemCount context cartId skuId resultTag = async {
do! CartIntegration.addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service itemCount
let! state = service.Read cartId
test <@ itemCount = match state with { items = [{ quantity = quantity }] } -> quantity | _ -> failwith "nope" @>

// Even though we've gone over a page, we only need a single read to read the state (plus the one from the execute)
let contains (s : string) (x : string) = x.IndexOf s <> -1
test <@ let reads = buffer |> Seq.filter (fun s -> s |> contains resultTag)
2 = Seq.length reads
&& not (obj.ReferenceEquals(capture, null)) @> }
// Because we're using Access Strategies that enable us to read our state in a single roundtrip...
// (even though we've gone over a page), we only need a single read to read the state (plus the one from the execute)
let contains (s : string) (x : string) = x.Contains s
test <@ let reads = buffer |> Seq.filter (contains resultTag)
2 = Seq.length reads @> }

// Protip: Debug this test to view standard metrics rendering
[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, hooking, extracting and substituting metrics in the logging information`` context cartId skuId = Async.RunSynchronously <| async {
let buffer = ResizeArray<string>()
let ``Can roundtrip against EventStore, hooking, extracting and substituting metrics in the logging information`` context skuId = Async.RunSynchronously <| async {
let batchSize = defaultBatchSize
let (log,capture) = createLoggerWithMetricsExtraction buffer.Add
let buffer = ConcurrentQueue<string>()
let log = createLoggerWithMetricsExtraction buffer.Enqueue
let! conn = connectToLocalEventStoreNode log
let gateway = createGesGateway conn batchSize
let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithCompactionEventType gateway)
let itemCount, cartId = batchSize / 2 + 1, cartId ()
do! act buffer capture service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration"
let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithRollingSnapshots gateway)
let itemCount = batchSize / 2 + 1
let cartId = Guid.NewGuid() |> CartId
do! act buffer service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration"
}
Loading

0 comments on commit 7c1e64d

Please sign in to comment.