From 9a444fba96473f097ec6f9f2f61cb0599d5d76f1 Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Thu, 29 Nov 2018 11:57:14 +0000
Subject: [PATCH] Unify Tip vs Batch formats (#54)

---
 src/Equinox.Cosmos/Cosmos.fs     | 138 ++++++++----------
 .../CosmosCoreIntegration.fs     |   2 +-
 .../JsonConverterTests.fs        |  15 +-
 3 files changed, 69 insertions(+), 86 deletions(-)

diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index 0b3d75e55..f85bc7c26 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -37,7 +37,7 @@ open Newtonsoft.Json
 
 /// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds)
 type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
-    Event =
+    Batch =
     {   /// DocDb-mandated Partition Key, must be maintained within the document
         /// Not actually required if running in single partition mode, but for simplicity, we always write it
        p: string // "{streamName}"
@@ -52,15 +52,32 @@ type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
         /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
         [<JsonProperty(DefaultValueHandling=DefaultValueHandling.Ignore, Required=Required.Default)>]
         _etag: string
 
+        /// When we encounter the Tip, we're interested in the 'real' i, which is kept in `_i` for now
+        [<JsonProperty(DefaultValueHandling=DefaultValueHandling.Ignore, Required=Required.Default)>]
+        _i: int64
 
-        /// Same as `id`; necessitated by the fact that it's not presently possible to do an ORDER BY on the row key
+        /// base 'i' value for the Events held herein
         i: int64 // {index}
 
-        /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
-        c: System.DateTimeOffset // ISO 8601
+        /// The events at this offset in the stream
+        e: BatchEvent[] }
+    /// Unless running in single partition mode (which would restrict us to 10GB per collection)
+    /// we need to nominate a partition key that will be in every document
+    static member PartitionKeyField = "p"
+    /// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use
+    static member IndexedFields = [Batch.PartitionKeyField; "i"]
+    /// If we encounter the tip (id=-1) doc, we're interested in its etag so we can re-sync for 1 RU
+    member x.TryToPosition() =
+        if x.id <> Tip.WellKnownDocumentId then None
+        else Some { index = x._i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x }
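To make the unified shape concrete, a minimal F# sketch (not part of the patch; the stream name and event values are illustrative, and the construction mirrors the JsonConverterTests change below). A frozen Batch and the Tip now share one document schema; only the Tip (id = "-1") yields a Position:

    let utf8 (s: string) = System.Text.Encoding.UTF8.GetBytes s
    let ev : BatchEvent = { t = System.DateTimeOffset.UtcNow; c = "Added"; d = utf8 """{"item":1}"""; m = null }
    let frozen : Batch = { p = "Favorites-alice"; id = "0"; i = 0L; _i = 0L; _etag = null; e = [| ev |] }
    // None here, as id <> Tip.WellKnownDocumentId; for the Tip document it would be
    // Some { index = _i + e.LongLength; etag = ... }, supporting the cheap re-sync noted above
    let pos = frozen.TryToPosition()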
+/// A single event from the array held in a batch
+and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
+    BatchEvent =
+    {   /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
+        t: System.DateTimeOffset // ISO 8601
 
         /// The Case (Event Type), used to drive deserialization
-        t: string // required
+        c: string // required
 
         /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb
         [<JsonConverter(typeof<VerbatimUtf8JsonConverter>)>]
@@ -70,23 +87,13 @@ type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
         [<JsonConverter(typeof<VerbatimUtf8JsonConverter>)>]
         [<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
         m: byte[] } // optional
-    /// Unless running in single partition mode (which would restrict us to 10GB per collection)
-    /// we need to nominate a partition key that will be in every document
-    static member PartitionKeyField = "p"
-    /// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use
-    static member IndexedFields = [Event.PartitionKeyField; "i"]
-    /// If we encounter a -1 doc, we're interested in its etag so we can re-read for one RU
-    member x.TryToPosition() =
-        if x.id <> Tip.WellKnownDocumentId then None
-        else Some { index = (let ``x.e.LongLength`` = 1L in x.i+``x.e.LongLength``); etag = match x._etag with null -> None | x -> Some x }
 
 /// The Special 'Pending' Batch Format
 /// NB this Type does double duty as
 /// a) transport for when we read it
 /// b) a way of encoding a batch that the stored procedure will write into the actual document (`i` is -1 until the Stored Proc computes it)
 /// The stored representation has the following differences vs a 'normal' (frozen/completed) Batch
-/// a) `id` and `i` = `-1` as the WIP document currently always is
-/// b) events are retained in an `e` array, not as top level fields
-/// c) contains unfolds (`c`)
+/// a) `id` = `-1`
+/// b) contains unfolds (`u`)
 and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
     Tip =
     {   /// Partition key, as per Batch
@@ -107,35 +114,18 @@ and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
         e: BatchEvent[]
 
         /// Compaction/Snapshot/Projection events
-        c: Unfold[] }
+        u: Unfold[] }
     /// arguably this should be a high number to reflect the fact that it is the freshest?
     static member WellKnownDocumentId = "-1"
     /// Create Position from Tip record context (facilitating 1 RU reads)
     member x.ToPosition() = { index = x._i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x }
-/// A single event from the array held in a batch
-and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
-    BatchEvent =
-    {   /// Creation date (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
-        c: System.DateTimeOffset // ISO 8601
-
-        /// The Event Type, used to drive deserialization
-        t: string // required
-
-        /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb
-        [<JsonConverter(typeof<VerbatimUtf8JsonConverter>)>]
-        d: byte[] // required
-
-        /// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing)
-        [<JsonConverter(typeof<VerbatimUtf8JsonConverter>)>]
-        [<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
-        m: byte[] } // optional
 
 /// Compaction/Snapshot/Projection Event based on the state at a given point in time `i`
 and Unfold =
     {   /// Base: Stream Position (Version) of State from which this Unfold Event was generated
         i: int64
 
         /// The Case (Event Type) of this compaction/snapshot, used to drive deserialization
-        t: string // required
+        c: string // required
 
         /// Event body - Json -> UTF-8 -> Deflate -> Base64
         [<JsonConverter(typeof<Base64ZipUtf8JsonConverter>)>]
@@ -152,7 +142,7 @@ type Enum() =
             { new IIndexedEvent with
                   member __.Index = b._i + int64 offset
                   member __.IsUnfold = false
-                  member __.EventType = x.t
+                  member __.EventType = x.c
                   member __.Data = x.d
                   member __.Meta = x.m })
     static member Events(i: int64, e: BatchEvent[]) =
@@ -160,26 +150,20 @@ type Enum() =
             { new IIndexedEvent with
                   member __.Index = i + int64 offset
                   member __.IsUnfold = false
-                  member __.EventType = x.t
+                  member __.EventType = x.c
                   member __.Data = x.d
                   member __.Meta = x.m })
-    static member Event(x: Event) =
-        Seq.singleton
-            { new IIndexedEvent with
-                  member __.Index = x.i
-                  member __.IsUnfold = false
-                  member __.EventType = x.t
-                  member __.Data = x.d
-                  member __.Meta = x.m }
-    static member Unfolds(xs: Unfold[]) = seq {
+    static member Events(b: Batch) =
+        Enum.Events (b.i, b.e)
+    static member Unfolds (xs: Unfold[]) = seq {
         for x in xs -> { new IIndexedEvent with
             member __.Index = x.i
             member __.IsUnfold = true
-            member __.EventType = x.t
+            member __.EventType = x.c
             member __.Data = x.d
             member __.Meta = x.m } }
-    static member EventsAndUnfolds(x:Tip): IIndexedEvent seq =
-        Enum.Unfolds x.c
+    static member EventsAndUnfolds(x: Tip): IIndexedEvent seq =
+        Enum.Unfolds x.u
 
 /// Reference to Collection and name that will be used as the location for the stream
 type [<NoComparison>] CollectionStream = { collectionUri: System.Uri; name: string } with
@@ -289,12 +273,12 @@ module Sync =
     // NB don't nest in a private module, or serialization will fail miserably ;)
     []
     type SyncResponse = { etag: string; nextI: int64; conflicts: BatchEvent[] }
-    let [<Literal>] sprocName = "EquinoxSync-SingleEvents-021" // NB need to renumber for any breaking change
+    let [<Literal>] sprocName = "EquinoxSync-SingleArray-001" // NB need to renumber for any breaking change
     let [<Literal>] sprocBody = """
 // Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
 // 1. Verify no current WIP batch exists; the incoming `req` becomes the WIP batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO)
 // 2. Current WIP batch has space to accommodate the incoming projections (req.u) and events (req.e) - merge them in, replacing any superseded projections
 // 3. Current WIP batch would become too large - remove WIP state from the active document by replacing the well known id with a correct one; proceed as per 1.
 function sync(req, expectedVersion) {
     if (!req) throw new Error("Missing req argument");
@@ -332,7 +316,7 @@ function sync(req, expectedVersion) {
     if (current && current.e.length + req.e.length > 10) {
         current._i = current._i + current.e.length;
         current.e = req.e;
-        current.c = req.c;
+        current.u = req.u;
 
         // as we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place
         finalize(current);
@@ -342,7 +326,7 @@ function sync(req, expectedVersion) {
         // Append the new events into the current batch
         Array.prototype.push.apply(current.e, req.e);
         // Replace all the projections
-        current.c = req.c;
+        current.u = req.u;
         // TODO: should remove only projections being superseded
 
         // as we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place
@@ -364,10 +348,12 @@ function sync(req, expectedVersion) {
             p: req.p,
             id: eventI.toString(),
             i: eventI,
-            c: e.c,
-            t: e.t,
-            d: e.d,
-            m: e.m
+            e: [ {
+                c: e.c,
+                t: e.t,
+                d: e.d,
+                m: e.m
+            }]
         };
         const isAccepted = collection.createDocument(collectionLink, doc, function (err) {
             if (err) throw err;
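A worked illustration of the three end-states (the WIP limit being the `current.e.length + req.e.length > 10` check above): with no WIP document, a request bearing 3 events simply becomes the WIP batch (1). If the WIP batch holds 5 events, 5 + 3 <= 10, so the 3 events are appended and the projections replaced wholesale (2). If it holds 8, 8 + 3 > 10, so the WIP identity is shed: `_i` is advanced past the 8 events being frozen and the request's events and unfolds become the WIP content (3). Note also, per the last hunk above, that an event leaving WIP state is now materialised as a document whose `e` array carries that single event, rather than as the top-level `c`/`t`/`d`/`m` fields the retired `SingleEvents` sproc wrote; hence the rename to `SingleArray` and the renumbering, as the comment on `sprocName` prescribes for breaking changes.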
@@ -407,16 +393,16 @@ function sync(req, expectedVersion) {
     let private logged client (stream: CollectionStream) (expectedVersion, req: Tip) (log : ILogger) : Async<Result> = async {
         let verbose = log.IsEnabled Events.LogEventLevel.Debug
-        let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.c else log
+        let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log
         let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length
         let log = log |> Log.prop "bytes" bytes
         let writeLog =
             log |> Log.prop "stream" stream.name |> Log.prop "expectedVersion" expectedVersion
-                |> Log.prop "count" req.e.Length |> Log.prop "ucount" req.c.Length
+                |> Log.prop "count" req.e.Length |> Log.prop "ucount" req.u.Length
         let! t, (ru,result) = run client stream (expectedVersion, req) |> Stopwatch.Time
         let resultLog =
             let mkMetric ru : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru }
-            let logConflict () = writeLog.Information("EqxCosmos Sync: Conflict writing {eventTypes}", [| for x in req.e -> x.t |])
+            let logConflict () = writeLog.Information("EqxCosmos Sync: Conflict writing {eventTypes}", [| for x in req.e -> x.c |])
             match result with
             | Result.Written pos ->
                 log |> Log.event (Log.SyncSuccess (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos
@@ -427,7 +413,7 @@ function sync(req, expectedVersion) {
                 logConflict ()
                 let log = if verbose then log |> Log.prop "nextExpectedVersion" pos |> Log.propData "conflicts" xs else log
                 log |> Log.event (Log.SyncResync(mkMetric ru)) |> Log.prop "conflict" true
-        resultLog.Information("EqxCosmos {action:l} {count}+{ucount} {ms}ms rc={ru}", "Sync", req.e.Length, req.c.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru)
+        resultLog.Information("EqxCosmos {action:l} {count}+{ucount} {ms}ms rc={ru}", "Sync", req.e.Length, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru)
         return result }
@@ -435,10 +421,10 @@ function sync(req, expectedVersion) {
     let batch (log : ILogger) retryPolicy client pk batch: Async<Result> =
         let call = logged client pk batch
         Log.withLoggedRetries retryPolicy "writeAttempt" call log
     let mkBatch (stream: Store.CollectionStream) (events: IEvent[]) unfolds: Tip =
         {   p = stream.name; id = Store.Tip.WellKnownDocumentId; _i = -1L(*Server-managed*); _etag = null
-            e = [| for e in events -> { c = DateTimeOffset.UtcNow; t = e.EventType; d = e.Data; m = e.Meta } |]
-            c = Array.ofSeq unfolds }
+            e = [| for e in events -> { t = DateTimeOffset.UtcNow; c = e.EventType; d = e.Data; m = e.Meta } |]
+            u = Array.ofSeq unfolds }
     let mkUnfold baseIndex (unfolds: IEvent seq) : Store.Unfold seq =
-        unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; t = x.EventType; d = x.Data; m = x.Meta } : Store.Unfold)
+        unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; c = x.EventType; d = x.Data; m = x.Meta } : Store.Unfold)
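For orientation, a sketch of how these two helpers combine (not part of the patch; assumes a bound `stream: CollectionStream` and an `IEvent` exposing the `EventType`/`Data`/`Meta` members consumed above; the event body is hypothetical):

    let utf8 (s: string) = System.Text.Encoding.UTF8.GetBytes s
    let events =
        [| { new IEvent with
                 member __.EventType = "Added"
                 member __.Data = utf8 """{"item":1}"""
                 member __.Meta = null } |]
    // no unfolds in this example; the baseIndex of 1L is arbitrary
    // the request is itself Tip-shaped: id = "-1" and _i = -1L until the sproc assigns the real base index
    let req : Tip = mkBatch stream events (mkUnfold 1L Seq.empty)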
@@ -449,7 +435,7 @@ function sync(req, expectedVersion) {
     module Initialization =
         open System.Collections.ObjectModel
         let createCollection (client: IDocumentClient) (dbUri: Uri) collName ru = async {
             let pkd = PartitionKeyDefinition()
-            pkd.Paths.Add(sprintf "/%s" Store.Event.PartitionKeyField)
+            pkd.Paths.Add(sprintf "/%s" Store.Batch.PartitionKeyField)
             let colld = DocumentCollection(Id = collName, PartitionKey = pkd)
 
             colld.IndexingPolicy.IndexingMode <- IndexingMode.Consistent
@@ -458,7 +444,7 @@ function sync(req, expectedVersion) {
             // Given how long and variable the blacklist would be, we whitelist instead
             colld.IndexingPolicy.ExcludedPaths <- Collection [|ExcludedPath(Path="/*")|]
             // NB it's critical to index the nominated PartitionKey field defined above or there will be runtime errors
-            colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Store.Event.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |]
+            colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Store.Batch.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |]
             let! coll = client.CreateDocumentCollectionIfNotExistsAsync(dbUri, colld, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect
             return coll.Resource.Id }
 
@@ -493,9 +479,9 @@ module private Tip =
             (log 0 0 Log.TipNotFound).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru)
         | ReadResult.Found doc ->
             let log =
-                let (Log.BatchLen bytes), count = Enum.Unfolds doc.c, doc.c.Length
+                let (Log.BatchLen bytes), count = Enum.Unfolds doc.u, doc.u.Length
                 log bytes count Log.Tip
-            let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataUnfolds doc.c |> Log.prop "etag" doc._etag
+            let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataUnfolds doc.u |> Log.prop "etag" doc._etag
             log.Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru)
         return ru, res }
     type [<RequireQualifiedAccess; NoComparison; NoEquality>] Result = NotModified | NotFound | Found of Position * IIndexedEvent[]
@@ -518,15 +504,15 @@ module private Tip =
         let f = if direction = Direction.Forward then "c.i >= @id ORDER BY c.i ASC" else "c.i < @id ORDER BY c.i DESC"
         SqlQuerySpec("SELECT * FROM c WHERE c.i != -1 AND " + f, SqlParameterCollection [SqlParameter("@id", p.index)])
         let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems)
-        client.CreateDocumentQuery<Event>(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery()
+        client.CreateDocumentQuery<Batch>(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery()
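To ground the query semantics: each document's `i` is the base index of the events it carries, so given frozen Batches based at i = 0, 3 and 7 and a Position whose index is 3, a Forward read (`c.i >= 3 ORDER BY c.i ASC`) yields the documents based at 3 and 7, while a Backward read (`c.i < 3 ORDER BY c.i DESC`) yields just the one based at 0; in both cases the `c.i != -1` conjunct keeps the Tip document out of the scan.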
     // Unrolls the Batches in a response - note when reading backwards, the events are emitted in reverse order of index
-    let private handleResponse direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery<Event>) (log: ILogger)
+    let private handleResponse direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery<Batch>) (log: ILogger)
         : Async<IIndexedEvent[] * Position option * float> = async {
         let! ct = Async.CancellationToken
         let! t, (res : Client.FeedResponse<Batch>) = query.ExecuteNextAsync<Batch>(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
         let batches, ru = Array.ofSeq res, res.RequestCharge
-        let events = batches |> Seq.collect Enum.Event |> Array.ofSeq
+        let events = batches |> Seq.collect Enum.Events |> Array.ofSeq
         let (Log.BatchLen bytes), count = events, events.Length
         let reqMetric : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru }
         // TODO investigate whether there is a way to avoid the potential cost (or whether there is significance to it) of these null responses
@@ -539,9 +525,9 @@ module private Tip =
         let maybePosition = batches |> Array.tryPick (fun x -> x.TryToPosition())
         return events, maybePosition, ru }
 
-    let private run (log : ILogger) (readSlice: IDocumentQuery<Event> -> ILogger -> Async<IIndexedEvent[] * Position option * float>)
+    let private run (log : ILogger) (readSlice: IDocumentQuery<Batch> -> ILogger -> Async<IIndexedEvent[] * Position option * float>)
         (maxPermittedBatchReads: int option)
-        (query: IDocumentQuery<Event>)
+        (query: IDocumentQuery<Batch>)
         : AsyncSeq<IIndexedEvent[] * Position option * float> =
         let rec loop batchCount : AsyncSeq<IIndexedEvent[] * Position option * float> = asyncSeq {
             match maxPermittedBatchReads with
diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
index 31c144fc9..f7002401a 100644
--- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
+++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
@@ -176,7 +176,7 @@ type Tests(testOutputHelper) =
         let! res = Events.append ctx streamName 0L expected
         test <@ AppendResult.Ok 1L = res @>
         test <@ [EqxAct.Append] = capture.ExternalCalls @>
-        verifyRequestChargesMax 12 // was 10, observed 10.57
+        verifyRequestChargesMax 14 // observed 12.73 // was 10
         capture.Clear()
 
         // Try overwriting it (a competing consumer would see the same)
diff --git a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs
index 721209113..5d99c734e 100644
--- a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs
+++ b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs
@@ -21,12 +21,9 @@ type VerbatimUtf8Tests() =
     [<Fact>]
     let ``encodes correctly`` () =
         let encoded = mkUnionEncoder().Encode(A { embed = "\"" })
-        let e : Store.Event =
-            { p = "streamName"; id = string 0; i = 0L; _etag=null
-              c = DateTimeOffset.MinValue
-              t = encoded.caseName
-              d = encoded.payload
-              m = null }
+        let e : Store.Batch =
+            { p = "streamName"; id = string 0; i = 0L; _i = 0L; _etag = null
+              e = [| { t = DateTimeOffset.MinValue; c = encoded.caseName; d = encoded.payload; m = null } |] }
         let res = JsonConvert.SerializeObject(e)
         test <@ res.Contains """"d":{"embed":"\""}""" @>
 
@@ -38,7 +35,7 @@ type Base64ZipUtf8Tests() =
         let unionEncoder = mkUnionEncoder()
         let encoded = unionEncoder.Encode(A { embed = String('x',5000) })
         let e : Store.Unfold =
             { i = 42L
-              t = encoded.caseName
+              c = encoded.caseName
               d = encoded.payload
               m = null }
         let res = JsonConvert.SerializeObject e
@@ -55,12 +52,12 @@ type Base64ZipUtf8Tests() =
         let encoded = unionEncoder.Encode value
         let e : Store.Unfold =
             { i = 42L
-              t = encoded.caseName
+              c = encoded.caseName
              d = encoded.payload
               m = null }
         let ser = JsonConvert.SerializeObject(e)
         test <@ ser.Contains("\"d\":\"") @>
         let des = JsonConvert.DeserializeObject<Store.Unfold>(ser)
-        let d : Equinox.UnionCodec.EncodedUnion<_> = { caseName = des.t; payload = des.d }
+        let d : Equinox.UnionCodec.EncodedUnion<_> = { caseName = des.c; payload = des.d }
         let decoded = unionEncoder.Decode d
         test <@ value = decoded @>
\ No newline at end of file
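Taken together, the read path the Cosmos.fs changes converge on can be distilled into a short sketch (not part of the patch; assumes the Store/Enum definitions above):

    // a query page now deserializes homogeneously as Batch[]; Enum.Events flattens each Batch's
    // e[] into IIndexedEvents (Index = base i + offset); TryToPosition recovers an index/etag
    // Position only from the Tip, which is what enables the etag-contingent 1 RU re-sync
    let toEvents (page: Store.Batch[]) : IIndexedEvent[] * Position option =
        let events = page |> Seq.collect Enum.Events |> Array.ofSeq
        let pos = page |> Array.tryPick (fun b -> b.TryToPosition())
        events, pos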