Skip to content

Commit

Permalink
Add initAux CLI command re #60
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 19, 2018
1 parent feeb10b commit 212a7b7
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 21 deletions.
9 changes: 7 additions & 2 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ param(
[Alias("cd")][string] $cosmosDatabase=$env:EQUINOX_COSMOS_DATABASE,
[Alias("cc")][string] $cosmosCollection=$env:EQUINOX_COSMOS_COLLECTION,
[Alias("scp")][switch][bool] $skipProvisionCosmos=$skipCosmos -or -not $cosmosServer -or -not $cosmosDatabase -or -not $cosmosCollection,
[Alias("ca")][switch][bool] $cosmosProvisionAux,
[Alias("scd")][switch][bool] $skipDeprovisionCosmos=$skipProvisionCosmos,
[string] $additionalMsBuildArgs="-t:Build"
)
Expand All @@ -29,10 +30,14 @@ if ($skipCosmos) {
} elseif ($skipProvisionCosmos) {
warn "Skipping Provisioning Cosmos"
} else {
warn "Provisioning cosmos..."
warn "Provisioning cosmos (without stored procedure)..."
# -P: inhibit creation of stored proc (everything in the repo should work without it due to auto-provisioning)
cliCosmos @("init", "-ru", "400", "-P")
cliCosmos @("init", "-ru", "400", "-P")
$deprovisionCosmos=$true
if ($cosmosProvisionAux) {
warn "Provisioning cosmos aux collection for projector..."
cliCosmos @("initAux", "-ru", "400")
}
}
$env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos

Expand Down
37 changes: 30 additions & 7 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ type Arguments =
| [<AltCommandLine("-S")>] LocalSeq
| [<AltCommandLine("-l")>] LogFile of string
| [<CliPrefix(CliPrefix.None); Last; Unique; AltCommandLine>] Run of ParseResults<TestArguments>
| [<CliPrefix(CliPrefix.None); Last; Unique; AltCommandLine("init")>] Initialize of ParseResults<InitArguments>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Init of ParseResults<InitArguments>
| [<CliPrefix(CliPrefix.None); Last; Unique; AltCommandLine("initAux")>] InitAux of ParseResults<InitAuxArguments>
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose -> "Include low level logging regarding specific test runs."
| VerboseConsole -> "Include low level test and store actions logging in on-screen output to console."
| LocalSeq -> "Configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net"
| LogFile _ -> "specify a log file to write the result breakdown into (default: Equinox.Cli.log)."
| Run _ -> "Run a load test"
| Initialize _ -> "Initialize a store"
| Init _ -> "Initialize store (presently only relevant for `cosmos`, where it creates database+collection+stored proc if not already present)."
| InitAux _ -> "Initialize auxilliary store (presently only relevant for `cosmos`, when you intend to run the [presently closed source] Projector)."
and [<NoComparison>]InitArguments =
| [<AltCommandLine("-ru"); Mandatory>] Rus of int
| [<AltCommandLine("-P")>] SkipStoredProc
Expand All @@ -37,6 +39,15 @@ and [<NoComparison>]InitArguments =
| Rus _ -> "Specify RU/s level to provision for the Application Collection."
| SkipStoredProc -> "Inhibit creation of stored procedure in cited Collection."
| Cosmos _ -> "Cosmos Connection parameters."
and [<NoComparison>]InitAuxArguments =
| [<AltCommandLine("-ru"); Mandatory>] Rus of int
| [<AltCommandLine("-s"); Mandatory>] Suffix of string
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<CosmosArguments>
interface IArgParserTemplate with
member a.Usage = a |> function
| Rus _ -> "Specify RU/s level to provision for the Application Collection."
| Suffix _ -> "Specify Collection Name suffix (default: `-aux`)."
| Cosmos _ -> "Cosmos Connection parameters."
and [<NoComparison>]WebArguments =
| [<AltCommandLine("-u")>] Endpoint of string
interface IArgParserTemplate with
Expand Down Expand Up @@ -201,24 +212,36 @@ let main argv =
let verbose = args.Contains Verbose
let log = createDomainLog verbose verboseConsole maybeSeq
match args.GetSubCommand() with
| Initialize iargs ->
let rus = iargs.GetResult(Rus)
| Init iargs ->
let rus = iargs.GetResult(InitArguments.Rus)
match iargs.TryGetSubCommand() with
| Some (InitArguments.Cosmos sargs) ->
let storeLog = createStoreLog (sargs.Contains CosmosArguments.VerboseStore) verboseConsole maybeSeq
let dbName, collName, (_pageSize: int), conn = Cosmos.conn (log,storeLog) sargs
log.Information("Configuring CosmosDb Collection with Throughput Provision: {rus:n0} RU/s", rus)
log.Information("Configuring CosmosDb Collection {collName} with Throughput Provision: {rus:n0} RU/s", collName, rus)
Async.RunSynchronously <| async {
do! Equinox.Cosmos.Store.Sync.Initialization.createDatabaseIfNotExists conn.Client dbName
do! Equinox.Cosmos.Store.Sync.Initialization.createCollectionIfNotExists conn.Client (dbName,collName) rus
do! Equinox.Cosmos.Store.Sync.Initialization.createBatchAndTipCollectionIfNotExists conn.Client (dbName,collName) rus
let collectionUri = Microsoft.Azure.Documents.Client.UriFactory.CreateDocumentCollectionUri(dbName,collName)
if not (iargs.Contains SkipStoredProc) then
do! Equinox.Cosmos.Store.Sync.Initialization.createSyncStoredProcIfNotExists (Some (upcast log)) conn.Client collectionUri }
| _ -> failwith "please specify a `cosmos` endpoint"
| InitAux iargs ->
let rus = iargs.GetResult(InitAuxArguments.Rus)
match iargs.TryGetSubCommand() with
| Some (InitAuxArguments.Cosmos sargs) ->
let storeLog = createStoreLog (sargs.Contains CosmosArguments.VerboseStore) verboseConsole maybeSeq
let dbName, collName, (_pageSize: int), conn = Cosmos.conn (log,storeLog) sargs
let collName = collName + (iargs.GetResult(InitAuxArguments.Suffix,"-aux"))
log.Information("Configuring CosmosDb Aux Collection {collName} with Throughput Provision: {rus:n0} RU/s", collName, rus)
Async.RunSynchronously <| async {
do! Equinox.Cosmos.Store.Sync.Initialization.createDatabaseIfNotExists conn.Client dbName
do! Equinox.Cosmos.Store.Sync.Initialization.createAuxCollectionIfNotExists conn.Client (dbName,collName) rus }
| _ -> failwith "please specify a `cosmos` endpoint"
| Run rargs ->
let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName
LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs
| _ -> failwith "Please specify a valid subcommand :- init or run"
| _ -> failwith "Please specify a valid subcommand :- init, initAux or run"
0
with e ->
printfn "%s" e.Message
Expand Down
33 changes: 21 additions & 12 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -415,29 +415,38 @@ function sync(req, expectedVersion, maxEvents) {
let createDatabaseIfNotExists (client:IDocumentClient) dbName =
let opts = Client.RequestOptions(ConsistencyLevel = Nullable ConsistencyLevel.Session)
client.CreateDatabaseIfNotExistsAsync(Database(Id=dbName), options = opts) |> Async.AwaitTaskCorrect |> Async.Ignore
let createCollectionIfNotExists (client: IDocumentClient) (dbName,collName) ru = async {
let private createCollectionIfNotExists (client: IDocumentClient) dbName (def: DocumentCollection, ru) = async {
let dbUri = Client.UriFactory.CreateDatabaseUri dbName
return! client.CreateDocumentCollectionIfNotExistsAsync(dbUri, def, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect |> Async.Ignore }
let private createStoredProcIfNotExists (client: IDocumentClient) (collectionUri: Uri) (name, body): Async<float> = async {
try let! r = client.CreateStoredProcedureAsync(collectionUri, StoredProcedure(Id = name, Body = body)) |> Async.AwaitTaskCorrect
return r.RequestCharge
with DocDbException ((DocDbStatusCode sc) as e) when sc = System.Net.HttpStatusCode.Conflict -> return e.RequestCharge }
let createBatchAndTipCollectionIfNotExists (client: IDocumentClient) (dbName,collName) ru : Async<unit> =
let pkd = PartitionKeyDefinition()
pkd.Paths.Add(sprintf "/%s" Batch.PartitionKeyField)
let colld = DocumentCollection(Id = collName, PartitionKey = pkd)
let def = DocumentCollection(Id = collName, PartitionKey = pkd)

colld.IndexingPolicy.IndexingMode <- IndexingMode.Consistent
colld.IndexingPolicy.Automatic <- true
def.IndexingPolicy.IndexingMode <- IndexingMode.Consistent
def.IndexingPolicy.Automatic <- true
// Can either do a blacklist or a whitelist
// Given how long and variable the blacklist would be, we whitelist instead
colld.IndexingPolicy.ExcludedPaths <- Collection [|ExcludedPath(Path="/*")|]
def.IndexingPolicy.ExcludedPaths <- Collection [|ExcludedPath(Path="/*")|]
// NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors
colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Batch.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |]
let dbUri = Client.UriFactory.CreateDatabaseUri dbName
return! client.CreateDocumentCollectionIfNotExistsAsync(dbUri, colld, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect |> Async.Ignore }
let private createStoredProcIfNotExists (client: IDocumentClient) (collectionUri: Uri) (name, body): Async<float> = async {
try let! r = client.CreateStoredProcedureAsync(collectionUri, StoredProcedure(Id = name, Body = body)) |> Async.AwaitTaskCorrect
return r.RequestCharge
with DocDbException ((DocDbStatusCode sc) as e) when sc = System.Net.HttpStatusCode.Conflict -> return e.RequestCharge }
def.IndexingPolicy.IncludedPaths <- Collection [| for k in Batch.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |]
createCollectionIfNotExists client dbName (def, ru)
let createSyncStoredProcIfNotExists (log: ILogger option) client collUri = async {
let! t, ru = createStoredProcIfNotExists client collUri (sprocName,sprocBody) |> Stopwatch.Time
match log with
| None -> ()
| Some log -> log.Information("Created stored procedure {sprocId} rc={ru} t={ms}", sprocName, ru, (let e = t.Elapsed in e.TotalMilliseconds)) }
let createAuxCollectionIfNotExists (client: IDocumentClient) (dbName,collName) ru : Async<unit> =
let def = DocumentCollection(Id = collName)
// for now, we are leaving the default IndexingPolicy mode wrt fields to index and in which manner as default: autoindexing all fields
def.IndexingPolicy.IndexingMode <- IndexingMode.Lazy
// Expire Projector documentId to Kafka offsets mapping records after one year
def.DefaultTimeToLive <- Nullable (365 * 60 * 60 * 24)
createCollectionIfNotExists client dbName (def, ru)

module internal Tip =
let private get (client: IDocumentClient) (stream: CollectionStream, maybePos: Position option) =
Expand Down

0 comments on commit 212a7b7

Please sign in to comment.