From c476d6a94a39d12be320eabd6ecb54a3ad718ecd Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 14 Nov 2018 07:05:46 +0000 Subject: [PATCH] Compress snapshots in Index --- .../Infrastructure/Infrastructure.fs | 6 +++ cli/Equinox.Cli/Program.fs | 11 ++-- src/Equinox.Cosmos/Cosmos.fs | 44 ++++++++++++++-- .../VerbatimUtf8JsonConverterTests.fs | 52 ++++++++++++++----- 4 files changed, 92 insertions(+), 21 deletions(-) diff --git a/cli/Equinox.Cli/Infrastructure/Infrastructure.fs b/cli/Equinox.Cli/Infrastructure/Infrastructure.fs index 988d4ce8f..7fd7de5a5 100644 --- a/cli/Equinox.Cli/Infrastructure/Infrastructure.fs +++ b/cli/Equinox.Cli/Infrastructure/Infrastructure.fs @@ -24,6 +24,12 @@ type internal SuccessException<'T>(value : 'T) = inherit Exception() member self.Value = value +type Exception with + // https://github.com/fsharp/fslang-suggestions/issues/660 + member this.Reraise () = + (System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture this).Throw () + Unchecked.defaultof<_> + type Async with #if NET461 static member Choice<'T>(workflows : seq>) : Async<'T option> = async { diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index a5795e9c4..7994e271f 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -246,8 +246,11 @@ let main argv = "Duration for {duration} with test freq {tps} hits/s; max errors: {errorCutOff}, reporting intervals: {ri}, report file: {report}", test, targs.Contains Cached, targs.Contains Indexed, duration, testsPerSecond, errorCutoff, reportingIntervals, report) let runSingleTest clientId = - if verbose then domainLog.Information("Using session {sessionId}", ([|clientId|] : obj [])) - Test.runFavoriteTest service clientId + if not verbose then Test.runFavoriteTest service clientId + else async { + domainLog.Information("Using session {sessionId}", ([|clientId|] : obj [])) + try return! Test.runFavoriteTest service clientId + with e -> domainLog.Warning(e, "Threw"); e.Reraise () } let results = Test.run log testsPerSecond (duration.Add(TimeSpan.FromSeconds 5.)) errorCutoff reportingIntervals clients runSingleTest |> Async.RunSynchronously let resultFile = createResultLog report for r in results do @@ -301,7 +304,7 @@ let main argv = match sargs.TryGetSubCommand() with | Some (Provision args) -> let rus = args.GetResult(Rus) - log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus}", rus) + log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus:n0}", rus) Equinox.Cosmos.Initialization.initialize conn.Client dbName collName rus |> Async.RunSynchronously 0 | Some (Run targs) -> @@ -309,7 +312,7 @@ let main argv = let res = runTest log conn targs let read, write = RuCounterSink.Read, RuCounterSink.Write let total = read+write - log.Information("Total RUs consumed: {totalRus} (R:{readRus}, W:{writeRus})", total, read, write) + log.Information("Total Request Charges sustained in test: {totalRus:n0} (R:{readRus:n0}, W:{writeRus:n0})", total, read, write) res | _ -> failwith "init or run is required" | _ -> failwith "ERROR: please specify memory, es or cosmos Store" diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 1d67f753a..8e7700306 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -10,9 +10,8 @@ open Newtonsoft.Json.Linq open Serilog open System - [] -module DocDbExtensions = +module private DocDbExtensions = /// Extracts the innermost exception from a nested hierarchy of Aggregate Exceptions let (|AggregateException|) (exn : exn) = let rec aux (e : exn) = @@ -53,6 +52,8 @@ module DocDbExtensions = | DocDbException (DocDbStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, NotModified } module Store = + open System.IO + open System.IO.Compression [] type Position = { collectionUri: Uri; streamName: string; index: int64 option; etag: string option } @@ -165,21 +166,54 @@ module Store = static member Create (pos: Position) eventCount (eds: EventData[]) : IndexEvent = { p = pos.streamName; id = IndexEvent.IdConstant; m = pos.IndexRel eventCount; _etag = null c = [| for ed in eds -> { t = ed.eventType; d = ed.data; m = ed.metadata } |] } - and IndexProjection = + and [] IndexProjection = { /// The Event Type, used to drive deserialization t: string // required /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb - [)>] + [)>] d: byte[] // required /// Optional metadata (null, or same as d, not written if missing) - [); JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>] + [); JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>] m: byte[] } // optional interface IEventData with member __.EventType = __.t member __.DataUtf8 = __.d member __.MetaUtf8 = __.m + + /// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc + /// Only applied to snapshots in the Index + and Base64ZipUtf8JsonConverter() = + inherit JsonConverter() + let pickle (input : byte[]) : string = + if input = null then null else + + use output = new MemoryStream() + use compressor = new DeflateStream(output, CompressionLevel.Optimal) + compressor.Write(input,0,input.Length) + compressor.Close() + Convert.ToBase64String(output.ToArray()) + let unpickle str : byte[] = + if str = null then null else + + let compressedBytes = Convert.FromBase64String str + use input = new MemoryStream(compressedBytes) + use decompressor = new DeflateStream(input, CompressionMode.Decompress) + use output = new MemoryStream() + decompressor.CopyTo(output) + decompressor.Close() + output.ToArray() + + override __.CanConvert(objectType) = + typeof.Equals(objectType) + override __.ReadJson(reader, _, _, serializer) = + //( if reader.TokenType = JsonToken.Null then null else + serializer.Deserialize(reader, typedefof) :?> string |> unpickle |> box + override __.WriteJson(writer, value, serializer) = + let pickled = value |> unbox |> pickle + serializer.Serialize(writer, pickled) + (* Pseudocode: function sync(p, expectedVersion, windowSize, events) { if (i == 0) then { diff --git a/tests/Equinox.Cosmos.Integration/VerbatimUtf8JsonConverterTests.fs b/tests/Equinox.Cosmos.Integration/VerbatimUtf8JsonConverterTests.fs index 76122ed69..903c9f1c0 100644 --- a/tests/Equinox.Cosmos.Integration/VerbatimUtf8JsonConverterTests.fs +++ b/tests/Equinox.Cosmos.Integration/VerbatimUtf8JsonConverterTests.fs @@ -1,33 +1,61 @@ module Equinox.Cosmos.Integration.VerbatimUtf8JsonConverterTests open Equinox.Cosmos +open FsCheck.Xunit open Newtonsoft.Json open Swensen.Unquote open System open Xunit -let inline serialize (x:'t) = - let serializer = new JsonSerializer() - use sw = new System.IO.StringWriter() - use w = new JsonTextWriter(sw) - serializer.Serialize(w,x) - sw.ToString() - type Embedded = { embed : string } type Union = | A of Embedded | B of Embedded interface TypeShape.UnionContract.IUnionContract +let mkUnionEncoder () = Equinox.UnionCodec.JsonUtf8.Create(JsonSerializerSettings()) + [] -let ``VerbatimUtf8JsonConverter serializes properly`` () = - let unionEncoder = Equinox.UnionCodec.JsonUtf8.Create<_>(JsonSerializerSettings()) - let encoded = unionEncoder.Encode(A { embed = "\"" }) +let ``VerbatimUtf8JsonConverter encodes correctly`` () = + let encoded = mkUnionEncoder().Encode(A { embed = "\"" }) let e : Store.Event = { p = "streamName"; id = string 0; i = 0L c = DateTimeOffset.MinValue t = encoded.caseName d = encoded.payload m = null } - let res = serialize e - test <@ res.Contains """"d":{"embed":"\""}""" @> \ No newline at end of file + let res = JsonConvert.SerializeObject(e) + test <@ res.Contains """"d":{"embed":"\""}""" @> + +type Base64ZipUtf8JsonConverterTests() = + let unionEncoder = mkUnionEncoder () + + [] + let ``serializes, achieving compression`` () = + let encoded = unionEncoder.Encode(A { embed = String('x',5000) }) + let e : Store.IndexProjection = + { t = encoded.caseName + d = encoded.payload + m = null } + let res = JsonConvert.SerializeObject e + test <@ res.Contains("\"d\":\"") && res.Length < 100 @> + + [] + let roundtrips value = + let hasNulls = + match value with + | A x | B x when obj.ReferenceEquals(null, x) -> true + | A { embed = x } | B { embed = x } -> obj.ReferenceEquals(null, x) + if hasNulls then () else + + let encoded = unionEncoder.Encode value + let e : Store.IndexProjection = + { t = encoded.caseName + d = encoded.payload + m = null } + let ser = JsonConvert.SerializeObject(e) + test <@ ser.Contains("\"d\":\"") @> + let des = JsonConvert.DeserializeObject(ser) + let d : Equinox.UnionCodec.EncodedUnion<_> = { caseName = des.t; payload=des.d } + let decoded = unionEncoder.Decode d + test <@ value = decoded @> \ No newline at end of file