Skip to content

Commit

Permalink
Regularize events as arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 27, 2018
1 parent d33749d commit ef5eecc
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 85 deletions.
138 changes: 62 additions & 76 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ open Newtonsoft.Json

/// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds)
type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
Event =
Batch =
{ /// DocDb-mandated Partition Key, must be maintained within the document
/// Not actually required if running in single partition mode, but for simplicity, we always write it
p: string // "{streamName}"
Expand All @@ -52,15 +52,32 @@ type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
[<JsonProperty(DefaultValueHandling=DefaultValueHandling.Ignore, Required=Required.Default)>]
_etag: string
/// When we encounter the Tip, we're interested in the 'real i', which is kept in -_i for now
[<JsonProperty(DefaultValueHandling=DefaultValueHandling.Ignore, Required=Required.Default)>]
_i: int64

/// Same as `id`; necessitated by fact that it's not presently possible to do an ORDER BY on the row key
/// base 'i' value for the Events held herein
i: int64 // {index}

/// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
c: System.DateTimeOffset // ISO 8601
/// The events at this offset in the stream
e: BatchEvent[] }
/// Unless running in single partion mode (which would restrict us to 10GB per collection)
/// we need to nominate a partition key that will be in every document
static member PartitionKeyField = "p"
/// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use
static member IndexedFields = [Batch.PartitionKeyField; "i"]
/// If we encounter the tip (id=-1) doc, we're interested in its etag so we can re-sync for 1 RU
member x.TryToPosition() =
if x.id <> Tip.WellKnownDocumentId then None
else Some { index = x._i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x }
/// A single event from the array held in a batch
and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
BatchEvent =
{ /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
t: System.DateTimeOffset // ISO 8601

/// The Case (Event Type), used to drive deserialization
t: string // required
c: string // required

/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.VerbatimUtf8JsonConverter>)>]
Expand All @@ -70,23 +87,13 @@ type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.VerbatimUtf8JsonConverter>)>]
[<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] } // optional
/// Unless running in single partion mode (which would restrict us to 10GB per collection)
/// we need to nominate a partition key that will be in every document
static member PartitionKeyField = "p"
/// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use
static member IndexedFields = [Event.PartitionKeyField; "i"]
/// If we encounter a -1 doc, we're interested in its etag so we can re-read for one RU
member x.TryToPosition() =
if x.id <> Tip.WellKnownDocumentId then None
else Some { index = (let ``x.e.LongLength`` = 1L in x.i+``x.e.LongLength``); etag = match x._etag with null -> None | x -> Some x }
/// The Special 'Pending' Batch Format
/// NB this Type does double duty as
/// a) transport for when we read it
/// b) a way of encoding a batch that the stored procedure will write in to the actual document (`i` is -1 until Stored Proc computes it)
/// The stored representation has the following differences vs a 'normal' (frozen/completed) Batch
/// a) `id` and `i` = `-1` as WIP document currently always is
/// b) events are retained as in an `e` array, not top level fields
/// c) contains unfolds (`c`)
/// a) `id` = `-1`
/// b) contains unfolds (`u`)
and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
Tip =
{ /// Partition key, as per Batch
Expand All @@ -107,35 +114,18 @@ and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
e: BatchEvent[]

/// Compaction/Snapshot/Projection events
c: Unfold[] }
u: Unfold[] }
/// arguably this should be a high nember to reflect fact it is the freshest ?
static member WellKnownDocumentId = "-1"
/// Create Position from Tip record context (facilitating 1 RU reads)
member x.ToPosition() = { index = x._i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x }
/// A single event from the array held in a batch
and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
BatchEvent =
{ /// Creation date (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
c: System.DateTimeOffset // ISO 8601

/// The Event Type, used to drive deserialization
t: string // required

/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.VerbatimUtf8JsonConverter>)>]
d: byte[] // required

/// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing)
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.VerbatimUtf8JsonConverter>)>]
[<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] } // optional
/// Compaction/Snapshot/Projection Event based on the state at a given point in time `i`
and Unfold =
{ /// Base: Stream Position (Version) of State from which this Unfold Event was generated
i: int64

/// The Case (Event Type) of this compaction/snapshot, used to drive deserialization
t: string // required
c: string // required

/// Event body - Json -> UTF-8 -> Deflate -> Base64
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.Base64ZipUtf8JsonConverter>)>]
Expand All @@ -152,34 +142,28 @@ type Enum() =
{ new IIndexedEvent with
member __.Index = b._i + int64 offset
member __.IsUnfold = false
member __.EventType = x.t
member __.EventType = x.c
member __.Data = x.d
member __.Meta = x.m })
static member Events(i: int64, e: BatchEvent[]) =
e |> Seq.mapi (fun offset x ->
{ new IIndexedEvent with
member __.Index = i + int64 offset
member __.IsUnfold = false
member __.EventType = x.t
member __.EventType = x.c
member __.Data = x.d
member __.Meta = x.m })
static member Event(x: Event) =
Seq.singleton
{ new IIndexedEvent with
member __.Index = x.i
member __.IsUnfold = false
member __.EventType = x.t
member __.Data = x.d
member __.Meta = x.m }
static member Unfolds(xs: Unfold[]) = seq {
static member Events(b: Batch) =
Enum.Events (b.i, b.e)
static member Unfolds (xs: Unfold[]) = seq {
for x in xs -> { new IIndexedEvent with
member __.Index = x.i
member __.IsUnfold = true
member __.EventType = x.t
member __.EventType = x.c
member __.Data = x.d
member __.Meta = x.m } }
static member EventsAndUnfolds(x:Tip): IIndexedEvent seq =
Enum.Unfolds x.c
static member EventsAndUnfolds(x: Tip): IIndexedEvent seq =
Enum.Unfolds x.u

/// Reference to Collection and name that will be used as the location for the stream
type [<NoComparison>] CollectionStream = { collectionUri: System.Uri; name: string } with
Expand Down Expand Up @@ -289,12 +273,12 @@ module Sync =
// NB don't nest in a private module, or serialization will fail miserably ;)
[<CLIMutable; NoEquality; NoComparison; Newtonsoft.Json.JsonObject(ItemRequired=Newtonsoft.Json.Required.AllowNull)>]
type SyncResponse = { etag: string; nextI: int64; conflicts: BatchEvent[] }
let [<Literal>] sprocName = "EquinoxSync-SingleEvents-021" // NB need to renumber for any breaking change
let [<Literal>] sprocName = "EquinoxSync-SingleArray-001" // NB need to renumber for any breaking change
let [<Literal>] sprocBody = """
// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
// 1 Verify no current WIP batch, the incoming `req` becomes the WIP batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO)
// 2 Current WIP batch has space to accommodate the incoming projections (req.c) and events (req.e) - merge them in, replacing any superseded projections
// 2 Current WIP batch has space to accommodate the incoming projections (req.u) and events (req.e) - merge them in, replacing any superseded projections
// 3. Current WIP batch would become too large - remove WIP state from active document by replacing the well known id with a correct one; proceed as per 1
function sync(req, expectedVersion) {
if (!req) throw new Error("Missing req argument");
Expand Down Expand Up @@ -332,7 +316,7 @@ function sync(req, expectedVersion) {
if (current && current.e.length + req.e.length > 10) {
current._i = current._i + current.e.length;
current.e = req.e;
current.c = req.c;
current.u = req.u;
// as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place
finalize(current);
Expand All @@ -342,7 +326,7 @@ function sync(req, expectedVersion) {
// Append the new events into the current batch
Array.prototype.push.apply(current.e, req.e);
// Replace all the projections
current.c = req.c;
current.u = req.u;
// TODO: should remove only projections being superseded
// as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place
Expand All @@ -364,10 +348,12 @@ function sync(req, expectedVersion) {
p: req.p,
id: eventI.toString(),
i: eventI,
c: e.c,
t: e.t,
d: e.d,
m: e.m
e: [ {
c: e.c,
t: e.t,
d: e.d,
m: e.m
}]
};
const isAccepted = collection.createDocument(collectionLink, doc, function (err) {
if (err) throw err;
Expand Down Expand Up @@ -407,16 +393,16 @@ function sync(req, expectedVersion) {
let private logged client (stream: CollectionStream) (expectedVersion, req: Tip) (log : ILogger)
: Async<Result> = async {
let verbose = log.IsEnabled Events.LogEventLevel.Debug
let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.c else log
let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log
let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length
let log = log |> Log.prop "bytes" bytes
let writeLog =
log |> Log.prop "stream" stream.name |> Log.prop "expectedVersion" expectedVersion
|> Log.prop "count" req.e.Length |> Log.prop "ucount" req.c.Length
let! t, (ru,result) = run client stream (expectedVersion,req) |> Stopwatch.Time
|> Log.prop "count" req.e.Length |> Log.prop "ucount" req.u.Length
let! t, (ru,result) = run client stream (expectedVersion, req) |> Stopwatch.Time
let resultLog =
let mkMetric ru : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru }
let logConflict () = writeLog.Information("EqxCosmos Sync: Conflict writing {eventTypes}", [| for x in req.e -> x.t |])
let logConflict () = writeLog.Information("EqxCosmos Sync: Conflict writing {eventTypes}", [| for x in req.e -> x.c |])
match result with
| Result.Written pos ->
log |> Log.event (Log.SyncSuccess (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos
Expand All @@ -427,18 +413,18 @@ function sync(req, expectedVersion) {
logConflict ()
let log = if verbose then log |> Log.prop "nextExpectedVersion" pos |> Log.propData "conflicts" xs else log
log |> Log.event (Log.SyncResync(mkMetric ru)) |> Log.prop "conflict" true
resultLog.Information("EqxCosmos {action:l} {count}+{ucount} {ms}ms rc={ru}", "Sync", req.e.Length, req.c.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru)
resultLog.Information("EqxCosmos {action:l} {count}+{ucount} {ms}ms rc={ru}", "Sync", req.e.Length, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru)
return result }

let batch (log : ILogger) retryPolicy client pk batch: Async<Result> =
let call = logged client pk batch
Log.withLoggedRetries retryPolicy "writeAttempt" call log
let mkBatch (stream: Store.CollectionStream) (events: IEvent[]) unfolds: Tip =
{ p = stream.name; id = Store.Tip.WellKnownDocumentId; _i = -1L(*Server-managed*); _etag = null
e = [| for e in events -> { c = DateTimeOffset.UtcNow; t = e.EventType; d = e.Data; m = e.Meta } |]
c = Array.ofSeq unfolds }
e = [| for e in events -> { t = DateTimeOffset.UtcNow; c = e.EventType; d = e.Data; m = e.Meta } |]
u = Array.ofSeq unfolds }
let mkUnfold baseIndex (unfolds: IEvent seq) : Store.Unfold seq =
unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; t = x.EventType; d = x.Data; m = x.Meta } : Store.Unfold)
unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; c = x.EventType; d = x.Data; m = x.Meta } : Store.Unfold)

module Initialization =
open System.Collections.ObjectModel
Expand All @@ -449,7 +435,7 @@ function sync(req, expectedVersion) {

let createCollection (client: IDocumentClient) (dbUri: Uri) collName ru = async {
let pkd = PartitionKeyDefinition()
pkd.Paths.Add(sprintf "/%s" Store.Event.PartitionKeyField)
pkd.Paths.Add(sprintf "/%s" Store.Batch.PartitionKeyField)
let colld = DocumentCollection(Id = collName, PartitionKey = pkd)

colld.IndexingPolicy.IndexingMode <- IndexingMode.Consistent
Expand All @@ -458,7 +444,7 @@ function sync(req, expectedVersion) {
// Given how long and variable the blacklist would be, we whitelist instead
colld.IndexingPolicy.ExcludedPaths <- Collection [|ExcludedPath(Path="/*")|]
// NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors
colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Store.Event.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |]
colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Store.Batch.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |]
let! coll = client.CreateDocumentCollectionIfNotExistsAsync(dbUri, colld, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect
return coll.Resource.Id }

Expand Down Expand Up @@ -493,9 +479,9 @@ module private Tip =
(log 0 0 Log.TipNotFound).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru)
| ReadResult.Found doc ->
let log =
let (Log.BatchLen bytes), count = Enum.Unfolds doc.c, doc.c.Length
let (Log.BatchLen bytes), count = Enum.Unfolds doc.u, doc.u.Length
log bytes count Log.Tip
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataUnfolds doc.c |> Log.prop "etag" doc._etag
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataUnfolds doc.u |> Log.prop "etag" doc._etag
log.Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru)
return ru, res }
type [<RequireQualifiedAccess; NoComparison; NoEquality>] Result = NotModified | NotFound | Found of Position * IIndexedEvent[]
Expand All @@ -518,15 +504,15 @@ module private Tip =
let f = if direction = Direction.Forward then "c.i >= @id ORDER BY c.i ASC" else "c.i < @id ORDER BY c.i DESC"
SqlQuerySpec("SELECT * FROM c WHERE c.i != -1 AND " + f, SqlParameterCollection [SqlParameter("@id", p.index)])
let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems)
client.CreateDocumentQuery<Event>(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery()
client.CreateDocumentQuery<Batch>(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery()

// Unrolls the Batches in a response - note when reading backwards, the events are emitted in reverse order of index
let private handleResponse direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery<Event>) (log: ILogger)
let private handleResponse direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery<Batch>) (log: ILogger)
: Async<IIndexedEvent[] * Position option * float> = async {
let! ct = Async.CancellationToken
let! t, (res : Client.FeedResponse<Event>) = query.ExecuteNextAsync<Event>(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
let! t, (res : Client.FeedResponse<Batch>) = query.ExecuteNextAsync<Batch>(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
let batches, ru = Array.ofSeq res, res.RequestCharge
let events = batches |> Seq.collect Enum.Event |> Array.ofSeq
let events = batches |> Seq.collect Enum.Events |> Array.ofSeq
let (Log.BatchLen bytes), count = events, events.Length
let reqMetric : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru }
// TODO investigate whether there is a way to avoid the potential cost (or whether there is significance to it) of these null responses
Expand All @@ -539,9 +525,9 @@ module private Tip =
let maybePosition = batches |> Array.tryPick (fun x -> x.TryToPosition())
return events, maybePosition, ru }

let private run (log : ILogger) (readSlice: IDocumentQuery<Event> -> ILogger -> Async<IIndexedEvent[] * Position option * float>)
let private run (log : ILogger) (readSlice: IDocumentQuery<Batch> -> ILogger -> Async<IIndexedEvent[] * Position option * float>)
(maxPermittedBatchReads: int option)
(query: IDocumentQuery<Event>)
(query: IDocumentQuery<Batch>)
: AsyncSeq<IIndexedEvent[] * Position option * float> =
let rec loop batchCount : AsyncSeq<IIndexedEvent[] * Position option * float> = asyncSeq {
match maxPermittedBatchReads with
Expand Down
Loading

0 comments on commit ef5eecc

Please sign in to comment.