Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy ES rolling snapshots and builder #52

Merged
merged 1 commit into from
Nov 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions cli/Equinox.Cli/Equinox.Cli.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
<IsTestProject>false</IsTestProject>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>

<!-- workaround for not being able to make Backend and Domain as inlined in a complete way https://github.com/nuget/home/issues/3891#issuecomment-377319939 -->
<TargetsForTfmSpecificBuildOutput>$(TargetsForTfmSpecificBuildOutput);CopyProjectReferencesToPackage</TargetsForTfmSpecificBuildOutput>
</PropertyGroup>

<ItemGroup>
Expand All @@ -21,11 +24,11 @@
</ItemGroup>

<ItemGroup>
<!-- workaround for not being able to make Backend and Domain as inlined in a complete way https://github.com/nuget/home/issues/3891#issuecomment-377319939 -->
<ProjectReference Include="..\..\src\Equinox.MemoryStore\Equinox.MemoryStore.fsproj" PrivateAssets="all" />
<ProjectReference Include="..\..\samples\Store\Backend\Backend.fsproj" PrivateAssets="all" />
<ProjectReference Include="..\..\samples\Store\Domain\Domain.fsproj" PrivateAssets="all" />
<ProjectReference Include="..\..\src\Equinox.EventStore\Equinox.EventStore.fsproj" />
<ProjectReference Include="..\..\src\Equinox.Codec\Equinox.Codec.fsproj" />
</ItemGroup>

<ItemGroup>
Expand All @@ -44,14 +47,9 @@
</ItemGroup>

<!-- workaround for not being able to make Backend and Domain as inlined in a complete way https://github.com/nuget/home/issues/3891#issuecomment-377319939 -->
<PropertyGroup>
<TargetsForTfmSpecificBuildOutput>$(TargetsForTfmSpecificBuildOutput);IncludeNonPackagedDlls</TargetsForTfmSpecificBuildOutput>
</PropertyGroup>
<Target Name="IncludeNonPackagedDlls">
<Target Name="CopyProjectReferencesToPackage" DependsOnTargets="ResolveReferences">
<ItemGroup>
<BuildOutputInPackage Include="$(MSBuildProjectDirectory)/$(OutputPath)Backend.dll" />
<BuildOutputInPackage Include="$(MSBuildProjectDirectory)/$(OutputPath)Domain.dll" />
<BuildOutputInPackage Include="$(MSBuildProjectDirectory)/$(OutputPath)Equinox.MemoryStore.dll" />
<BuildOutputInPackage Include="@(ReferenceCopyLocalPaths->WithMetadataValue('ReferenceSourceTarget', 'ProjectReference'))" />
</ItemGroup>
</Target>

Expand Down
8 changes: 4 additions & 4 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module Test =
clients.[clientIndex % clients.Length]
let selectClient = async { return async { return selectClient() } }
Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest
let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact
let fold, initial, snapshot = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.snapshot
let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect()
let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings)
let codec = genCodec<Domain.Favorites.Events.Event>()
Expand All @@ -102,12 +102,12 @@ module Test =
let c = Caching.Cache("Cli", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
let resolveStream streamName =
let resolveStream =
match store with
| Store.Mem store ->
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName)
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create(streamName)
GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot, ?caching = esCache).Create
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
let sku = Guid.NewGuid() |> SkuId
Expand Down
8 changes: 3 additions & 5 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ module Events =
type ItemWaiveReturnsInfo = { context: ContextInfo; skuId: SkuId; waived: bool }

module Compaction =
let [<Literal>] EventType = "compact/1"
type StateItemInfo = { skuId: SkuId; quantity: int; returnsWaived: bool }
type State = { items: StateItemInfo[] }

type Event =
| [<System.Runtime.Serialization.DataMember(Name = Compaction.EventType)>]
Compacted of Compaction.State
| Compacted of Compaction.State
| ItemAdded of ItemAddInfo
| ItemRemoved of ItemRemoveInfo
| ItemQuantityChanged of ItemQuantityChangeInfo
Expand All @@ -42,8 +40,8 @@ module Folds =
| Events.ItemQuantityChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with quantity = e.quantity } | i -> i))
| Events.ItemWaiveReturnsChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with returnsWaived = e.waived } | i -> i))
let fold state = Seq.fold evolve state
let compact = Events.Compaction.EventType, fun state -> Events.Compacted (State.toSnapshot state)

let isOrigin = function Events.Compacted _ -> true | _ -> false
let snapshot = isOrigin, State.toSnapshot >> Events.Compacted
type Context = { time: System.DateTime; requestId : RequestId }
type Command =
| AddItem of Context * SkuId * quantity: int
Expand Down
7 changes: 3 additions & 4 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ module Events =
type Favorited = { date: System.DateTimeOffset; skuId: SkuId }
type Unfavorited = { skuId: SkuId }
module Compaction =
let [<Literal>] EventType = "compacted/1"
type Compacted = { net: Favorited[] }

type Event =
| [<System.Runtime.Serialization.DataMember(Name = Compaction.EventType)>]
Compacted of Compaction.Compacted
| Compacted of Compaction.Compacted
| Favorited of Favorited
| Unfavorited of Unfavorited
interface TypeShape.UnionContract.IUnionContract
Expand All @@ -37,7 +35,8 @@ module Folds =
let s = InternalState state
for e in events do evolve s e
s.AsState()
let compact = Events.Compaction.EventType, fun state -> Events.Compacted { net = state }
let isOrigin = function Events.Compacted _ -> true | _ -> false
let snapshot = isOrigin, fun state -> Events.Compacted { net = state }

type Command =
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
Expand Down
16 changes: 8 additions & 8 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ open Swensen.Unquote

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

let fold, initial, compact = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.compact
let fold, initial, snapshot = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.snapshot

let createMemoryStore () =
new VolatileStore ()
Expand All @@ -15,10 +15,10 @@ let createServiceMem log store =

let codec = Equinox.EventStore.Integration.EventStoreIntegration.genCodec<Domain.Cart.Events.Event>()

let resolveGesStreamWithCompactionEventType gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create(streamName)
let resolveGesStreamWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial).Create(streamName)
let resolveGesStreamWithRollingSnapshots gateway =
GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Create
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesStreamBuilder(gateway, codec, fold, initial).Create

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
Expand Down Expand Up @@ -53,12 +53,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCompactionSemantics
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithCompactionEventType
let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots
do! act service args
}
12 changes: 6 additions & 6 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ let createServiceMem log store =
Backend.ContactPreferences.Service(log, MemoryStreamBuilder(store, fold, initial).Create)

let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let resolveStreamGesWithCompactionSemantics gateway streamName =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(streamName)
let resolveStreamGesWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(streamName)
let resolveStreamGesWithOptimizedStorageSemantics gateway =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create
let resolveStreamGesWithoutAccessStrategy gateway =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand All @@ -44,12 +44,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithoutCompactionSemantics
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithoutAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithCompactionSemantics
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithOptimizedStorageSemantics
do! act service args
}
5 changes: 3 additions & 2 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ open Swensen.Unquote

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact
let fold, initial, snapshot = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.snapshot

let createMemoryStore () =
new VolatileStore()
Expand All @@ -15,7 +15,8 @@ let createServiceMem log store =

let codec = genCodec<Domain.Favorites.Events.Event>()
let createServiceGes gateway log =
Backend.Favorites.Service(log, GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create)
let resolveStream = GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Create
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down
31 changes: 17 additions & 14 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
module Samples.Store.Integration.LogIntegration

open Domain
open Equinox.Store
open Swensen.Unquote
open System
open System.Collections.Concurrent

module EquinoxEsInterop =
open Equinox.EventStore
Expand Down Expand Up @@ -58,31 +61,31 @@ type SerilogMetricsExtractor(emit : string -> unit) =

let createLoggerWithMetricsExtraction emit =
let capture = SerilogMetricsExtractor emit
createLogger capture, capture
createLogger capture

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

type Tests() =
let act buffer capture (service : Backend.Cart.Service) itemCount context cartId skuId resultTag = async {
let act buffer (service : Backend.Cart.Service) itemCount context cartId skuId resultTag = async {
do! CartIntegration.addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service itemCount
let! state = service.Read cartId
test <@ itemCount = match state with { items = [{ quantity = quantity }] } -> quantity | _ -> failwith "nope" @>

// Even though we've gone over a page, we only need a single read to read the state (plus the one from the execute)
let contains (s : string) (x : string) = x.IndexOf s <> -1
test <@ let reads = buffer |> Seq.filter (fun s -> s |> contains resultTag)
2 = Seq.length reads
&& not (obj.ReferenceEquals(capture, null)) @> }
// Because we're using Access Strategies that enable us to read our state in a single roundtrip...
// (even though we've gone over a page), we only need a single read to read the state (plus the one from the execute)
let contains (s : string) (x : string) = x.Contains s
test <@ let reads = buffer |> Seq.filter (contains resultTag)
2 = Seq.length reads @> }

// Protip: Debug this test to view standard metrics rendering
[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, hooking, extracting and substituting metrics in the logging information`` context cartId skuId = Async.RunSynchronously <| async {
let buffer = ResizeArray<string>()
let ``Can roundtrip against EventStore, hooking, extracting and substituting metrics in the logging information`` context skuId = Async.RunSynchronously <| async {
let batchSize = defaultBatchSize
let (log,capture) = createLoggerWithMetricsExtraction buffer.Add
let buffer = ConcurrentQueue<string>()
let log = createLoggerWithMetricsExtraction buffer.Enqueue
let! conn = connectToLocalEventStoreNode log
let gateway = createGesGateway conn batchSize
let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithCompactionEventType gateway)
let itemCount, cartId = batchSize / 2 + 1, cartId ()
do! act buffer capture service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration"
let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithRollingSnapshots gateway)
let itemCount = batchSize / 2 + 1
let cartId = Guid.NewGuid() |> CartId
do! act buffer service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration"
}
Loading