Skip to content

Commit

Permalink
Compress snapshots in Index
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 14, 2018
1 parent 194c518 commit c476d6a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 21 deletions.
6 changes: 6 additions & 0 deletions cli/Equinox.Cli/Infrastructure/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ type internal SuccessException<'T>(value : 'T) =
inherit Exception()
member self.Value = value

type Exception with
// https://github.com/fsharp/fslang-suggestions/issues/660
member this.Reraise () =
(System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture this).Throw ()
Unchecked.defaultof<_>

type Async with
#if NET461
static member Choice<'T>(workflows : seq<Async<'T option>>) : Async<'T option> = async {
Expand Down
11 changes: 7 additions & 4 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,11 @@ let main argv =
"Duration for {duration} with test freq {tps} hits/s; max errors: {errorCutOff}, reporting intervals: {ri}, report file: {report}",
test, targs.Contains Cached, targs.Contains Indexed, duration, testsPerSecond, errorCutoff, reportingIntervals, report)
let runSingleTest clientId =
if verbose then domainLog.Information("Using session {sessionId}", ([|clientId|] : obj []))
Test.runFavoriteTest service clientId
if not verbose then Test.runFavoriteTest service clientId
else async {
domainLog.Information("Using session {sessionId}", ([|clientId|] : obj []))
try return! Test.runFavoriteTest service clientId
with e -> domainLog.Warning(e, "Threw"); e.Reraise () }
let results = Test.run log testsPerSecond (duration.Add(TimeSpan.FromSeconds 5.)) errorCutoff reportingIntervals clients runSingleTest |> Async.RunSynchronously
let resultFile = createResultLog report
for r in results do
Expand Down Expand Up @@ -301,15 +304,15 @@ let main argv =
match sargs.TryGetSubCommand() with
| Some (Provision args) ->
let rus = args.GetResult(Rus)
log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus}", rus)
log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus:n0}", rus)
Equinox.Cosmos.Initialization.initialize conn.Client dbName collName rus |> Async.RunSynchronously
0
| Some (Run targs) ->
let conn = Store.Cosmos (Cosmos.createGateway conn defaultBatchSize, dbName, collName)
let res = runTest log conn targs
let read, write = RuCounterSink.Read, RuCounterSink.Write
let total = read+write
log.Information("Total RUs consumed: {totalRus} (R:{readRus}, W:{writeRus})", total, read, write)
log.Information("Total Request Charges sustained in test: {totalRus:n0} (R:{readRus:n0}, W:{writeRus:n0})", total, read, write)
res
| _ -> failwith "init or run is required"
| _ -> failwith "ERROR: please specify memory, es or cosmos Store"
Expand Down
44 changes: 39 additions & 5 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ open Newtonsoft.Json.Linq
open Serilog
open System


[<AutoOpen>]
module DocDbExtensions =
module private DocDbExtensions =
/// Extracts the innermost exception from a nested hierarchy of Aggregate Exceptions
let (|AggregateException|) (exn : exn) =
let rec aux (e : exn) =
Expand Down Expand Up @@ -53,6 +52,8 @@ module DocDbExtensions =
| DocDbException (DocDbStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, NotModified }

module Store =
open System.IO
open System.IO.Compression
[<NoComparison>]
type Position =
{ collectionUri: Uri; streamName: string; index: int64 option; etag: string option }
Expand Down Expand Up @@ -165,21 +166,54 @@ module Store =
static member Create (pos: Position) eventCount (eds: EventData[]) : IndexEvent =
{ p = pos.streamName; id = IndexEvent.IdConstant; m = pos.IndexRel eventCount; _etag = null
c = [| for ed in eds -> { t = ed.eventType; d = ed.data; m = ed.metadata } |] }
and IndexProjection =
and [<CLIMutable>] IndexProjection =
{ /// The Event Type, used to drive deserialization
t: string // required

/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb
[<JsonConverter(typeof<VerbatimUtf8JsonConverter>)>]
[<JsonConverter(typeof<Base64ZipUtf8JsonConverter>)>]
d: byte[] // required

/// Optional metadata (null, or same as d, not written if missing)
[<JsonConverter(typeof<VerbatimUtf8JsonConverter>); JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
[<JsonConverter(typeof<Base64ZipUtf8JsonConverter>); JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] } // optional
interface IEventData with
member __.EventType = __.t
member __.DataUtf8 = __.d
member __.MetaUtf8 = __.m

/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc
/// Only applied to snapshots in the Index
and Base64ZipUtf8JsonConverter() =
inherit JsonConverter()
let pickle (input : byte[]) : string =
if input = null then null else

use output = new MemoryStream()
use compressor = new DeflateStream(output, CompressionLevel.Optimal)
compressor.Write(input,0,input.Length)
compressor.Close()
Convert.ToBase64String(output.ToArray())
let unpickle str : byte[] =
if str = null then null else

let compressedBytes = Convert.FromBase64String str
use input = new MemoryStream(compressedBytes)
use decompressor = new DeflateStream(input, CompressionMode.Decompress)
use output = new MemoryStream()
decompressor.CopyTo(output)
decompressor.Close()
output.ToArray()

override __.CanConvert(objectType) =
typeof<byte[]>.Equals(objectType)
override __.ReadJson(reader, _, _, serializer) =
//( if reader.TokenType = JsonToken.Null then null else
serializer.Deserialize(reader, typedefof<string>) :?> string |> unpickle |> box
override __.WriteJson(writer, value, serializer) =
let pickled = value |> unbox |> pickle
serializer.Serialize(writer, pickled)

(* Pseudocode:
function sync(p, expectedVersion, windowSize, events) {
if (i == 0) then {
Expand Down
52 changes: 40 additions & 12 deletions tests/Equinox.Cosmos.Integration/VerbatimUtf8JsonConverterTests.fs
Original file line number Diff line number Diff line change
@@ -1,33 +1,61 @@
module Equinox.Cosmos.Integration.VerbatimUtf8JsonConverterTests

open Equinox.Cosmos
open FsCheck.Xunit
open Newtonsoft.Json
open Swensen.Unquote
open System
open Xunit

let inline serialize (x:'t) =
let serializer = new JsonSerializer()
use sw = new System.IO.StringWriter()
use w = new JsonTextWriter(sw)
serializer.Serialize(w,x)
sw.ToString()

type Embedded = { embed : string }
type Union =
| A of Embedded
| B of Embedded
interface TypeShape.UnionContract.IUnionContract

let mkUnionEncoder () = Equinox.UnionCodec.JsonUtf8.Create<Union>(JsonSerializerSettings())

[<Fact>]
let ``VerbatimUtf8JsonConverter serializes properly`` () =
let unionEncoder = Equinox.UnionCodec.JsonUtf8.Create<_>(JsonSerializerSettings())
let encoded = unionEncoder.Encode(A { embed = "\"" })
let ``VerbatimUtf8JsonConverter encodes correctly`` () =
let encoded = mkUnionEncoder().Encode(A { embed = "\"" })
let e : Store.Event =
{ p = "streamName"; id = string 0; i = 0L
c = DateTimeOffset.MinValue
t = encoded.caseName
d = encoded.payload
m = null }
let res = serialize e
test <@ res.Contains """"d":{"embed":"\""}""" @>
let res = JsonConvert.SerializeObject(e)
test <@ res.Contains """"d":{"embed":"\""}""" @>

type Base64ZipUtf8JsonConverterTests() =
let unionEncoder = mkUnionEncoder ()

[<Fact>]
let ``serializes, achieving compression`` () =
let encoded = unionEncoder.Encode(A { embed = String('x',5000) })
let e : Store.IndexProjection =
{ t = encoded.caseName
d = encoded.payload
m = null }
let res = JsonConvert.SerializeObject e
test <@ res.Contains("\"d\":\"") && res.Length < 100 @>

[<Property>]
let roundtrips value =
let hasNulls =
match value with
| A x | B x when obj.ReferenceEquals(null, x) -> true
| A { embed = x } | B { embed = x } -> obj.ReferenceEquals(null, x)
if hasNulls then () else

let encoded = unionEncoder.Encode value
let e : Store.IndexProjection =
{ t = encoded.caseName
d = encoded.payload
m = null }
let ser = JsonConvert.SerializeObject(e)
test <@ ser.Contains("\"d\":\"") @>
let des = JsonConvert.DeserializeObject<Store.IndexProjection>(ser)
let d : Equinox.UnionCodec.EncodedUnion<_> = { caseName = des.t; payload=des.d }
let decoded = unionEncoder.Decode d
test <@ value = decoded @>

0 comments on commit c476d6a

Please sign in to comment.