Skip to content

Commit

Permalink
Multi-event Batches (#48)
Browse files Browse the repository at this point in the history
* Multi-event batches; Tip now a Batch too
* Handle startIndex within multi-item batches
  • Loading branch information
bartelink authored Nov 29, 2018
1 parent d176146 commit 35d6b7e
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 164 deletions.
11 changes: 7 additions & 4 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ and [<NoEquality; NoComparison>] CosmosArguments =
| [<AltCommandLine("-d")>] Database of string
| [<AltCommandLine("-c")>] Collection of string
| [<AltCommandLine("-rt")>] RetriesWaitTime of int
| [<AltCommandLine("-a")>] PageSize of int

| [<CliPrefix(CliPrefix.None)>] Provision of ParseResults<CosmosProvisionArguments>
| [<CliPrefix(CliPrefix.None)>] Run of ParseResults<TestArguments>
Expand All @@ -89,6 +90,7 @@ and [<NoEquality; NoComparison>] CosmosArguments =
| Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)."
| Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)"
| PageSize _ -> "Specify maximum number of events to record on a page before switching to a new one (default: 1)"
| Provision _ -> "Initialize a store collection."
| Run _ -> "Run a load test."
and CosmosProvisionArguments =
Expand Down Expand Up @@ -119,7 +121,7 @@ module Cosmos =
let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) =
EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime)
.Connect("equinox-cli", discovery)
let createGateway connection maxItems = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems))
let createGateway connection (maxItems,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems, maxEventsPerSlice=maxEvents))

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Store =
Expand Down Expand Up @@ -306,9 +308,10 @@ let main argv =
let collName = sargs.GetResult(Collection, defaultArg (read "EQUINOX_COSMOS_COLLECTION") "equinox-test")
let timeout = sargs.GetResult(Timeout,5.) |> float |> TimeSpan.FromSeconds
let (retries, maxRetryWaitTime) as operationThrottling = sargs.GetResult(Retries, 1), sargs.GetResult(RetriesWaitTime, 5)
log.Information("Using CosmosDb Connection {connection} Database: {database} Collection: {collection}. " +
let pageSize = sargs.GetResult(PageSize,1)
log.Information("Using CosmosDb Connection {connection} Database: {database} Collection: {collection} with page size: {pageSize}. " +
"Request timeout: {timeout} with {retries} retries; throttling MaxRetryWaitTime {maxRetryWaitTime}",
connUri, dbName, collName, timeout, retries, maxRetryWaitTime)
connUri, dbName, collName, pageSize, timeout, retries, maxRetryWaitTime)
let conn = Cosmos.connect log discovery timeout operationThrottling |> Async.RunSynchronously
match sargs.TryGetSubCommand() with
| Some (Provision args) ->
Expand All @@ -317,7 +320,7 @@ let main argv =
Equinox.Cosmos.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously
0
| Some (Run targs) ->
let conn = Store.Cosmos (Cosmos.createGateway conn defaultBatchSize, dbName, collName)
let conn = Store.Cosmos (Cosmos.createGateway conn (defaultBatchSize,pageSize), dbName, collName)
let res = runTest log conn targs
let stats =
[ "Read", RuCounterSink.Read
Expand Down
Loading

0 comments on commit 35d6b7e

Please sign in to comment.