diff --git a/Equinox.sln b/Equinox.sln index 8943628b9..dc0155d7a 100644 --- a/Equinox.sln +++ b/Equinox.sln @@ -46,7 +46,9 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Cosmos", "src\Equin EndProject Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Cosmos.Integration", "tests\Equinox.Cosmos.Integration\Equinox.Cosmos.Integration.fsproj", "{DE0FEBF0-72DC-4D4A-BBA7-788D875D6B4B}" EndProject -Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Web", "samples\Store\Web\Web.fsproj", "{1B0D4568-96FD-4083-8520-CD537C0B2FF0}" +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Web", "samples\Store\Web\Web.fsproj", "{1B0D4568-96FD-4083-8520-CD537C0B2FF0}" +EndProject +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Infrastructure", "samples\Store\Infrastructure\Infrastructure.fsproj", "{ACE52D04-2FE3-4FD6-A066-9C81429C3997}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -110,6 +112,10 @@ Global {1B0D4568-96FD-4083-8520-CD537C0B2FF0}.Debug|Any CPU.Build.0 = Debug|Any CPU {1B0D4568-96FD-4083-8520-CD537C0B2FF0}.Release|Any CPU.ActiveCfg = Release|Any CPU {1B0D4568-96FD-4083-8520-CD537C0B2FF0}.Release|Any CPU.Build.0 = Release|Any CPU + {ACE52D04-2FE3-4FD6-A066-9C81429C3997}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ACE52D04-2FE3-4FD6-A066-9C81429C3997}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ACE52D04-2FE3-4FD6-A066-9C81429C3997}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ACE52D04-2FE3-4FD6-A066-9C81429C3997}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -120,6 +126,7 @@ Global {406A280E-0708-4B12-8443-8FD5660CD271} = {D67D5A5F-2E59-4514-A997-FEBDC467AAF6} {0B2D5815-D6A5-4AAC-9B75-D57B165E2A92} = {D67D5A5F-2E59-4514-A997-FEBDC467AAF6} {1B0D4568-96FD-4083-8520-CD537C0B2FF0} = {D67D5A5F-2E59-4514-A997-FEBDC467AAF6} + {ACE52D04-2FE3-4FD6-A066-9C81429C3997} = {D67D5A5F-2E59-4514-A997-FEBDC467AAF6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {177E1E7B-E275-4FC6-AE3C-2C651ECCF71E} diff --git a/README.md b/README.md index 084b95d8d..62ddc803f 100644 --- a/README.md +++ b/README.md @@ -89,25 +89,45 @@ Run, including running the tests that assume you've got a local EventStore and p ## build, skip EventStore tests, skip auto-provisioning + de-provisioning Cosmos ./build -se -scp + +# BENCHMARKS + +A key facility of this repo is beoing able to run load tests, either in process against a nominated store, or via HTTP to a nominated Web app. The following tests are implemented at present: + +- `Favorite` - Simulate a very enthusiastic user that favorites things once per Second - triggering an ever-growing state which can only work efficiently if you: + - apply a snapshotting scheme (although being unbounded, it will eventually hit the store's limits - 4MB/event for EventStore, 3MB/document for CosmosDb) + - apply caching on CosmosDb (so re-reading and transporting the snapshots is eliminated from the RU/bandwidth/latency costs) +- `SaveForLater` - Simulate a happy shopper that saves 3 items per second, and empties the Save For Later list whenever it is full (when it hits 50 items) + - Snapshotting helps a lot + - Caching is not as essential as it is for the `Favorite` test + ## Run EventStore benchmark (when provisioned) - & .\cli\Equinox.Cli\bin\Release\net461\Equinox.Cli.exe es run - & dotnet run -f netcoreapp2.1 -p cli/equinox.cli -- es run + & .\cli\Equinox.Cli\bin\Release\net461\Equinox.Cli.exe run es + & dotnet run -f netcoreapp2.1 -p cli/equinox.cli -- run es ## run CosmosDb benchmark (when provisioned) -``` -$env:EQUINOX_COSMOS_CONNECTION="AccountEndpoint=https://....;AccountKey=....=;" -$env:EQUINOX_COSMOS_DATABASE="equinox-test" -$env:EQUINOX_COSMOS_COLLECTION="equinox-test" - -cli/Equinox.cli/bin/Release/net461/Equinox.Cli ` - cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION ` - run -dotnet run -f netcoreapp2.1 -p cli/equinox.cli -- ` - cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION ` - run -``` + $env:EQUINOX_COSMOS_CONNECTION="AccountEndpoint=https://....;AccountKey=....=;" + $env:EQUINOX_COSMOS_DATABASE="equinox-test" + $env:EQUINOX_COSMOS_COLLECTION="equinox-test" + + cli/Equinox.cli/bin/Release/net461/Equinox.Cli run ` + cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION + dotnet run -f netcoreapp2.1 -p cli/equinox.cli -- run ` + cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION + +## run Web benchmark + +The CLI can drive the Store/Web ASP.NET Core app. Doing so requires starting a web process with an appropriate store (Cosmos in this example, but can be `memory`/omitted, `es` etc as in the other examples) + +### in Window 1 + + & dotnet run -c Release -f netcoreapp2.1 -p samples/Store/Web -- -C -U cosmos + +### in Window 2 + + & dotnet run -c Release -f netcoreapp2.1 -p cli/Equinox.Cli -- run -t saveforlater -f 200 web # PROVISIONING @@ -120,6 +140,15 @@ For EventStore, the tests assume a running local instance configured as follows # run as a single-node cluster to allow connection logic to use cluster mode as for a commercial cluster & $env:ProgramData\chocolatey\bin\EventStore.ClusterNode.exe --gossip-on-single-node --discover-via-dns 0 --ext-http-port=30778 +## CosmosDb (when not using -sc) + +``` +dotnet run -f netcoreapp2.1 -p cli/equinox.cli -- init -ru 10000 ` + cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION +``` + +# DEPROVISIONING + ## Deprovisioning (aka nuking) EventStore data resulting from tests to reset baseline While EventStore rarely shows any negative effects from repeated load test runs, it can be useful for various reasons to drop all the data generated by the load tests by casting it to the winds:- @@ -127,13 +156,7 @@ While EventStore rarely shows any negative effects from repeated load test runs, # requires admin privilege rm $env:ProgramData\chocolatey\lib\eventstore-oss\tools\data -## COSMOSDB (when not using -sc) - -``` -dotnet run -f netcoreapp2.1 -p cli/equinox.cli -- cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION provision -ru 10000 -``` - -## DEPROVISIONING COSMOSDB +## Deprovisioning CosmosDb The above provisioning step provisions RUs in DocDB for the collection, which add up quickly. *When finished running any test, it's critical to drop the RU allocations back down again via some mechanism*. diff --git a/build.proj b/build.proj index d6e98cb3a..40d80e736 100644 --- a/build.proj +++ b/build.proj @@ -25,4 +25,4 @@ - + \ No newline at end of file diff --git a/build.ps1 b/build.ps1 index 3b27d0148..9762133db 100644 --- a/build.ps1 +++ b/build.ps1 @@ -20,8 +20,8 @@ $env:EQUINOX_INTEGRATION_SKIP_EVENTSTORE=[string]$skipEs if ($skipEs) { warn "Skipping EventStore tests" } function cliCosmos($arghs) { - Write-Host "dotnet run cli/Equinox.Cli cosmos -s -d $cosmosDatabase -c $cosmosCollection $arghs" - dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs + Write-Host "dotnet run cli/Equinox.Cli -- $arghs cosmos -s -d $cosmosDatabase -c $cosmosCollection" + dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 -- @arghs cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection } if ($skipCosmos) { @@ -30,7 +30,7 @@ if ($skipCosmos) { warn "Skipping Provisioning Cosmos" } else { warn "Provisioning cosmos..." - cliCosmos @("provision", "-ru", "1000") + cliCosmos @("init", "-ru", "1000") $deprovisionCosmos=$true } $env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos diff --git a/cli/Equinox.Cli/Clients.fs b/cli/Equinox.Cli/Clients.fs new file mode 100644 index 000000000..07dfff15a --- /dev/null +++ b/cli/Equinox.Cli/Clients.fs @@ -0,0 +1,62 @@ +module Equinox.Cli.Clients + +open Domain +open Equinox.Cli.Infrastructure +open System +open System.Net +open System.Net.Http + +type Session(client: HttpClient, clientId: ClientId) = + + member __.Send(req : HttpRequestMessage) : Async = + let req = req |> HttpReq.withHeader "COMPLETELY_INSECURE_CLIENT_ID" clientId.Value + client.Send(req) + +type Favorited = { date: System.DateTimeOffset; skuId: SkuId } + +type FavoritesClient(session: Session) = + + member __.Favorite(skus: SkuId[]) = async { + let request = HttpReq.post () |> HttpReq.withPath "api/favorites" |> HttpReq.withJsonNet skus + let! response = session.Send request + do! response.EnsureStatusCode(HttpStatusCode.NoContent) + } + + member __.List = async { + let request = HttpReq.get () |> HttpReq.withPath "api/favorites" + let! response = session.Send request + return! response |> HttpRes.deserializeOkJsonNet + } + +type Saved = { skuId : SkuId; dateSaved : DateTimeOffset } + +type SavesClient(session: Session) = + + // this (returning a bool indicating whether it got saved) is fine for now + // IRL we don't want to be leaning on the fact we get a 400 when we exceed the max imems limit as a core API design element + member __.Save(skus: SkuId[]) : Async = async { + let request = HttpReq.post () |> HttpReq.withPath "api/saves" |> HttpReq.withJsonNet skus + let! response = session.Send request + if response.StatusCode = HttpStatusCode.BadRequest then + return false + else + do! response.EnsureStatusCode(HttpStatusCode.NoContent) + return true + } + + member __.Remove(skus: SkuId[]) : Async = async { + let request = HttpReq.delete () |> HttpReq.withPath "api/saves" |> HttpReq.withJsonNet skus + let! response = session.Send request + return! response.EnsureStatusCode(HttpStatusCode.NoContent) + } + + member __.List = async { + let request = HttpReq.get () |> HttpReq.withPath "api/saves" + let! response = session.Send request + return! response |> HttpRes.deserializeOkJsonNet + } + +type Session with + + member session.Favorites = FavoritesClient session + member session.Saves = SavesClient session \ No newline at end of file diff --git a/cli/Equinox.Cli/Equinox.Cli.fsproj b/cli/Equinox.Cli/Equinox.Cli.fsproj index 649f0c788..77ef164d3 100644 --- a/cli/Equinox.Cli/Equinox.Cli.fsproj +++ b/cli/Equinox.Cli/Equinox.Cli.fsproj @@ -20,37 +20,35 @@ + + - + - + - - - - - + diff --git a/cli/Equinox.Cli/Infrastructure/Infrastructure.fs b/cli/Equinox.Cli/Infrastructure/Infrastructure.fs index 7fd7de5a5..b48f3284b 100644 --- a/cli/Equinox.Cli/Infrastructure/Infrastructure.fs +++ b/cli/Equinox.Cli/Infrastructure/Infrastructure.fs @@ -31,6 +31,12 @@ type Exception with Unchecked.defaultof<_> type Async with + /// + /// Raises an exception using Async's continuation mechanism directly. + /// + /// Exception to be raised. + static member Raise (exn : #exn) = Async.FromContinuations(fun (_,ec,_) -> ec exn) + #if NET461 static member Choice<'T>(workflows : seq>) : Async<'T option> = async { try @@ -111,4 +117,191 @@ type StringBuilder with static member inline Build(builder : StringBuilder -> unit) = let instance = StringBuilder() // TOCONSIDER PooledStringBuilder.GetInstance() builder instance - instance.ToString() \ No newline at end of file + instance.ToString() + +[] +module HttpHelpers = + + open System.Net + open System.Net.Http + open System.Runtime.Serialization + + /// Operations on System.Net.HttpRequestMessage + [] + module HttpReq = + + /// Creates an HTTP GET request. + let inline create () = new HttpRequestMessage() + + /// Assigns a method to an HTTP request. + let inline withMethod (m : HttpMethod) (req : HttpRequestMessage) = + req.Method <- m + req + + /// Creates an HTTP GET request. + let inline get () = create () + + /// Creates an HTTP POST request. + let inline post () = create () |> withMethod HttpMethod.Post + + /// Creates an HTTP DELET request. + let inline delete () = create () |> withMethod HttpMethod.Delete + + /// Assigns a path to an HTTP request. + let inline withUri (u : Uri) (req : HttpRequestMessage) = + req.RequestUri <- u + req + + /// Assigns a path to an HTTP request. + let inline withPath (p : string) (req : HttpRequestMessage) = + req |> withUri (Uri(p, UriKind.Relative)) + + /// Assigns a path to a Http request using printf-like formatting. + let inline withPathf fmt = + Printf.ksprintf withPath fmt + + /// Uses supplied string as UTF8-encoded json request body + let withJsonString (json : string) (request : HttpRequestMessage) = + request.Content <- new StringContent(json, Encoding.UTF8, "application/json") + request + + /// Use supplied serialize to convert the request to a json string, include that as a UTF-8 encoded body + let withJson (serialize : 'Request -> string) (input : 'Request) (request : HttpRequestMessage) = + request |> withJsonString (serialize input) + + /// Use Batman Json.Net profile convert the request to a json rendering + let withJsonNet<'Request> (input : 'Request) (request : HttpRequestMessage) = + request |> withJson Newtonsoft.Json.JsonConvert.SerializeObject input + + /// Appends supplied header to request header + let inline withHeader (name : string) (value : string) (request : HttpRequestMessage) = + request.Headers.Add(name, value) + request + + type HttpContent with + member c.ReadAsString() = async { + match c with + | null -> return null + | c -> return! c.ReadAsStringAsync() |> Async.AwaitTaskCorrect + } + + // only intended for logging under control of InvalidHttpResponseException, hence the esoteric name + member internal c.ReadAsStringDiapered() = async { + try return! c.ReadAsString() + with :? ObjectDisposedException -> return "" + } + + type HttpClient with + /// + /// Drop-in replacement for HttpClient.SendAsync which addresses known timeout issues + /// + /// HttpRequestMessage to be submitted. + member client.Send(msg : HttpRequestMessage) = async { + let! ct = Async.CancellationToken + try return! client.SendAsync(msg, ct) |> Async.AwaitTaskCorrect + // address https://github.com/dotnet/corefx/issues/20296 + with :? System.Threading.Tasks.TaskCanceledException -> + let message = + match client.BaseAddress with + | null -> "HTTP request timeout" + | baseAddr -> sprintf "HTTP request timeout [%O]" baseAddr + + return! Async.Raise(TimeoutException message) + } + + /// Exception indicating an unexpected response received by an Http Client + type InvalidHttpResponseException = + inherit Exception + + // TODO: include headers + val private userMessage : string + val private requestMethod : string + val RequestUri : Uri + val RequestBody : string + val StatusCode : HttpStatusCode + val ReasonPhrase : string + val ResponseBody : string + + member __.RequestMethod = new HttpMethod(__.requestMethod) + + private new (userMessage : string, requestMethod : HttpMethod, requestUri : Uri, requestBody : string, + statusCode : HttpStatusCode, reasonPhrase : string, responseBody : string, + ?innerException : exn) = + { + inherit Exception(message = null, innerException = defaultArg innerException null) ; userMessage = userMessage ; + requestMethod = string requestMethod ; RequestUri = requestUri ; RequestBody = requestBody ; + StatusCode = statusCode ; ReasonPhrase = reasonPhrase ; ResponseBody = responseBody + } + + override e.Message = + StringBuilder.Build(fun sb -> + sb.Appendfn "%s %O RequestUri=%O HttpStatusCode=%O" e.userMessage e.RequestMethod e.RequestUri e.StatusCode + let getBodyString str = if String.IsNullOrWhiteSpace str then "" else str + sb.Appendfn "RequestBody=%s" (getBodyString e.RequestBody) + sb.Appendfn "ResponseBody=%s" (getBodyString e.ResponseBody)) + + interface ISerializable with + member e.GetObjectData(si : SerializationInfo, sc : StreamingContext) = + let add name (value:obj) = si.AddValue(name, value) + base.GetObjectData(si, sc) ; add "userMessage" e.userMessage ; + add "requestUri" e.RequestUri ; add "requestMethod" e.requestMethod ; add "requestBody" e.RequestBody + add "statusCode" e.StatusCode ; add "reasonPhrase" e.ReasonPhrase ; add "responseBody" e.ResponseBody + + new (si : SerializationInfo, sc : StreamingContext) = + let get name = si.GetValue(name, typeof<'a>) :?> 'a + { + inherit Exception(si, sc) ; userMessage = get "userMessage" ; + RequestUri = get "requestUri" ; requestMethod = get "requestMethod" ; RequestBody = get "requestBody" ; + StatusCode = get "statusCode" ; ReasonPhrase = get "reasonPhrase" ; ResponseBody = get "responseBody" + } + + static member Create(userMessage : string, response : HttpResponseMessage, ?innerException : exn) = async { + let request = response.RequestMessage + let! responseBodyC = response.Content.ReadAsStringDiapered() |> Async.StartChild + let! requestBody = request.Content.ReadAsStringDiapered() + let! responseBody = responseBodyC + return + new InvalidHttpResponseException( + userMessage, request.Method, request.RequestUri, requestBody, + response.StatusCode, response.ReasonPhrase, responseBody, + ?innerException = innerException) + } + + static member Create(response : HttpResponseMessage, ?innerException : exn) = + InvalidHttpResponseException.Create("HTTP request yielded unexpected response.", response, ?innerException = innerException) + + type HttpResponseMessage with + + /// Raises an InvalidHttpResponseException if the response status code does not match expected value. + member response.EnsureStatusCode(expectedStatusCode : HttpStatusCode) = async { + if response.StatusCode <> expectedStatusCode then + let! exn = InvalidHttpResponseException.Create("Http request yielded unanticipated HTTP Result.", response) + do raise exn + } + + /// Asynchronously deserializes the json response content using the supplied `deserializer`, without validating the `StatusCode` + /// The decoder routine to apply to the body content. Exceptions are wrapped in exceptions containing the offending content. + member response.InterpretContent<'Decoded>(deserializer : string -> 'Decoded) : Async<'Decoded> = async { + let! content = response.Content.ReadAsString() + try return deserializer content + with e -> + let! exn = InvalidHttpResponseException.Create("HTTP response could not be decoded.", response, e) + return raise exn + } + + /// Asynchronously deserializes the json response content using the supplied `deserializer`, validating the `StatusCode` is `expectedStatusCode` + /// check that status code matches supplied code or raise a InvalidHttpResponseException if it doesn't. + /// The decoder routine to apply to the body content. Exceptions are wrapped in exceptions containing the offending content. + member response.Interpret<'Decoded>(expectedStatusCode : HttpStatusCode, deserializer : string -> 'Decoded) : Async<'Decoded> = async { + do! response.EnsureStatusCode expectedStatusCode + return! response.InterpretContent deserializer + } + + module HttpRes = + /// Deserialize body using default Json.Net profile - throw with content details if StatusCode is unexpected or decoding fails + let deserializeExpectedJsonNet<'t> expectedStatusCode (res : HttpResponseMessage) = + res.Interpret(expectedStatusCode, Newtonsoft.Json.JsonConvert.DeserializeObject<'t>) + + /// Deserialize body using Batman Json.Net profile - throw with content details if StatusCode is not OK or decoding fails + let deserializeOkJsonNet<'t> = + deserializeExpectedJsonNet<'t> HttpStatusCode.OK \ No newline at end of file diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index 17bf2f8f1..2abf56932 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -2,40 +2,58 @@ open Argu open Domain.Infrastructure -open Equinox.Cosmos -open Equinox.EventStore open Equinox.Cli.Infrastructure +open Microsoft.Extensions.DependencyInjection +open Samples +open Samples.Config +open Samples.Log open Serilog open Serilog.Events open System open System.Threading +open System.Net.Http [] type Arguments = - | [] VerboseDomain + | [] Verbose | [] VerboseConsole | [] LocalSeq | [] LogFile of string - | [] Memory of ParseResults - | [] Es of ParseResults - | [] Cosmos of ParseResults + | [] Initialize of ParseResults + | [] Run of ParseResults interface IArgParserTemplate with member a.Usage = a |> function - | VerboseDomain -> "Include low level Domain logging." + | Verbose -> "Include low level Domain logging." | VerboseConsole -> "Include low level Domain and Store logging in screen output." | LocalSeq -> "Configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net" | LogFile _ -> "specify a log file to write the result breakdown (default: Equinox.Cli.log)." - | Memory _ -> "specify In-Memory Volatile Store baseline test" - | Es _ -> "specify EventStore actions" - | Cosmos _ -> "specify CosmosDb actions" -and TestArguments = - | [] Name of Test + | Run _ -> "Run a load test" + | Initialize _ -> "Initialize a store" +and []InitArguments = + | [] Rus of int + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member a.Usage = a |> function + | Rus _ -> "Specify RUs to Allocate for the Application Collection." + | Cosmos _ -> "Cosmos connection parameters." +and []WebArguments = + | [] Endpoint of string + interface IArgParserTemplate with + member a.Usage = a |> function + | Endpoint _ -> "Target address. Default: https://localhost:5001/api" +and [] + TestArguments = + | [] Name of Tests.Test | [] Cached | [] Unfolds | [] TestsPerSecond of int | [] DurationM of float | [] ErrorCutoff of int64 | [] ReportIntervalS of int + | [] Memory of ParseResults + | [] Es of ParseResults + | [] Cosmos of ParseResults + | [] Web of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | Name _ -> "Specify which test to run. (default: Favorites)" @@ -45,237 +63,128 @@ and TestArguments = | DurationM _ -> "specify a run duration in minutes (default: 30)." | ErrorCutoff _ -> "specify an error cutoff (default: 10000)." | ReportIntervalS _ -> "specify reporting intervals in seconds (default: 10)." -and Test = Favorites | SaveForLater -and [] EsArguments = - | [] VerboseStore - | [] Timeout of float - | [] Retries of int - | [] Host of string - | [] Username of string - | [] Password of string - | [] ConcurrentOperationsLimit of int - | [] HeartbeatTimeout of float - - | [] Run of ParseResults - interface IArgParserTemplate with - member a.Usage = a |> function - | VerboseStore -> "Include low level Store logging." - | Timeout _ -> "specify operation timeout in seconds (default: 5)." - | Retries _ -> "specify operation retries (default: 1)." - | Host _ -> "specify a DNS query, using Gossip-driven discovery against all A records returned (default: localhost)." - | Username _ -> "specify a username (default: admin)." - | Password _ -> "specify a Password (default: changeit)." - | ConcurrentOperationsLimit _ -> "max concurrent operations in flight (default: 5000)." - | HeartbeatTimeout _ -> "specify heartbeat timeout in seconds (default: 1.5)." - | Run _ -> "Run a load test." -and [] CosmosArguments = - | [] VerboseStore - | [] Timeout of float - | [] Retries of int - | [] Connection of string - | [] Database of string - | [] Collection of string - | [] RetriesWaitTime of int - | [] PageSize of int - - | [] Provision of ParseResults - | [] Run of ParseResults - interface IArgParserTemplate with - member a.Usage = a |> function - | VerboseStore -> "Include low level Store logging." - | Timeout _ -> "specify operation timeout in seconds (default: 5)." - | Retries _ -> "specify operation retries (default: 1)." - | Connection _ -> "specify a connection string for a Cosmos account (defaults: envvar:EQUINOX_COSMOS_CONNECTION, Cosmos Emulator)." - | Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)." - | Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)." - | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" - | PageSize _ -> "Specify maximum number of events to record on a page before switching to a new one (default: 1)" - | Provision _ -> "Initialize a store collection." - | Run _ -> "Run a load test." -and CosmosProvisionArguments = - | [] Rus of int - interface IArgParserTemplate with - member a.Usage = a |> function - | Rus _ -> "Specify RUs to Allocate for the Application Collection." - -let defaultBatchSize = 500 - -module EventStore = - /// To establish a local node to run the tests against: - /// 1. cinst eventstore-oss -y # where cinst is an invocation of the Chocolatey Package Installer on Windows - /// 2. & $env:ProgramData\chocolatey\bin\EventStore.ClusterNode.exe --gossip-on-single-node --discover-via-dns 0 --ext-http-port=30778 - let connect (log: ILogger) (dnsQuery, heartbeatTimeout, col) (username, password) (operationTimeout, operationRetries) = - GesConnector(username, password, reqTimeout=operationTimeout, reqRetries=operationRetries, - heartbeatTimeout=heartbeatTimeout, concurrentOperationsLimit = col, - log=(if log.IsEnabled(LogEventLevel.Debug) then Logger.SerilogVerbose log else Logger.SerilogNormal log), - tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string]) - .Establish("equinox-cli", Discovery.GossipDns dnsQuery, ConnectionStrategy.ClusterTwinPreferSlaveReads) - let createGateway connection batchSize = GesGateway(connection, GesBatchingPolicy(maxBatchSize = batchSize)) - -module Cosmos = - /// Standing up an Equinox instance is necessary to run for test purposes; You'll need to either: - /// 1) replace connection below with a connection string or Uri+Key for an initialized Equinox instance with a database and collection named "equinox-test" - /// 2) Set the 3x environment variables and create a local Equinox using cli/Equinox.cli/bin/Release/net461/Equinox.Cli ` - /// cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION provision -ru 1000 - let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) = - EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime) - .Connect("equinox-cli", discovery) - let createGateway connection (maxItems,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems, maxEventsPerSlice=maxEvents)) + | Memory _ -> "Target in-process Transient Memory Store (Default if not other target specified)" + | Es _ -> "Run transaction in-process against EventStore" + | Cosmos _ -> "Run transaction in-process against CosmosDb" + | Web _ -> "Run transaction against Web endpoint" -[] -type Store = - | Mem of Equinox.MemoryStore.VolatileStore - | Es of GesGateway - | Cosmos of EqxGateway * databaseId: string * collectionId: string +let createStoreLog verbose verboseConsole maybeSeqEndpoint = + let c = LoggerConfiguration().Destructure.FSharpTypes() + let c = if verbose then c.MinimumLevel.Debug() else c + let c = c.WriteTo.Sink(Log.SerilogHelpers.RuCounterSink()) + let c = c.WriteTo.Console((if verbose && verboseConsole then LogEventLevel.Debug else LogEventLevel.Warning), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) + let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint) + c.CreateLogger() :> ILogger -module Test = - let run log testsPerSecond duration errorCutoff reportingIntervals (clients : ClientId[]) runSingleTest = +module LoadTest = + let private runLoadTest log testsPerSecond duration errorCutoff reportingIntervals (clients : ClientId[]) runSingleTest = let mutable idx = -1L let selectClient () = let clientIndex = Interlocked.Increment(&idx) |> int clients.[clientIndex % clients.Length] let selectClient = async { return async { return selectClient() } } Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest - let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect() - let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings) - type EsResolver(useCache) = - member val Cache = - if useCache then - let c = Equinox.EventStore.Caching.Cache("Cli", sizeMb = 50) - CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some - else None - member __.CreateAccessStrategy snapshot = - match snapshot with - | None -> None - | Some snapshot -> Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some - type CosmosResolver(useCache) = - member val Cache = - if useCache then - let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50) - Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some - else None - member __.CreateAccessStrategy snapshot = - match snapshot with - | None -> None - | Some snapshot -> AccessStrategy.Snapshot snapshot |> Some - type Builder(store, useCache, useUnfolds) = - member __.ResolveStream - ( codec : Equinox.UnionCodec.IUnionEncoder<'event,byte[]>, - fold: ('state -> 'event seq -> 'state), - initial: 'state, - snapshot: (('event -> bool) * ('state -> 'event))) = - let snapshot = if useUnfolds then Some snapshot else None - match store with - | Store.Mem store -> - Equinox.MemoryStore.MemResolver(store, fold, initial).Resolve - | Store.Es gateway -> - let resolver = EsResolver(useCache) - GesResolver<'event,'state>(gateway, codec, fold, initial, ?access = resolver.CreateAccessStrategy(snapshot), ?caching = resolver.Cache).Resolve - | Store.Cosmos (gateway, databaseId, connectionId) -> - let resolver = CosmosResolver(useCache) - let store = EqxStore(gateway, EqxCollections(databaseId, connectionId)) - EqxResolver<'event,'state>(store, codec, fold, initial, ?access = resolver.CreateAccessStrategy snapshot, ?caching = resolver.Cache).Resolve - - let createTest store test (cache,unfolds) log = - let builder = Builder(store, cache, unfolds) - match test with - | Favorites -> - let fold, initial, snapshot = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.snapshot - let codec = genCodec() - let service = Backend.Favorites.Service(log, builder.ResolveStream(codec,fold,initial,snapshot)) - fun clientId -> async { - let sku = Guid.NewGuid() |> SkuId - do! service.Favorite(clientId,[sku]) - let! items = service.List clientId - if items |> Array.exists (fun x -> x.skuId = sku) |> not then invalidOp "Added item not found" } - | SaveForLater -> - let fold, initial, snapshot = Domain.SavedForLater.Folds.fold, Domain.SavedForLater.Folds.initial, Domain.SavedForLater.Folds.snapshot - let codec = genCodec() - let service = Backend.SavedForLater.Service(log, builder.ResolveStream(codec,fold,initial,snapshot), maxSavedItems=50, maxAttempts=3) - fun clientId -> async { - let skus = [Guid.NewGuid() |> SkuId; Guid.NewGuid() |> SkuId; Guid.NewGuid() |> SkuId] - let! saved = service.Save(clientId,skus) - if saved then - let! items = service.List clientId - if skus |> List.forall (fun sku -> items |> Array.exists (fun x -> x.skuId = sku)) |> not then invalidOp "Added item not found" - else - let! current = service.List clientId - let resolveSkus _hasSku = async { - return [|for x in current -> x.skuId|] } - let! removed = service.Remove(clientId, resolveSkus) - if not removed then invalidOp "Remove failed" } - - let createRunner conn domainLog verbose (targs: ParseResults) = - let test = targs.GetResult(Name,Favorites) - let options = targs.GetResults Cached @ targs.GetResults Unfolds - let cache, unfold = options |> List.contains Cached, options |> List.contains Unfolds - let run = createTest conn test (cache,unfold) domainLog + let private decorateWithLogger (domainLog : ILogger, verbose) (run: 't -> Async) = let execute clientId = if not verbose then run clientId else async { domainLog.Information("Executing for client {sessionId}", clientId) try return! run clientId with e -> domainLog.Warning(e, "Test threw an exception"); e.Reraise () } - test,options,execute - -[] -module SerilogHelpers = - let inline (|Stats|) ({ interval = i; ru = ru }: Equinox.Cosmos.Store.Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds - open Equinox.Cosmos.Store - let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|) = function - | Log.Tip (Stats s) - | Log.TipNotFound (Stats s) - | Log.TipNotModified (Stats s) - | Log.Query (_,_, (Stats s)) -> CosmosReadRc s - // slices are rolled up into batches so be sure not to double-count - | Log.Response (_,(Stats s)) -> CosmosResponseRc s - | Log.SyncSuccess (Stats s) - | Log.SyncConflict (Stats s) -> CosmosWriteRc s - | Log.SyncResync (Stats s) -> CosmosResyncRc s - let (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function - | (:? ScalarValue as x) -> Some x.Value - | _ -> None - let (|CosmosMetric|_|) (logEvent : LogEvent) : Log.Event option = - match logEvent.Properties.TryGetValue("cosmosEvt") with - | true, SerilogScalar (:? Log.Event as e) -> Some e - | _ -> None - type RuCounter = - { mutable rux100: int64; mutable count: int64; mutable ms: int64 } - static member Create() = { rux100 = 0L; count = 0L; ms = 0L } - member __.Ingest (ru, ms) = - Interlocked.Increment(&__.count) |> ignore - Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore - Interlocked.Add(&__.ms, ms) |> ignore - type RuCounterSink() = - static member val Read = RuCounter.Create() - static member val Write = RuCounter.Create() - static member val Resync = RuCounter.Create() - interface Serilog.Core.ILogEventSink with - member __.Emit logEvent = logEvent |> function - | CosmosMetric (CosmosReadRc stats) -> RuCounterSink.Read.Ingest stats - | CosmosMetric (CosmosWriteRc stats) -> RuCounterSink.Write.Ingest stats - | CosmosMetric (CosmosResyncRc stats) -> RuCounterSink.Resync.Ingest stats - | _ -> () + execute + let private createResultLog fileName = + LoggerConfiguration() + .Destructure.FSharpTypes() + .WriteTo.File(fileName) + .CreateLogger() + let run (log: ILogger) (verbose,verboseConsole,maybeSeq) reportFilename (args: ParseResults) = + let storage = args.TryGetSubCommand() + + let createStoreLog verboseStore = createStoreLog verboseStore verboseConsole maybeSeq + let storeLog, storeConfig, httpClient: ILogger * StorageConfig option * HttpClient option = + let options = args.GetResults Cached @ args.GetResults Unfolds + let cache, unfolds = options |> List.contains Cached, options |> List.contains Unfolds + match storage with + | Some (Web wargs) -> + let uri = wargs.GetResult(WebArguments.Endpoint,"https://localhost:5001") |> Uri + log.Information("Running web test targetting: {url}", uri) + createStoreLog false, None, new HttpClient(BaseAddress=uri) |> Some + | Some (Es sargs) -> + let storeLog = createStoreLog <| sargs.Contains EsArguments.VerboseStore + log.Information("EventStore Storage options: {options:l}", options) + storeLog, EventStore.config (log,storeLog) (cache, unfolds) sargs |> Some, None + | Some (Cosmos sargs) -> + let storeLog = createStoreLog <| sargs.Contains CosmosArguments.VerboseStore + log.Information("CosmosDb Storage options: {options:l}", options) + storeLog, Cosmos.config (log,storeLog) (cache, unfolds) sargs |> Some, None + | _ | Some (Memory _) -> + log.Information("Volatile Store; Storage options: {options:l}", options) + createStoreLog false, MemoryStore.config () |> Some, None + let test = args.GetResult(Name,Tests.Favorite) + let runSingleTest : ClientId -> Async = + match storeConfig, httpClient with + | None, Some client -> + let execForClient = Tests.executeRemote client test + decorateWithLogger (log,verbose) execForClient + | Some storeConfig, _ -> + let services = ServiceCollection() + Services.registerServices(services, storeConfig, storeLog) + let container = services.BuildServiceProvider() + let execForClient = Tests.executeLocal container test + decorateWithLogger (log, verbose) execForClient + | None, None -> invalidOp "impossible None, None" + let errorCutoff = args.GetResult(ErrorCutoff,10000L) + let testsPerSecond = args.GetResult(TestsPerSecond,1000) + let duration = args.GetResult(DurationM,30.) |> TimeSpan.FromMinutes + let reportingIntervals = + match args.GetResults(ReportIntervalS) with + | [] -> TimeSpan.FromSeconds 10.|> Seq.singleton + | intervals -> seq { for i in intervals -> TimeSpan.FromSeconds(float i) } + |> fun intervals -> [| yield duration; yield! intervals |] + let clients = Array.init (testsPerSecond * 2) (fun _ -> Guid.NewGuid () |> ClientId) + + log.Information( "Running {test} for {duration} @ {tps} hits/s across {clients} clients; Max errors: {errorCutOff}, reporting intervals: {ri}, report file: {report}", + test, duration, testsPerSecond, clients.Length, errorCutoff, reportingIntervals, reportFilename) + let results = runLoadTest log testsPerSecond (duration.Add(TimeSpan.FromSeconds 5.)) errorCutoff reportingIntervals clients runSingleTest |> Async.RunSynchronously + + let resultFile = createResultLog reportFilename + for r in results do + resultFile.Information("Aggregate: {aggregate}", r) + log.Information("Run completed; Current memory allocation: {bytes:n2} MiB", (GC.GetTotalMemory(true) |> float) / 1024./1024.) + + match storeConfig with + | Some (StorageConfig.Cosmos _) -> + let stats = + [ "Read", RuCounterSink.Read + "Write", RuCounterSink.Write + "Resync", RuCounterSink.Resync ] + let mutable totalCount, totalRc, totalMs = 0L, 0., 0L + let logActivity name count rc lat = + log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms", + name, count, rc, (if count = 0L then Double.NaN else rc/float count), (if count = 0L then Double.NaN else float lat/float count)) + for name, stat in stats do + let ru = float stat.rux100 / 100. + totalCount <- totalCount + stat.count + totalRc <- totalRc + ru + totalMs <- totalMs + stat.ms + logActivity name stat.count ru stat.ms + logActivity "TOTAL" totalCount totalRc totalMs + let measures : (string * (TimeSpan -> float)) list = + [ "s", fun x -> x.TotalSeconds + "m", fun x -> x.TotalMinutes + "h", fun x -> x.TotalHours ] + let logPeriodicRate name count ru = log.Information("rp{name} {count:n0} = ~{ru:n0} RU", name, count, ru) + let duration = args.GetResult(DurationM,1.) |> TimeSpan.FromMinutes + for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d) + | _ -> () -let createStoreLog verbose verboseConsole maybeSeqEndpoint = - let c = LoggerConfiguration().Destructure.FSharpTypes() - let c = if verbose then c.MinimumLevel.Debug() else c - let c = c.WriteTo.Sink(RuCounterSink()) - let c = c.WriteTo.Console((if verboseConsole then LogEventLevel.Debug else LogEventLevel.Information), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) - let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint) - c.CreateLogger() :> ILogger let createDomainLog verbose verboseConsole maybeSeqEndpoint = let c = LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext() let c = if verbose then c.MinimumLevel.Debug() else c - let c = c.WriteTo.Sink(RuCounterSink()) - let c = c.WriteTo.Console((if verboseConsole then LogEventLevel.Debug else LogEventLevel.Warning), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) + let c = c.WriteTo.Sink(Log.SerilogHelpers.RuCounterSink()) + let c = c.WriteTo.Console((if verboseConsole then LogEventLevel.Debug else LogEventLevel.Information), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint) c.CreateLogger() -let createResultLog fileName = - LoggerConfiguration() - .Destructure.FSharpTypes() - .WriteTo.File(fileName) - .CreateLogger() [] let main argv = @@ -283,112 +192,25 @@ let main argv = let parser = ArgumentParser.Create(programName = programName) try let args = parser.ParseCommandLine argv - let verboseConsole = args.Contains(VerboseConsole) + let verboseConsole = args.Contains VerboseConsole let maybeSeq = if args.Contains LocalSeq then Some "http://localhost:5341" else None - let report = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName - let runTest (log: ILogger) conn (targs: ParseResults) = - let verbose = args.Contains(VerboseDomain) - let domainLog = createDomainLog verbose verboseConsole maybeSeq - let test, options, runTest = Test.createRunner conn domainLog verbose targs - - let errorCutoff = targs.GetResult(ErrorCutoff,10000L) - let testsPerSecond = targs.GetResult(TestsPerSecond,1000) - let duration = targs.GetResult(DurationM,30.) |> TimeSpan.FromMinutes - let reportingIntervals = - match targs.GetResults(ReportIntervalS) with - | [] -> TimeSpan.FromSeconds 10.|> Seq.singleton - | intervals -> seq { for i in intervals -> TimeSpan.FromSeconds(float i) } - |> fun intervals -> [| yield duration; yield! intervals |] - let clients = Array.init (testsPerSecond * 2) (fun _ -> Guid.NewGuid () |> ClientId) - - log.Information( "Running {test} {options:l} for {duration} @ {tps} hits/s across {clients} clients; Max errors: {errorCutOff}, reporting intervals: {ri}, report file: {report}", - test, options, duration, testsPerSecond, clients.Length, errorCutoff, reportingIntervals, report) - let results = Test.run log testsPerSecond (duration.Add(TimeSpan.FromSeconds 5.)) errorCutoff reportingIntervals clients runTest |> Async.RunSynchronously - let resultFile = createResultLog report - for r in results do - resultFile.Information("Aggregate: {aggregate}", r) - log.Information("Run completed; Current memory allocation: {bytes:n2} MiB", (GC.GetTotalMemory(true) |> float) / 1024./1024.) - 0 - + let verbose = args.Contains Verbose + let log = createDomainLog verbose verboseConsole maybeSeq match args.GetSubCommand() with - | Memory targs -> - let verboseStore = false - let log = createStoreLog verboseStore verboseConsole maybeSeq - log.Information( "Using In-memory Volatile Store") - // TODO implement backoffs - let conn = Store.Mem (Equinox.MemoryStore.VolatileStore()) - runTest log conn targs - | Es sargs -> - let verboseStore = sargs.Contains(EsArguments.VerboseStore) - // TODO implement backoffs - let log = createStoreLog verboseStore verboseConsole maybeSeq - let host = sargs.GetResult(Host,"localhost") - let creds = sargs.GetResult(Username,"admin"), sargs.GetResult(Password,"changeit") - let (timeout, retries) as operationThrottling = - sargs.GetResult(EsArguments.Timeout,5.) |> float |> TimeSpan.FromSeconds, - sargs.GetResult(EsArguments.Retries,1) - let heartbeatTimeout = sargs.GetResult(HeartbeatTimeout,1.5) |> float |> TimeSpan.FromSeconds - let concurrentOperationsLimit = sargs.GetResult(ConcurrentOperationsLimit,5000) - log.Information("Using EventStore targeting {host} with heartbeat: {heartbeat}, max concurrent requests: {concurrency}. " + - "Operation timeout: {timeout} with {retries} retries", - host, heartbeatTimeout, concurrentOperationsLimit, timeout, retries) - let conn = EventStore.connect log (host, heartbeatTimeout, concurrentOperationsLimit) creds operationThrottling |> Async.RunSynchronously - let store = Store.Es (EventStore.createGateway conn defaultBatchSize) - match sargs.TryGetSubCommand() with - | Some (EsArguments.Run targs) -> runTest log store targs - | _ -> failwith "run is required" - | Cosmos sargs -> - let verboseStore = sargs.Contains(VerboseStore) - let log = createStoreLog verboseStore verboseConsole maybeSeq - let read key = Environment.GetEnvironmentVariable key |> Option.ofObj - - let (Discovery.UriAndKey (connUri,_)) as discovery = - sargs.GetResult(Connection, defaultArg (read "EQUINOX_COSMOS_CONNECTION") "AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;") - |> Discovery.FromConnectionString - - let dbName = sargs.GetResult(Database, defaultArg (read "EQUINOX_COSMOS_DATABASE") "equinox-test") - let collName = sargs.GetResult(Collection, defaultArg (read "EQUINOX_COSMOS_COLLECTION") "equinox-test") - let timeout = sargs.GetResult(Timeout,5.) |> float |> TimeSpan.FromSeconds - let (retries, maxRetryWaitTime) as operationThrottling = sargs.GetResult(Retries, 1), sargs.GetResult(RetriesWaitTime, 5) - let pageSize = sargs.GetResult(PageSize,1) - log.Information("Using CosmosDb Connection {connection} Database: {database} Collection: {collection} maxEventsPerSlice: {pageSize}. " + - "Request timeout: {timeout} with {retries} retries; throttling MaxRetryWaitTime {maxRetryWaitTime}", - connUri, dbName, collName, pageSize, timeout, retries, maxRetryWaitTime) - let conn = Cosmos.connect log discovery timeout operationThrottling |> Async.RunSynchronously - match sargs.TryGetSubCommand() with - | Some (Provision args) -> - let rus = args.GetResult(Rus) + | Initialize iargs -> + let rus = iargs.GetResult(Rus) + match iargs.TryGetSubCommand() with + | Some (InitArguments.Cosmos sargs) -> + let storeLog = createStoreLog (sargs.Contains CosmosArguments.VerboseStore) verboseConsole maybeSeq + let dbName, collName, (_pageSize: int), conn = Cosmos.conn (log,storeLog) sargs log.Information("Configuring CosmosDb Collection with Throughput Provision: {rus:n0} RU/s", rus) Equinox.Cosmos.Store.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously - 0 - | Some (Run targs) -> - let conn = Store.Cosmos (Cosmos.createGateway conn (defaultBatchSize,pageSize), dbName, collName) - let res = runTest log conn targs - let stats = - [ "Read", RuCounterSink.Read - "Write", RuCounterSink.Write - "Resync", RuCounterSink.Resync ] - let mutable totalCount, totalRc, totalMs = 0L, 0., 0L - let logActivity name count rc lat = - log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms", - name, count, rc, (if count = 0L then Double.NaN else rc/float count), (if count = 0L then Double.NaN else float lat/float count)) - for name, stat in stats do - let ru = float stat.rux100 / 100. - totalCount <- totalCount + stat.count - totalRc <- totalRc + ru - totalMs <- totalMs + stat.ms - logActivity name stat.count ru stat.ms - logActivity "TOTAL" totalCount totalRc totalMs - let measures : (string * (TimeSpan -> float)) list = - [ "s", fun x -> x.TotalSeconds - "m", fun x -> x.TotalMinutes - "h", fun x -> x.TotalHours ] - let logPeriodicRate name count ru = log.Information("rp{name} {count:n0} = ~{ru:n0} RU", name, count, ru) - let duration = targs.GetResult(DurationM,1.) |> TimeSpan.FromMinutes - for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d) - res - | _ -> failwith "init or run is required" - | _ -> failwith "ERROR: please specify memory, es or cosmos Store" + | _ -> failwith "please specify a cosmos endpoint" + | Run rargs -> + let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName + LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs + | _ -> failwith "Please specify a valid subcommand :- init or run" + 0 with e -> printfn "%s" e.Message 1 \ No newline at end of file diff --git a/cli/Equinox.Cli/Tests.fs b/cli/Equinox.Cli/Tests.fs new file mode 100644 index 000000000..8943af59b --- /dev/null +++ b/cli/Equinox.Cli/Tests.fs @@ -0,0 +1,57 @@ +module Equinox.Cli.Tests + +open Domain +open Microsoft.Extensions.DependencyInjection +open System +open System.Net.Http + +type Test = Favorite | SaveForLater + +let executeLocal (container: ServiceProvider) test: ClientId -> Async = + match test with + | Favorite -> + let service = container.GetRequiredService() + fun clientId -> async { + let sku = Guid.NewGuid() |> SkuId + do! service.Favorite(clientId,[sku]) + let! items = service.List clientId + if items |> Array.exists (fun x -> x.skuId = sku) |> not then invalidOp "Added item not found" } + | SaveForLater -> + let service = container.GetRequiredService() + fun clientId -> async { + let skus = [Guid.NewGuid() |> SkuId; Guid.NewGuid() |> SkuId; Guid.NewGuid() |> SkuId] + let! saved = service.Save(clientId,skus) + if saved then + let! items = service.List clientId + if skus |> List.forall (fun sku -> items |> Array.exists (fun x -> x.skuId = sku)) |> not then invalidOp "Added item not found" + else + let! current = service.List clientId + let resolveSkus _hasSku = async { + return [|for x in current -> x.skuId|] } + return! service.Remove(clientId, resolveSkus) } + +let executeRemote (client: HttpClient) test = + match test with + | Favorite -> + fun clientId -> + let session = Clients.Session(client, clientId) + let client = session.Favorites + async { + let sku = Guid.NewGuid() |> SkuId + do! client.Favorite [|sku|] + let! items = client.List + if items |> Array.exists (fun x -> x.skuId = sku) |> not then invalidOp "Added item not found" } + | SaveForLater -> + fun clientId -> + let session = Clients.Session(client, clientId) + let client = session.Saves + async { + let skus = [| Guid.NewGuid() |> SkuId; Guid.NewGuid() |> SkuId; Guid.NewGuid() |> SkuId |] + let! saved = client.Save skus + if saved then + let! items = client.List + // NB this can happen if we overload the system and the next operation for this virtual client takes the other leg and removes it + if skus |> Array.forall (fun sku -> items |> Array.exists (fun x -> x.skuId = sku)) |> not then invalidOp "Added item not found" + else + let! current = client.List + return! client.Remove [|for x in current -> x.skuId|] } \ No newline at end of file diff --git a/global.json b/global.json index 455cd36d7..21e27cede 100644 --- a/global.json +++ b/global.json @@ -1,5 +1,5 @@ { "sdk": { - "version": "2.1.402" + "version": "2.1.500" } } \ No newline at end of file diff --git a/samples/Store/Backend/SavedForLater.fs b/samples/Store/Backend/SavedForLater.fs index 05ae3f6a0..9ffe247bf 100644 --- a/samples/Store/Backend/SavedForLater.fs +++ b/samples/Store/Backend/SavedForLater.fs @@ -5,11 +5,11 @@ open Domain.SavedForLater open Domain.SavedForLater.Commands open System -type Service(log, resolveStream, maxSavedItems : int, maxAttempts) = +type Service(handlerLog, resolveStream, maxSavedItems : int, maxAttempts) = do if maxSavedItems < 0 then invalidArg "maxSavedItems" "must be non-negative value." let (|Stream|) (id: ClientId) = let stream = resolveStream <| Equinox.CatId ("savedforlater", id.Value) - Handler(log, stream, maxSavedItems, maxAttempts) + Handler(handlerLog, stream, maxSavedItems, maxAttempts) member __.MaxSavedItems = maxSavedItems @@ -19,7 +19,7 @@ type Service(log, resolveStream, maxSavedItems : int, maxAttempts) = member __.Save(Stream stream, skus : seq) : Async = stream.Execute <| Add (DateTimeOffset.Now, Seq.toArray skus) - member __.Remove(Stream stream, resolve : (SkuId -> bool) -> Async) : Async = + member __.Remove(Stream stream, resolve : (SkuId -> bool) -> Async) : Async = let resolve hasSku = async { let! skus = resolve hasSku return Remove skus } diff --git a/samples/Store/Domain/Infrastructure.fs b/samples/Store/Domain/Infrastructure.fs index 9c5c8e02f..d4324e81d 100644 --- a/samples/Store/Domain/Infrastructure.fs +++ b/samples/Store/Domain/Infrastructure.fs @@ -94,6 +94,8 @@ and private CartIdJsonConverter() = /// ClientId strongly typed id [); AutoSerializable(false); StructuredFormatDisplay("{Value}")>] +// To support model binding using aspnetcore 2 FromHeader +[)>] // (Internally a string for most efficient copying semantics) type ClientId private (id : string) = inherit Comparable(id) @@ -110,4 +112,16 @@ and private ClientIdJsonConverter() = /// Renders as per Guid.ToString("N") override __.Pickle value = value.Value /// Input must be a Guid.Parseable value - override __.UnPickle input = ClientId.Parse input \ No newline at end of file + override __.UnPickle input = ClientId.Parse input +and private ClientIdStringConverter() = + inherit System.ComponentModel.TypeConverter() + override __.CanConvertFrom(context, sourceType) = + sourceType = typedefof || base.CanConvertFrom(context,sourceType) + override __.ConvertFrom(context, culture, value) = + match value with + | :? string as s -> s |> ClientId.Parse |> box + | _ -> base.ConvertFrom(context, culture, value) + override __.ConvertTo(context, culture, value, destinationType) = + match value with + | :? ClientId as value when destinationType = typedefof -> value.Value :> _ + | _ -> base.ConvertTo(context, culture, value, destinationType) \ No newline at end of file diff --git a/samples/Store/Domain/SavedForLater.fs b/samples/Store/Domain/SavedForLater.fs index b8ac35bb5..168e682ff 100644 --- a/samples/Store/Domain/SavedForLater.fs +++ b/samples/Store/Domain/SavedForLater.fs @@ -109,11 +109,12 @@ type Handler(log, stream, maxSavedItems, maxAttempts) = let decide (ctx : Equinox.Context<_,_>) command = ctx.Decide (Commands.decide maxSavedItems command) - member __.Remove (resolve : ((SkuId->bool) -> Async)) : Async = + member __.Remove (resolve : ((SkuId->bool) -> Async)) : Async = inner.DecideAsync <| fun ctx -> async { let contents = seq { for item in ctx.State -> item.skuId } |> set let! cmd = resolve contents.Contains return cmd |> decide ctx } + |> Async.Ignore member __.Execute command : Async = inner.Decide <| fun fctx -> diff --git a/samples/Store/Infrastructure/Config.fs b/samples/Store/Infrastructure/Config.fs new file mode 100644 index 000000000..968665953 --- /dev/null +++ b/samples/Store/Infrastructure/Config.fs @@ -0,0 +1,132 @@ +module Samples.Config + +open Argu +open Serilog +open System + +type [] MemArguments = + | [] VerboseStore + interface IArgParserTemplate with + member a.Usage = a |> function + | VerboseStore -> "Include low level Store logging." +and [] EsArguments = + | [] VerboseStore + | [] Timeout of float + | [] Retries of int + | [] Host of string + | [] Username of string + | [] Password of string + | [] ConcurrentOperationsLimit of int + | [] HeartbeatTimeout of float + interface IArgParserTemplate with + member a.Usage = a |> function + | VerboseStore -> "Include low level Store logging." + | Timeout _ -> "specify operation timeout in seconds (default: 5)." + | Retries _ -> "specify operation retries (default: 1)." + | Host _ -> "specify a DNS query, using Gossip-driven discovery against all A records returned (default: localhost)." + | Username _ -> "specify a username (default: admin)." + | Password _ -> "specify a Password (default: changeit)." + | ConcurrentOperationsLimit _ -> "max concurrent operations in flight (default: 5000)." + | HeartbeatTimeout _ -> "specify heartbeat timeout in seconds (default: 1.5)." +and [] CosmosArguments = + | [] VerboseStore + | [] ConnectionMode of Equinox.Cosmos.ConnectionMode + | [] Timeout of float + | [] Retries of int + | [] Connection of string + | [] Database of string + | [] Collection of string + | [] RetriesWaitTime of int + | [] PageSize of int + interface IArgParserTemplate with + member a.Usage = a |> function + | VerboseStore -> "Include low level Store logging." + | ConnectionMode _ -> "Override the connection mode (default: DirectTcp)." + | Timeout _ -> "specify operation timeout in seconds (default: 5)." + | Retries _ -> "specify operation retries (default: 1)." + | Connection _ -> "specify a connection string for a Cosmos account (defaults: envvar:EQUINOX_COSMOS_CONNECTION, Cosmos Emulator)." + | Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)." + | Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" + | PageSize _ -> "Specify maximum number of events to record on a page before switching to a new one (default: 1)" + +let defaultBatchSize = 500 + +[] +type StorageConfig = + | Memory of Equinox.MemoryStore.VolatileStore + | Es of Equinox.EventStore.GesGateway * Equinox.EventStore.CachingStrategy option * unfolds: bool + | Cosmos of Equinox.Cosmos.EqxGateway * Equinox.Cosmos.CachingStrategy option * unfolds: bool * databaseId: string * collectionId: string + +module MemoryStore = + let config () = + StorageConfig.Memory (Equinox.MemoryStore.VolatileStore()) + +module EventStore = + open Equinox.EventStore + + /// To establish a local node to run the tests against: + /// 1. cinst eventstore-oss -y # where cinst is an invocation of the Chocolatey Package Installer on Windows + /// 2. & $env:ProgramData\chocolatey\bin\EventStore.ClusterNode.exe --gossip-on-single-node --discover-via-dns 0 --ext-http-port=30778 + let private connect (log: ILogger) (dnsQuery, heartbeatTimeout, col) (username, password) (operationTimeout, operationRetries) = + GesConnector(username, password, reqTimeout=operationTimeout, reqRetries=operationRetries, + heartbeatTimeout=heartbeatTimeout, concurrentOperationsLimit = col, + log=(if log.IsEnabled(Serilog.Events.LogEventLevel.Debug) then Logger.SerilogVerbose log else Logger.SerilogNormal log), + tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string]) + .Establish("equinox-cli", Discovery.GossipDns dnsQuery, ConnectionStrategy.ClusterTwinPreferSlaveReads) + let private createGateway connection batchSize = GesGateway(connection, GesBatchingPolicy(maxBatchSize = batchSize)) + let config (log: ILogger, storeLog) (cache, unfolds) (sargs : ParseResults) = + let host = sargs.GetResult(Host,"localhost") + let creds = sargs.GetResult(Username,"admin"), sargs.GetResult(Password,"changeit") + let (timeout, retries) as operationThrottling = + sargs.GetResult(EsArguments.Timeout,5.) |> float |> TimeSpan.FromSeconds, + sargs.GetResult(EsArguments.Retries,1) + let heartbeatTimeout = sargs.GetResult(HeartbeatTimeout,1.5) |> float |> TimeSpan.FromSeconds + let concurrentOperationsLimit = sargs.GetResult(ConcurrentOperationsLimit,5000) + log.Information("Using EventStore targeting {host} with heartbeat: {heartbeat}, max concurrent requests: {concurrency}. " + + "Operation timeout: {timeout} with {retries} retries", + host, heartbeatTimeout, concurrentOperationsLimit, timeout, retries) + let conn = connect storeLog (host, heartbeatTimeout, concurrentOperationsLimit) creds operationThrottling |> Async.RunSynchronously + let cacheStrategy = + if cache then + let c = Caching.Cache("Cli", sizeMb = 50) + CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some + else None + StorageConfig.Es ((createGateway conn defaultBatchSize), cacheStrategy, unfolds) + +module Cosmos = + open Equinox.Cosmos + + /// Standing up an Equinox instance is necessary to run for test purposes; You'll need to either: + /// 1) replace connection below with a connection string or Uri+Key for an initialized Equinox instance with a database and collection named "equinox-test" + /// 2) Set the 3x environment variables and create a local Equinox using cli/Equinox.cli/bin/Release/net461/Equinox.Cli ` + /// cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION provision -ru 1000 + let private connect (log: ILogger) mode discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) = + EqxConnector(log=log, mode=mode, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime) + .Connect("equinox-cli", discovery) + let private createGateway connection (maxItems,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems, maxEventsPerSlice=maxEvents)) + let conn (log: ILogger, storeLog) (sargs : ParseResults) = + let read key = Environment.GetEnvironmentVariable key |> Option.ofObj + + let (Discovery.UriAndKey (connUri,_)) as discovery = + sargs.GetResult(Connection, defaultArg (read "EQUINOX_COSMOS_CONNECTION") "AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;") + |> Discovery.FromConnectionString + + let dbName = sargs.GetResult(Database, defaultArg (read "EQUINOX_COSMOS_DATABASE") "equinox-test") + let collName = sargs.GetResult(Collection, defaultArg (read "EQUINOX_COSMOS_COLLECTION") "equinox-test") + let timeout = sargs.GetResult(Timeout,5.) |> float |> TimeSpan.FromSeconds + let mode = sargs.GetResult(ConnectionMode,ConnectionMode.DirectTcp) + let (retries, maxRetryWaitTime) as operationThrottling = sargs.GetResult(Retries, 1), sargs.GetResult(RetriesWaitTime, 5) + let pageSize = sargs.GetResult(PageSize,1) + log.Information("Using CosmosDb {mode} Connection {connection} Database: {database} Collection: {collection} maxEventsPerSlice: {pageSize}. " + + "Request timeout: {timeout} with {retries} retries; throttling MaxRetryWaitTime {maxRetryWaitTime}", + mode, connUri, dbName, collName, pageSize, timeout, retries, maxRetryWaitTime) + dbName, collName, pageSize, connect storeLog mode discovery timeout operationThrottling |> Async.RunSynchronously + let config (log: ILogger, storeLog) (cache, unfolds) (sargs : ParseResults) = + let dbName, collName, pageSize, conn = conn (log, storeLog) sargs + let cacheStrategy = + if cache then + let c = Caching.Cache("Cli", sizeMb = 50) + CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some + else None + StorageConfig.Cosmos (createGateway conn (defaultBatchSize,pageSize), cacheStrategy, unfolds, dbName, collName) \ No newline at end of file diff --git a/samples/Store/Infrastructure/Infrastructure.fsproj b/samples/Store/Infrastructure/Infrastructure.fsproj new file mode 100644 index 000000000..1f7b02014 --- /dev/null +++ b/samples/Store/Infrastructure/Infrastructure.fsproj @@ -0,0 +1,41 @@ + + + + netstandard2.0;net461 + 5 + false + true + true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/samples/Store/Infrastructure/Log.fs b/samples/Store/Infrastructure/Log.fs new file mode 100644 index 000000000..013d27154 --- /dev/null +++ b/samples/Store/Infrastructure/Log.fs @@ -0,0 +1,43 @@ +module Samples.Log + +open Serilog.Events + +[] +module SerilogHelpers = + open Equinox.Cosmos.Store + let inline (|Stats|) ({ interval = i; ru = ru }: Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds + + let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|) = function + | Log.Tip (Stats s) + | Log.TipNotFound (Stats s) + | Log.TipNotModified (Stats s) + | Log.Query (_,_, (Stats s)) -> CosmosReadRc s + // slices are rolled up into batches so be sure not to double-count + | Log.Response (_,(Stats s)) -> CosmosResponseRc s + | Log.SyncSuccess (Stats s) + | Log.SyncConflict (Stats s) -> CosmosWriteRc s + | Log.SyncResync (Stats s) -> CosmosResyncRc s + let (|SerilogScalar|_|) : LogEventPropertyValue -> obj option = function + | (:? ScalarValue as x) -> Some x.Value + | _ -> None + let (|CosmosMetric|_|) (logEvent : LogEvent) : Log.Event option = + match logEvent.Properties.TryGetValue("cosmosEvt") with + | true, SerilogScalar (:? Log.Event as e) -> Some e + | _ -> None + type RuCounter = + { mutable rux100: int64; mutable count: int64; mutable ms: int64 } + static member Create() = { rux100 = 0L; count = 0L; ms = 0L } + member __.Ingest (ru, ms) = + System.Threading.Interlocked.Increment(&__.count) |> ignore + System.Threading.Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore + System.Threading.Interlocked.Add(&__.ms, ms) |> ignore + type RuCounterSink() = + static member val Read = RuCounter.Create() + static member val Write = RuCounter.Create() + static member val Resync = RuCounter.Create() + interface Serilog.Core.ILogEventSink with + member __.Emit logEvent = logEvent |> function + | CosmosMetric (CosmosReadRc stats) -> RuCounterSink.Read.Ingest stats + | CosmosMetric (CosmosWriteRc stats) -> RuCounterSink.Write.Ingest stats + | CosmosMetric (CosmosResyncRc stats) -> RuCounterSink.Resync.Ingest stats + | _ -> () \ No newline at end of file diff --git a/samples/Store/Infrastructure/Services.fs b/samples/Store/Infrastructure/Services.fs new file mode 100644 index 000000000..b51a75d51 --- /dev/null +++ b/samples/Store/Infrastructure/Services.fs @@ -0,0 +1,24 @@ +module Samples.Services + +open Microsoft.Extensions.DependencyInjection +open Serilog +open System + +let registerServices (services : IServiceCollection, storageConfig, handlerLog) = + let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore + + regF <| fun _sp -> storageConfig : Config.StorageConfig + + regF <| fun sp -> Streams.Resolver(sp.GetService()) + + let mkFavorites (resolver: Streams.Resolver) = + let fold, initial, snapshot = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.snapshot + let codec = Streams.genCodec() + Backend.Favorites.Service(handlerLog, resolver.Resolve(codec,fold,initial,snapshot)) + regF <| fun sp -> mkFavorites (sp.GetService()) + + let mkSaves (resolver: Streams.Resolver) = + let fold, initial, snapshot = Domain.SavedForLater.Folds.fold, Domain.SavedForLater.Folds.initial, Domain.SavedForLater.Folds.snapshot + let codec = Streams.genCodec() + Backend.SavedForLater.Service(handlerLog, resolver.Resolve(codec,fold,initial,snapshot), maxSavedItems=50, maxAttempts=3) + regF <| fun sp -> mkSaves (sp.GetService()) \ No newline at end of file diff --git a/samples/Store/Infrastructure/Streams.fs b/samples/Store/Infrastructure/Streams.fs new file mode 100644 index 000000000..fde8a6b77 --- /dev/null +++ b/samples/Store/Infrastructure/Streams.fs @@ -0,0 +1,29 @@ +/// TL;DR There is no secret plan to make a package out of this; +/// it's infrastructure that makes sense solely in the context of this test rig +/// ...depending on 3 stores is not a goal or achievement + +/// Yes, they live together here - that's because it's infrastructure that makes sense in the context of this test rig +/// It also proves the point that the Domain and backend are not bound to the Store, which is important too +/// IRL, it's unlikely you'd want to do storage switching in this manner + +module Samples.Streams + +let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect() +let inline genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings) + +type Resolver(storage) = + member __.Resolve + ( codec : Equinox.UnionCodec.IUnionEncoder<'event,byte[]>, + fold: ('state -> 'event seq -> 'state), + initial: 'state, + snapshot: (('event -> bool) * ('state -> 'event))) = + match storage with + | Config.StorageConfig.Memory store -> + Equinox.MemoryStore.MemResolver(store, fold, initial).Resolve + | Config.StorageConfig.Es (gateway, cache, unfolds) -> + let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None + Equinox.EventStore.GesResolver<'event,'state>(gateway, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve + | Config.StorageConfig.Cosmos (gateway, cache, unfolds, databaseId, connectionId) -> + let store = Equinox.Cosmos.EqxStore(gateway, Equinox.Cosmos.EqxCollections(databaseId, connectionId)) + let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot |> Some else None + Equinox.Cosmos.EqxResolver<'event,'state>(store, codec, fold, initial, ?access = accessStrategy, ?caching = cache).Resolve \ No newline at end of file diff --git a/samples/Store/Web/Controllers/FavoritesController.fs b/samples/Store/Web/Controllers/FavoritesController.fs index 38cb8c162..b5cabc977 100644 --- a/samples/Store/Web/Controllers/FavoritesController.fs +++ b/samples/Store/Web/Controllers/FavoritesController.fs @@ -3,23 +3,28 @@ open Domain open Microsoft.AspNetCore.Mvc -[] +[] [] type FavoritesController(service : Backend.Favorites.Service) = inherit ControllerBase() - [] - member __.Get(clientId : ClientId) = async { + [] + member __.Get + ( []clientId : ClientId) = async { let! res = service.List(clientId) return ActionResult<_> res } - [] - member __.Favorite(clientId : ClientId, []skuIds : SkuId[]) = async { + [] + member __.Favorite + ( []clientId : ClientId, + []skuIds : SkuId[]) = async { return! service.Favorite(clientId,List.ofArray skuIds) } - [] - member __.Unfavorite(clientId : ClientId, skuId : SkuId) = async { + [] + member __.Unfavorite + ( []clientId, + skuId : SkuId) = async { return! service.Unfavorite(clientId, skuId) } \ No newline at end of file diff --git a/samples/Store/Web/Controllers/SavesController.fs b/samples/Store/Web/Controllers/SavesController.fs index 17f5e0a36..c9595fd75 100644 --- a/samples/Store/Web/Controllers/SavesController.fs +++ b/samples/Store/Web/Controllers/SavesController.fs @@ -3,24 +3,33 @@ open Domain open Microsoft.AspNetCore.Mvc +type FromClientIdHeaderAttribute() = inherit FromHeaderAttribute(Name="COMPLETELY_INSECURE_CLIENT_ID") + [] [] type SavesController(service : Backend.SavedForLater.Service) = inherit ControllerBase() - [] - member __.Get(clientId : ClientId) = async { + [] + member __.Get + ( []clientId : ClientId) = async { let! res = service.List(clientId) return ActionResult<_> res } - [] - member __.Save(clientId : ClientId, []skuIds : SkuId[]) = async { - return! service.Save(clientId, List.ofArray skuIds) + // Returns 400 if item limit exceeded + [] + member __.Save + ( []clientId : ClientId, + []skuIds : SkuId[]) : Async = async { + let! ok = service.Save(clientId, List.ofArray skuIds) + if ok then return __.NoContent() :> _ else return __.BadRequest("Exceeded maximum number of items in Saved list; please validate before requesting Save.") :> _ } - [] - member __.Remove(clientId : ClientId, []skuIds : SkuId[]) = async { + [] + member __.Remove + ( []clientId : ClientId, + []skuIds : SkuId[]) : Async = async { let resolveSkus _hasSavedSku = async { return skuIds } return! service.Remove(clientId, resolveSkus) - } + } \ No newline at end of file diff --git a/samples/Store/Web/Program.fs b/samples/Store/Web/Program.fs index d90dfd30b..f38795d1e 100644 --- a/samples/Store/Web/Program.fs +++ b/samples/Store/Web/Program.fs @@ -1,24 +1,41 @@ namespace Web +open Argu open Microsoft.AspNetCore open Microsoft.AspNetCore.Hosting +open Microsoft.Extensions.DependencyInjection open Serilog module Program = - let exitCode = 0 - - let createWebHostBuilder args : IWebHostBuilder = + let createWebHostBuilder (args,parsed) : IWebHostBuilder = WebHost .CreateDefaultBuilder(args) - .UseStartup() - .UseSerilog(fun hostingContext (loggerConfiguration : LoggerConfiguration) -> - loggerConfiguration - .ReadFrom.Configuration(hostingContext.Configuration) - .Enrich.FromLogContext() - .WriteTo.Console() |> ignore) + .ConfigureServices(fun services -> Startup.ConfigureServices(services, parsed)) + .Configure(fun app -> Startup.Configure(app, app.ApplicationServices.GetService())) + .UseSerilog() [] - let main args = - createWebHostBuilder(args).Build().Run() - - exitCode + let main argv = + try + let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name + printfn "Running at pid %d" (System.Diagnostics.Process.GetCurrentProcess().Id) + let args = ArgumentParser.Create(programName = programName).ParseCommandLine(argv) + // Replace logger chain with https://github.com/serilog/serilog-aspnetcore + let c = + LoggerConfiguration() + .MinimumLevel.Debug() + .MinimumLevel.Override("Microsoft", Serilog.Events.LogEventLevel.Warning) + .Enrich.FromLogContext() + .WriteTo.Console() + // TOCONSIDER log and reset every minute or something ? + .WriteTo.Sink(Samples.Log.SerilogHelpers.RuCounterSink()) + let c = + let maybeSeq = if args.Contains LocalSeq then Some "http://localhost:5341" else None + match maybeSeq with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint) + let log : ILogger = c.CreateLogger() :> _ + Log.Logger <- log + createWebHostBuilder(argv, args).Build().Run() + 0 + with e -> + eprintfn "%s" e.Message + 1 \ No newline at end of file diff --git a/samples/Store/Web/Startup.fs b/samples/Store/Web/Startup.fs index c10b9de5c..12cd51b9c 100644 --- a/samples/Store/Web/Startup.fs +++ b/samples/Store/Web/Startup.fs @@ -1,35 +1,72 @@ namespace Web +open Argu open Microsoft.AspNetCore.Builder open Microsoft.AspNetCore.Hosting open Microsoft.AspNetCore.Mvc -open Microsoft.Extensions.Configuration open Microsoft.Extensions.DependencyInjection +open Samples +open Samples.Config open Serilog -open System +open Serilog.Events -type Store() = - member __.GetStream() = failwith "TODO" +[] +type Arguments = + | [] VerboseConsole + | [] LocalSeq + | [] Cached + | [] Unfolds + | [] Memory of ParseResults + | [] Es of ParseResults + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member a.Usage = a |> function + | VerboseConsole -> "Include low level Domain and Store logging in screen output." + | LocalSeq -> "Configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net" + | Memory _ -> "specify In-Memory Volatile Store (Default store)" + | Es _ -> "specify EventStore actions" + | Cosmos _ -> "specify CosmosDb actions" + | Cached -> "Employ a 50MB cache" + | Unfolds -> "Employ a store-appropriate Rolling Snapshots and/or Unfolding strategy" -type Startup(configuration: IConfiguration) = - let configureMvc (s : IServiceCollection) = - s.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1) |> ignore - let configureApp (services : IServiceCollection) = - let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore +type App = class end - services.AddSingleton() |> ignore +// :shame: This should be a class used via UseStartup, but I couldnt figure out how to pass the parsed args in as MS have changed stuff around too much to make it googleable within my boredom threshold +type Startup() = + // This method gets called by the runtime. Use this method to add services to the container. + static member ConfigureServices(services: IServiceCollection, args: ParseResults) : unit = + services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1) |> ignore - regF <| fun sp -> Backend.Favorites.Service(sp.GetService(), sp.GetService().GetStream()) + let verboseConsole = args.Contains VerboseConsole + let maybeSeq = if args.Contains LocalSeq then Some "http://localhost:5341" else None + let createStoreLog verboseStore = + let c = LoggerConfiguration().Destructure.FSharpTypes() + let c = if verboseStore then c.MinimumLevel.Debug() else c + let c = c.WriteTo.Console((if verboseStore && verboseConsole then LogEventLevel.Debug else LogEventLevel.Warning), theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) + let c = match maybeSeq with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint) + c.CreateLogger() :> ILogger - //member val Configuration : IConfiguration = null with get, set + let storeConfig, storeLog : StorageConfig * ILogger = + let options = args.GetResults Cached @ args.GetResults Unfolds + let cache, unfolds = options |> List.contains Cached, options |> List.contains Unfolds + let log = Log.ForContext() - // This method gets called by the runtime. Use this method to add services to the container. - member __.ConfigureServices(services: IServiceCollection) = - configureMvc services - configureApp services + match args.TryGetSubCommand() with + | Some (Es sargs) -> + let storeLog = createStoreLog <| sargs.Contains EsArguments.VerboseStore + log.Information("EventStore Storage options: {options:l}", options) + EventStore.config (log,storeLog) (cache, unfolds) sargs, storeLog + | Some (Cosmos sargs) -> + let storeLog = createStoreLog <| sargs.Contains CosmosArguments.VerboseStore + log.Information("CosmosDb Storage options: {options:l}", options) + Cosmos.config (log,storeLog) (cache, unfolds) sargs, storeLog + | _ | Some (Memory _) -> + log.Fatal("Web App is using Volatile Store; Storage options: {options:l}", options) + MemoryStore.config (), log + Services.registerServices(services, storeConfig, storeLog) // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. - member __.Configure(app: IApplicationBuilder, env: IHostingEnvironment) = + static member Configure(app: IApplicationBuilder, env: IHostingEnvironment) : unit = if env.IsDevelopment() then app.UseDeveloperExceptionPage() |> ignore else app.UseHsts() |> ignore diff --git a/samples/Store/Web/Web.fsproj b/samples/Store/Web/Web.fsproj index 2e8268544..6a2ccbee4 100644 --- a/samples/Store/Web/Web.fsproj +++ b/samples/Store/Web/Web.fsproj @@ -12,18 +12,19 @@ + + - - - + + diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 7819f2b3e..b88073de4 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -953,6 +953,7 @@ type EqxConnector /// Additional strings identifying the context of this connection; should provide enough context to disambiguate all potential connections to a cluster /// NB as this will enter server and client logs, it should not contain sensitive information ?tags : (string*string) seq) = + do if log = null then nullArg "log" let connPolicy = let cp = Client.ConnectionPolicy.Default @@ -1010,7 +1011,7 @@ type EqxContext /// Database + Collection selector collections: EqxCollections, /// Logger to write to - see https://github.com/serilog/serilog/wiki/Provided-Sinks for how to wire to your logger - logger : Serilog.ILogger, + log : Serilog.ILogger, /// Optional maximum number of Store.Batch records to retrieve as a set (how many Events are placed therein is controlled by maxEventsPerSlice). /// Defaults to 10 ?defaultMaxItems, @@ -1019,6 +1020,7 @@ type EqxContext /// Threshold defining the number of events a slice is allowed to hold before switching to a new Batch is triggered. /// Defaults to 1 ?maxEventsPerSlice) = + do if log = null then nullArg "log" let getDefaultMaxItems = match getDefaultMaxItems with Some f -> f | None -> fun () -> defaultArg defaultMaxItems 10 let maxEventsPerSlice = defaultArg maxEventsPerSlice 1 let batching = EqxBatchingPolicy(getDefaultMaxItems=getDefaultMaxItems, maxEventsPerSlice=maxEventsPerSlice) @@ -1041,7 +1043,7 @@ type EqxContext let direction = defaultArg direction Direction.Forward let batchSize = defaultArg batchSize batching.MaxItems * maxEventsPerSlice let batching = EqxBatchingPolicy(if batchSize < maxEventsPerSlice then 1 else batchSize/maxEventsPerSlice) - gateway.ReadLazy batching logger stream direction startPos (Some,fun _ -> false) + gateway.ReadLazy batching log stream direction startPos (Some,fun _ -> false) member internal __.GetInternal((stream, startPos), ?maxCount, ?direction) = async { let direction = defaultArg direction Direction.Forward @@ -1053,13 +1055,13 @@ type EqxContext match maxCount with | Some limit -> maxCountPredicate limit | None -> fun _ -> false - return! gateway.Read logger stream direction startPos (Some,isOrigin) } + return! gateway.Read log stream direction startPos (Some,isOrigin) } /// Establishes the current position of the stream in as effficient a manner as possible /// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency state checks) member __.Sync(stream, ?position: Position) : Async = async { //let indexed predicate = load fold initial (coll.Gateway.IndexedOrBatched log predicate (stream,None)) - let! (Token.Unpack (_,pos')) = gateway.GetPosition(logger, stream, ?pos=position) + let! (Token.Unpack (_,pos')) = gateway.GetPosition(log, stream, ?pos=position) return pos' } /// Reads in batches of `batchSize` from the specified `Position`, allowing the reader to efficiently walk away from a running query @@ -1075,7 +1077,7 @@ type EqxContext /// Callers should implement appropriate idempotent handling, or use Equinox.Handler for that purpose member __.Sync(stream, position, events: IEvent[]) : Async> = async { let batch = Sync.mkBatch stream events Seq.empty - let! res = gateway.Sync logger stream (Some position.index,batch) + let! res = gateway.Sync log stream (Some position.index,batch) match res with | InternalSyncResult.Written (Token.Unpack (_,pos)) -> return AppendResult.Ok pos | InternalSyncResult.Conflict (Token.Unpack (_,pos),events) -> return AppendResult.Conflict (pos, events) diff --git a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs index 24e207fb9..6ced20f0a 100644 --- a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs +++ b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs @@ -20,7 +20,7 @@ type VerbatimUtf8Tests() = [] let ``encodes correctly`` () = - let encoded = mkUnionEncoder().Encode(A { embed = "\"" }) + let encoded = unionEncoder.Encode(A { embed = "\"" }) let e : Store.Batch = { p = "streamName"; id = string 0; i = -1L; n = -1L; _etag = null e = [| { t = DateTimeOffset.MinValue; c = encoded.caseName; d = encoded.payload; m = null } |] }