Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 9, 2018
1 parent 67ae40d commit 8466ee1
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 46 deletions.
25 changes: 15 additions & 10 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ and TestArguments =
| DurationM _ -> "specify a run duration in minutes (default: 1)."
| ErrorCutoff _ -> "specify an error cutoff (default: 10000)."
| ReportIntervalS _ -> "specify reporting intervals in seconds (default: 10)."
and Test = | Favorites | FavoritesCached
and Test = | Favorites | FavoritesCached | FavoritesIndexed
and [<NoEquality; NoComparison>] EsArguments =
| [<AltCommandLine("-vs")>] VerboseStore
| [<AltCommandLine("-o")>] Timeout of float
Expand Down Expand Up @@ -131,20 +131,28 @@ module Test =
clients.[clientIndex % clients.Length]
let selectClient = async { return async { return selectClient() } }
Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest
let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact
let fold, initial, compact, index = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact, Domain.Favorites.Folds.index
let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect()
let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings)
let codec = genCodec<Domain.Favorites.Events.Event>()
let createFavoritesService store cache log =
let createFavoritesService store (targs: ParseResults<TestArguments>) log =
let resolveStream streamName =
match store with
| Store.Mem store ->
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName)
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create(streamName)
| Store.Cosmos (gateway, databaseId, connectionId) ->
let cache = cache |> Option.map (fun c -> Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.))
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact, ?caching = cache).Create(databaseId, connectionId, streamName)
match targs.TryGetResult Name with
| Some FavoritesIndexed ->
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.IndexedSearch index).Create(databaseId, connectionId, streamName)
| _ ->
let cache =
match targs.TryGetResult Name with
| Some FavoritesCached -> Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50) |> Some
| _ -> None
let cache = cache |> Option.map (fun c -> Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.))
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact, ?caching = cache).Create(databaseId, connectionId, streamName)
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
let sku = Guid.NewGuid() |> SkuId
Expand All @@ -156,6 +164,7 @@ module Test =
module SerilogHelpers =
let (|CosmosReadRu|CosmosWriteRu|CosmosSliceRu|) (evt : Equinox.Cosmos.Log.Event) =
match evt with
| Equinox.Cosmos.Log.Index { ru = ru }
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Forward,_, { ru = ru })
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Backward,_, { ru = ru }) -> CosmosReadRu ru
| Equinox.Cosmos.Log.WriteSuccess {ru = ru }
Expand Down Expand Up @@ -213,11 +222,7 @@ let main argv =
let runTest (log: ILogger) conn (targs: ParseResults<TestArguments>) =
let verbose = args.Contains(VerboseDomain)
let domainLog = createDomainLog verbose verboseConsole maybeSeq
let cache =
match targs.TryGetResult Name with
| Some FavoritesCached -> Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50) |> Some
| _ -> None
let service = Test.createFavoritesService conn cache domainLog
let service = Test.createFavoritesService conn targs domainLog

let errorCutoff = targs.GetResult(ErrorCutoff,10000L)
let testsPerSecond = targs.GetResult(TestsPerSecond,1000)
Expand Down
1 change: 1 addition & 0 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ module Folds =
| Events.ItemWaiveReturnsChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with returnsWaived = e.waived } | i -> i))
let fold state = Seq.fold evolve state
let compact = Events.Compaction.EventType, fun state -> Events.Compacted (State.toSnapshot state)
let index = (fun et -> et = Events.Compaction.EventType), fun state -> seq [ yield Events.Compacted (State.toSnapshot state) ]
type Context = { time: System.DateTime; requestId : RequestId }
type Command =
| AddItem of Context * SkuId * quantity: int
Expand Down
1 change: 1 addition & 0 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ module Folds =
for e in events do evolve s e
s.AsState()
let compact = Events.Compaction.EventType, fun state -> Events.Compacted { net = state }
let index = (fun x -> x = Events.Compaction.EventType), fun state -> seq [ Events.Compacted { net = state } ]

type Command =
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
Expand Down
1 change: 1 addition & 0 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ module EquinoxCosmosInterop =
| Log.Slice (Direction.Backward,m) -> "EqxReadStreamEventsBackwardAsync", m, None, m.ru
| Log.Batch (Direction.Forward,c,m) -> "EqxLoadF", m, Some c, m.ru
| Log.Batch (Direction.Backward,c,m) -> "EqxLoadB", m, Some c, m.ru
| Log.Index m -> "EqxLoadI", m, None, m.ru
{ action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; batches = batches
interval = StopwatchInterval(metric.interval.StartTicks,metric.interval.EndTicks); ru = ru }

Expand Down
Loading

0 comments on commit 8466ee1

Please sign in to comment.