Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cosmos core events API #49

Merged
merged 32 commits into from
Nov 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d11a17a
Reorganize, adding Batch structure
bartelink Nov 14, 2018
c5e998d
injecting stored procedure
jinglegit Nov 15, 2018
b943ec9
Remove extranneous filter from query
bartelink Nov 15, 2018
563affe
Rework stored procedure
jinglegit Nov 16, 2018
c053b0a
Rework stored procedure
bartelink Nov 16, 2018
67bbb4d
WIP; Not working
bartelink Nov 16, 2018
8ab4fc7
low level api
Nov 17, 2018
0a72479
PrettifyJS, plumb self
bartelink Nov 17, 2018
fae977e
Use etag and self to tune reads
bartelink Nov 17, 2018
a994f41
Tidy stats
bartelink Nov 18, 2018
64a47e9
Major Token management cleanup
bartelink Nov 18, 2018
4306574
Tidy Events API
bartelink Nov 18, 2018
9708d44
Simplify WIP doc updates, remove _self tracking
bartelink Nov 18, 2018
dc703f2
Parameterize page size
bartelink Nov 18, 2018
19e211d
Fix one test, neuter another
bartelink Nov 19, 2018
b231617
Add tests for low level api get and getAll
Nov 19, 2018
b7d2bc9
use correct compare functions
Nov 19, 2018
249bcc7
Extract/Tidy CosmosEventsIntegration
bartelink Nov 19, 2018
a1f2c79
Add Better json diffing
bartelink Nov 19, 2018
270937d
Validate request charges; adjust signatures
bartelink Nov 19, 2018
cd6983a
Add Batching/Slicing test coverage
bartelink Nov 19, 2018
4d5f187
Add Events loadBackwards
bartelink Nov 19, 2018
9f3a6c6
Move to index and streamName naming
bartelink Nov 19, 2018
ac78d82
Tests and tweaks for append conflict management
bartelink Nov 19, 2018
738282b
Add AppendAtEnd mode
bartelink Nov 20, 2018
76ccf94
Add getNextIndex, flesh out and namespace APIs
bartelink Nov 20, 2018
9623796
Emit single events (only), fix bugs
bartelink Nov 21, 2018
c9625dd
Fixing minor bugs
bartelink Nov 21, 2018
72a5cd4
Remove Cosmos RollingSnapshots
bartelink Nov 22, 2018
1e0b41d
Adjust test expectations
bartelink Nov 22, 2018
ca223d9
Fix minor bugs
bartelink Nov 23, 2018
fef6970
Add explicit non-indexed mode
bartelink Nov 23, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 59 additions & 36 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

open Argu
open Domain
open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.EventStore
open Infrastructure
open Serilog
Expand Down Expand Up @@ -119,7 +119,7 @@ module Cosmos =
let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) =
EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime)
.Connect("equinox-cli", discovery)
let createGateway connection batchSize = EqxGateway(connection, EqxBatchingPolicy(maxBatchSize = batchSize))
let createGateway connection maxItems = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems))

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Store =
Expand All @@ -135,7 +135,7 @@ module Test =
clients.[clientIndex % clients.Length]
let selectClient = async { return async { return selectClient() } }
Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest
let fold, initial, compact, index = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact, Domain.Favorites.Folds.index
let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact
let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect()
let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings)
let codec = genCodec<Domain.Favorites.Events.Event>()
Expand All @@ -147,22 +147,19 @@ module Test =
else None
let eqxCache =
if targs.Contains Cached then
let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
let c = Equinox.Cosmos.Builder.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.Builder.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
let resolveStream streamName =
let resolveStream =
match store with
| Store.Mem store ->
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName)
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create(streamName)
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create
| Store.Cosmos (gateway, databaseId, connectionId) ->
if targs.Contains Indexed then
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.IndexedSearch index, ?caching = cache)
.Create(databaseId, connectionId, streamName)
else
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact, ?caching = cache)
.Create(databaseId, connectionId, streamName)
let store = EqxStore(gateway, EqxCollections(databaseId, connectionId))
if targs.Contains Indexed then EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection compact, ?caching = eqxCache).Create
else EqxStreamBuilder(store, codec, fold, initial, ?access=None, ?caching = eqxCache).Create
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
let sku = Guid.NewGuid() |> SkuId
Expand All @@ -172,34 +169,41 @@ module Test =

[<AutoOpen>]
module SerilogHelpers =
let (|CosmosReadRu|CosmosWriteRu|CosmosSliceRu|) (evt : Equinox.Cosmos.Log.Event) =
let inline (|Stats|) ({ interval = i; ru = ru }: Equinox.Cosmos.Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds
let (|CosmosReadRu|CosmosWriteRu|CosmosResyncRu|CosmosSliceRu|) (evt : Equinox.Cosmos.Log.Event) =
match evt with
| Equinox.Cosmos.Log.Index { ru = ru }
| Equinox.Cosmos.Log.IndexNotFound { ru = ru }
| Equinox.Cosmos.Log.IndexNotModified { ru = ru }
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Forward,_, { ru = ru })
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Backward,_, { ru = ru }) -> CosmosReadRu ru
| Equinox.Cosmos.Log.WriteSuccess {ru = ru }
| Equinox.Cosmos.Log.WriteConflict {ru = ru } -> CosmosWriteRu ru
| Equinox.Cosmos.Log.Index (Stats s)
| Equinox.Cosmos.Log.IndexNotFound (Stats s)
| Equinox.Cosmos.Log.IndexNotModified (Stats s)
| Equinox.Cosmos.Log.Batch (_,_, (Stats s)) -> CosmosReadRu s
| Equinox.Cosmos.Log.WriteSuccess (Stats s)
| Equinox.Cosmos.Log.WriteConflict (Stats s) -> CosmosWriteRu s
| Equinox.Cosmos.Log.WriteResync (Stats s) -> CosmosResyncRu s
// slices are rolled up into batches so be sure not to double-count
| Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Direction.Forward,{ ru = ru })
| Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Direction.Backward,{ ru = ru }) -> CosmosSliceRu ru
| Equinox.Cosmos.Log.Slice (_,{ ru = ru }) -> CosmosSliceRu ru
let (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function
| (:? ScalarValue as x) -> Some x.Value
| _ -> None
let (|CosmosMetric|_|) (logEvent : LogEvent) : Equinox.Cosmos.Log.Event option =
match logEvent.Properties.TryGetValue("cosmosEvt") with
| true, SerilogScalar (:? Equinox.Cosmos.Log.Event as e) -> Some e
| _ -> None
type RuCounter =
{ mutable rux100: int64; mutable count: int64; mutable ms: int64 }
static member Create() = { rux100 = 0L; count = 0L; ms = 0L }
member __.Ingest (ru, ms) =
Interlocked.Increment(&__.count) |> ignore
Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore
Interlocked.Add(&__.ms, ms) |> ignore
type RuCounterSink() =
static let mutable readX10 = 0L
static let mutable writeX10 = 0L
static member Read = readX10 / 10L
static member Write = writeX10 / 10L
static member val Read = RuCounter.Create()
static member val Write = RuCounter.Create()
static member val Resync = RuCounter.Create()
interface Serilog.Core.ILogEventSink with
member __.Emit logEvent = logEvent |> function
| CosmosMetric (CosmosReadRu ru) -> Interlocked.Add(&readX10, int64 (ru*10.)) |> ignore
| CosmosMetric (CosmosWriteRu ru) -> Interlocked.Add(&writeX10, int64 (ru*10.)) |> ignore
| CosmosMetric (CosmosReadRu stats) -> RuCounterSink.Read.Ingest stats
| CosmosMetric (CosmosWriteRu stats) -> RuCounterSink.Write.Ingest stats
| CosmosMetric (CosmosResyncRu stats) -> RuCounterSink.Resync.Ingest stats
| _ -> ()

let createStoreLog verbose verboseConsole maybeSeqEndpoint =
Expand Down Expand Up @@ -260,7 +264,7 @@ let main argv =
let resultFile = createResultLog report
for r in results do
resultFile.Information("Aggregate: {aggregate}", r)
log.Information("Run completed; Current memory allocation: {bytes:n0}", GC.GetTotalMemory(true))
log.Information("Run completed; Current memory allocation: {bytes:n2}MB", (GC.GetTotalMemory(true) |> float) / 1024./1024.)
0

match args.GetSubCommand() with
Expand Down Expand Up @@ -309,15 +313,34 @@ let main argv =
match sargs.TryGetSubCommand() with
| Some (Provision args) ->
let rus = args.GetResult(Rus)
log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus:n0}", rus)
Equinox.Cosmos.Initialization.initialize conn.Client dbName collName rus |> Async.RunSynchronously
log.Information("Configuring CosmosDb Collection with Throughput Provision: {rus:n0} RU/s", rus)
Equinox.Cosmos.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously
0
| Some (Run targs) ->
let conn = Store.Cosmos (Cosmos.createGateway conn defaultBatchSize, dbName, collName)
let res = runTest log conn targs
let read, write = RuCounterSink.Read, RuCounterSink.Write
let total = read+write
log.Information("Total Request Charges sustained in test: {totalRus:n0} (R:{readRus:n0}, W:{writeRus:n0})", total, read, write)
let stats =
[ "Read", RuCounterSink.Read
"Write", RuCounterSink.Write
"Resync", RuCounterSink.Resync ]
let mutable totalCount, totalRc, totalMs = 0L, 0., 0L
let logActivity name count rc lat =
log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms",
name, count, rc, (if count = 0L then Double.NaN else rc/float count), (if count = 0L then Double.NaN else float lat/float count))
for name, stat in stats do
let ru = float stat.rux100 / 100.
totalCount <- totalCount + stat.count
totalRc <- totalRc + ru
totalMs <- totalMs + stat.ms
logActivity name stat.count ru stat.ms
logActivity "TOTAL" totalCount totalRc totalMs
let measures : (string * (TimeSpan -> float)) list =
[ "s", fun x -> x.TotalSeconds
"m", fun x -> x.TotalMinutes
"h", fun x -> x.TotalHours ]
let logPeriodicRate name count ru = log.Information("rp{name} {count:n0} = ~{ru:n0} RU", name, count, ru)
let duration = targs.GetResult(DurationM,1.) |> TimeSpan.FromMinutes
for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d)
res
| _ -> failwith "init or run is required"
| _ -> failwith "ERROR: please specify memory, es or cosmos Store"
Expand Down
3 changes: 1 addition & 2 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module Folds =
let toSnapshot (s: State) : Events.Compaction.State =
{ items = [| for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } |] }
let ofCompacted (s: Events.Compaction.State) : State =
{ items = [ for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } ] }
{ items = if s.items = null then [] else [ for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } ] }
let initial = { items = [] }
let evolve (state : State) event =
let updateItems f = { state with items = f state.items }
Expand All @@ -43,7 +43,6 @@ module Folds =
| Events.ItemWaiveReturnsChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with returnsWaived = e.waived } | i -> i))
let fold state = Seq.fold evolve state
let compact = Events.Compaction.EventType, fun state -> Events.Compacted (State.toSnapshot state)
let index = (fun et -> et = Events.Compaction.EventType), fun state -> seq [ yield Events.Compacted (State.toSnapshot state) ]
type Context = { time: System.DateTime; requestId : RequestId }
type Command =
| AddItem of Context * SkuId * quantity: int
Expand Down
4 changes: 3 additions & 1 deletion samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ module Events =
type Preferences = { manyPromotions : bool; littlePromotions : bool; productReview : bool; quickSurveys : bool }
type Value = { email : string; preferences : Preferences }

let [<Literal>] EventTypeName = "contactPreferencesChanged"
type Event =
| [<System.Runtime.Serialization.DataMember(Name = "contactPreferencesChanged")>]Updated of Value
| [<System.Runtime.Serialization.DataMember(Name = EventTypeName)>]Updated of Value
interface TypeShape.UnionContract.IUnionContract
let eventTypeNames = System.Collections.Generic.HashSet<string>([EventTypeName])

module Folds =
type State = Events.Preferences
Expand Down
1 change: 0 additions & 1 deletion samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ module Folds =
for e in events do evolve s e
s.AsState()
let compact = Events.Compaction.EventType, fun state -> Events.Compacted { net = state }
let index = (fun x -> x = Events.Compaction.EventType), fun state -> seq [ Events.Compacted { net = state } ]

type Command =
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
Expand Down
32 changes: 16 additions & 16 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.CartIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -17,15 +17,15 @@ let createServiceMem log store =

let codec = Equinox.EventStore.Integration.EventStoreIntegration.genCodec<Domain.Cart.Events.Event>()

let resolveGesStreamWithCompactionEventType gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create(streamName)
let resolveGesStreamWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial).Create(streamName)
let resolveGesStreamWithRollingSnapshots gateway =
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesStreamBuilder(gateway, codec, fold, initial).Create

let resolveEqxStreamWithCompactionEventType gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveEqxStreamWithProjection gateway =
EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection compact).Create
let resolveEqxStreamWithoutCustomAccessStrategy gateway =
EqxStreamBuilder(gateway, codec, fold, initial).Create

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
Expand Down Expand Up @@ -60,24 +60,24 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCompactionSemantics
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithCompactionEventType
let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithoutCompactionSemantics
let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithCompactionEventType
let ``Can roundtrip against Cosmos, correctly folding the events with With Projection`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithProjection
do! act service args
}
22 changes: 11 additions & 11 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.ContactPreferencesIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -16,15 +16,15 @@ let createServiceMem log store =
Backend.ContactPreferences.Service(log, MemoryStreamBuilder(store, fold, initial).Create)

let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let resolveStreamGesWithCompactionSemantics gateway streamName =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(streamName)
let resolveStreamGesWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(streamName)
let resolveStreamGesWithCompactionSemantics gateway =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create
let resolveStreamGesWithoutCompactionSemantics gateway =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

let resolveStreamEqxWithCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args)
let resolveStreamEqxWithCompactionSemantics gateway =
EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType Domain.ContactPreferences.Events.eventTypeNames).Create
let resolveStreamEqxWithoutCompactionSemantics gateway =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down Expand Up @@ -63,12 +63,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithoutCompactionSemantics
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithCompactionSemantics
do! act service args
}
Loading