Skip to content

Commit

Permalink
Remove Cosmos RollingSnapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 22, 2018
1 parent c9625dd commit 2fa22c4
Show file tree
Hide file tree
Showing 15 changed files with 311 additions and 564 deletions.
17 changes: 7 additions & 10 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ module Test =
clients.[clientIndex % clients.Length]
let selectClient = async { return async { return selectClient() } }
Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest
let fold, initial, compact, index = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact, Domain.Favorites.Folds.index
let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact
let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect()
let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings)
let codec = genCodec<Domain.Favorites.Events.Event>()
Expand All @@ -150,19 +150,16 @@ module Test =
let c = Equinox.Cosmos.Builder.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.Builder.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
let resolveStream streamName =
let resolveStream =
match store with
| Store.Mem store ->
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName)
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create(streamName)
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create
| Store.Cosmos (gateway, databaseId, connectionId) ->
if targs.Contains Indexed then
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.IndexedSearch index, ?caching = eqxCache)
.Create(databaseId, connectionId, streamName)
else
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact, ?caching = eqxCache)
.Create(databaseId, connectionId, streamName)
let store = EqxStore(gateway, EqxCollections(databaseId, connectionId))
if targs.Contains Indexed then EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection compact, ?caching = eqxCache).Create
else EqxStreamBuilder(store, codec, fold, initial, ?access=None, ?caching = eqxCache).Create
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
let sku = Guid.NewGuid() |> SkuId
Expand Down
1 change: 0 additions & 1 deletion samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ module Folds =
| Events.ItemWaiveReturnsChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with returnsWaived = e.waived } | i -> i))
let fold state = Seq.fold evolve state
let compact = Events.Compaction.EventType, fun state -> Events.Compacted (State.toSnapshot state)
let index = (fun et -> et = Events.Compaction.EventType), fun state -> seq [ yield Events.Compacted (State.toSnapshot state) ]
type Context = { time: System.DateTime; requestId : RequestId }
type Command =
| AddItem of Context * SkuId * quantity: int
Expand Down
4 changes: 3 additions & 1 deletion samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ module Events =
type Preferences = { manyPromotions : bool; littlePromotions : bool; productReview : bool; quickSurveys : bool }
type Value = { email : string; preferences : Preferences }

let [<Literal>] EventTypeName = "contactPreferencesChanged"
type Event =
| [<System.Runtime.Serialization.DataMember(Name = "contactPreferencesChanged")>]Updated of Value
| [<System.Runtime.Serialization.DataMember(Name = EventTypeName)>]Updated of Value
interface TypeShape.UnionContract.IUnionContract
let eventTypeNames = System.Collections.Generic.HashSet<string>([EventTypeName])

module Folds =
type State = Events.Preferences
Expand Down
1 change: 0 additions & 1 deletion samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ module Folds =
for e in events do evolve s e
s.AsState()
let compact = Events.Compaction.EventType, fun state -> Events.Compacted { net = state }
let index = (fun x -> x = Events.Compaction.EventType), fun state -> seq [ Events.Compacted { net = state } ]

type Command =
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
Expand Down
30 changes: 15 additions & 15 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ let createServiceMem log store =

let codec = Equinox.EventStore.Integration.EventStoreIntegration.genCodec<Domain.Cart.Events.Event>()

let resolveGesStreamWithCompactionEventType gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create(streamName)
let resolveGesStreamWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial).Create(streamName)
let resolveGesStreamWithRollingSnapshots gateway =
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesStreamBuilder(gateway, codec, fold, initial).Create

let resolveEqxStreamWithCompactionEventType gateway (StoreCollection args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway (StoreCollection args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveEqxStreamWithProjection gateway =
EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection compact).Create
let resolveEqxStreamWithoutCustomAccessStrategy gateway =
EqxStreamBuilder(gateway, codec, fold, initial).Create

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
Expand Down Expand Up @@ -60,24 +60,24 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCompactionSemantics
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithCompactionEventType
let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithoutCompactionSemantics
let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithCompactionEventType
let ``Can roundtrip against Cosmos, correctly folding the events with With Projection`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithProjection
do! act service args
}
20 changes: 10 additions & 10 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ let createServiceMem log store =
Backend.ContactPreferences.Service(log, MemoryStreamBuilder(store, fold, initial).Create)

let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let resolveStreamGesWithCompactionSemantics gateway streamName =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(streamName)
let resolveStreamGesWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(streamName)
let resolveStreamGesWithCompactionSemantics gateway =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create
let resolveStreamGesWithoutCompactionSemantics gateway =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

let resolveStreamEqxWithCompactionSemantics gateway (StoreCollection args) =
EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway (StoreCollection args) =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args)
let resolveStreamEqxWithCompactionSemantics gateway =
EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType Domain.ContactPreferences.Events.eventTypeNames).Create
let resolveStreamEqxWithoutCompactionSemantics gateway =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down Expand Up @@ -63,12 +63,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithoutCompactionSemantics
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithCompactionSemantics
do! act service args
}
4 changes: 2 additions & 2 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ let createServiceGes gateway log =
Backend.Favorites.Service(log, GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create)

let createServiceEqx gateway log =
let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection compact).Create
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
Expand Down Expand Up @@ -57,7 +57,7 @@ type Tests(testOutputHelper) =
let ``Can roundtrip against Cosmos, correctly folding the events`` args = Async.RunSynchronously <| async {
let log = createLog ()
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createEqxGateway conn defaultBatchSize
let gateway = createEqxStore conn defaultBatchSize
let service = createServiceEqx gateway log
do! act service args
}
11 changes: 6 additions & 5 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ type Tests() =
do! CartIntegration.addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service itemCount
let! state = service.Read cartId
test <@ itemCount = match state with { items = [{ quantity = quantity }] } -> quantity | _ -> failwith "nope" @>
// Even though we've gone over a page, we only need a single read to read the state (plus the one from the execute)
// Because we're using Access Strategies that enable us to read our state in a single roundtrip...
// (even though we've gone over a page), we only need a single read to read the state (plus the one from the execute)
let contains (s : string) (x : string) = x.Contains s
test <@ let reads = buffer |> Seq.filter (fun s -> s |> contains resultTag)
test <@ let reads = buffer |> Seq.filter (contains resultTag)
2 = Seq.length reads @> }

// Protip: Debug this test to view standard metrics rendering
Expand All @@ -109,7 +110,7 @@ type Tests() =
let log = createLoggerWithMetricsExtraction buffer.Enqueue
let! conn = connectToLocalEventStoreNode log
let gateway = createGesGateway conn batchSize
let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithCompactionEventType gateway)
let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithRollingSnapshots gateway)
let itemCount = batchSize / 2 + 1
let cartId = Domain.Infrastructure.CartId(System.Guid.NewGuid())
do! act buffer service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration"
Expand All @@ -121,8 +122,8 @@ type Tests() =
let buffer = ConcurrentQueue<string>()
let log = createLoggerWithMetricsExtraction buffer.Enqueue
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createEqxGateway conn batchSize
let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithCompactionEventType gateway)
let gateway = createEqxStore conn batchSize
let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithProjection gateway)
let itemCount = batchSize / 2 + 1
let cartId = Domain.Infrastructure.CartId(System.Guid.NewGuid())
do! act buffer service itemCount context cartId skuId "EqxReadStreamEventsBackwardAsync-Duration"
Expand Down
Loading

0 comments on commit 2fa22c4

Please sign in to comment.