Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify Tip vs Batch formats #54

Merged
merged 1 commit into from
Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 62 additions & 76 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ open Newtonsoft.Json

/// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds)
type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
Event =
Batch =
{ /// DocDb-mandated Partition Key, must be maintained within the document
/// Not actually required if running in single partition mode, but for simplicity, we always write it
p: string // "{streamName}"
Expand All @@ -52,15 +52,32 @@ type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
[<JsonProperty(DefaultValueHandling=DefaultValueHandling.Ignore, Required=Required.Default)>]
_etag: string
/// When we encounter the Tip, we're interested in the 'real i', which is kept in -_i for now
[<JsonProperty(DefaultValueHandling=DefaultValueHandling.Ignore, Required=Required.Default)>]
_i: int64

/// Same as `id`; necessitated by fact that it's not presently possible to do an ORDER BY on the row key
/// base 'i' value for the Events held herein
i: int64 // {index}

/// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
c: System.DateTimeOffset // ISO 8601
/// The events at this offset in the stream
e: BatchEvent[] }
/// Unless running in single partition mode (which would restrict us to 10GB per collection)
/// we need to nominate a partition key that will be in every document
static member PartitionKeyField = "p"
/// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use
static member IndexedFields = [Batch.PartitionKeyField; "i"]
/// If we encounter the tip (id=-1) doc, we're interested in its etag so we can re-sync for 1 RU
member x.TryToPosition() =
if x.id <> Tip.WellKnownDocumentId then None
else Some { index = x._i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x }
/// A single event from the array held in a batch
and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
BatchEvent =
{ /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
t: System.DateTimeOffset // ISO 8601

/// The Case (Event Type), used to drive deserialization
t: string // required
c: string // required

/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.VerbatimUtf8JsonConverter>)>]
Expand All @@ -70,23 +87,13 @@ type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.VerbatimUtf8JsonConverter>)>]
[<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] } // optional
/// Unless running in single partition mode (which would restrict us to 10GB per collection)
/// we need to nominate a partition key that will be in every document
static member PartitionKeyField = "p"
/// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use
static member IndexedFields = [Event.PartitionKeyField; "i"]
/// If we encounter a -1 doc, we're interested in its etag so we can re-read for one RU
member x.TryToPosition() =
if x.id <> Tip.WellKnownDocumentId then None
else Some { index = (let ``x.e.LongLength`` = 1L in x.i+``x.e.LongLength``); etag = match x._etag with null -> None | x -> Some x }
/// The Special 'Pending' Batch Format
/// NB this Type does double duty as
/// a) transport for when we read it
/// b) a way of encoding a batch that the stored procedure will write in to the actual document (`i` is -1 until Stored Proc computes it)
/// The stored representation has the following differences vs a 'normal' (frozen/completed) Batch
/// a) `id` and `i` = `-1` as WIP document currently always is
/// b) events are retained as in an `e` array, not top level fields
/// c) contains unfolds (`c`)
/// a) `id` = `-1`
/// b) contains unfolds (`u`)
and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
Tip =
{ /// Partition key, as per Batch
Expand All @@ -107,35 +114,18 @@ and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
e: BatchEvent[]

/// Compaction/Snapshot/Projection events
c: Unfold[] }
u: Unfold[] }
/// Arguably this should be a high number to reflect the fact that it is the freshest?
static member WellKnownDocumentId = "-1"
/// Create Position from Tip record context (facilitating 1 RU reads)
member x.ToPosition() = { index = x._i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x }
/// A single event from the array held in a batch
and [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
BatchEvent =
{ /// Creation date (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
c: System.DateTimeOffset // ISO 8601

/// The Event Type, used to drive deserialization
t: string // required

/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.VerbatimUtf8JsonConverter>)>]
d: byte[] // required

/// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing)
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.VerbatimUtf8JsonConverter>)>]
[<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] } // optional
/// Compaction/Snapshot/Projection Event based on the state at a given point in time `i`
and Unfold =
{ /// Base: Stream Position (Version) of State from which this Unfold Event was generated
i: int64

/// The Case (Event Type) of this compaction/snapshot, used to drive deserialization
t: string // required
c: string // required

/// Event body - Json -> UTF-8 -> Deflate -> Base64
[<JsonConverter(typeof<Equinox.Cosmos.Internal.Json.Base64ZipUtf8JsonConverter>)>]
Expand All @@ -152,34 +142,28 @@ type Enum() =
{ new IIndexedEvent with
member __.Index = b._i + int64 offset
member __.IsUnfold = false
member __.EventType = x.t
member __.EventType = x.c
member __.Data = x.d
member __.Meta = x.m })
static member Events(i: int64, e: BatchEvent[]) =
e |> Seq.mapi (fun offset x ->
{ new IIndexedEvent with
member __.Index = i + int64 offset
member __.IsUnfold = false
member __.EventType = x.t
member __.EventType = x.c
member __.Data = x.d
member __.Meta = x.m })
static member Event(x: Event) =
Seq.singleton
{ new IIndexedEvent with
member __.Index = x.i
member __.IsUnfold = false
member __.EventType = x.t
member __.Data = x.d
member __.Meta = x.m }
static member Unfolds(xs: Unfold[]) = seq {
static member Events(b: Batch) =
Enum.Events (b.i, b.e)
static member Unfolds (xs: Unfold[]) = seq {
for x in xs -> { new IIndexedEvent with
member __.Index = x.i
member __.IsUnfold = true
member __.EventType = x.t
member __.EventType = x.c
member __.Data = x.d
member __.Meta = x.m } }
static member EventsAndUnfolds(x:Tip): IIndexedEvent seq =
Enum.Unfolds x.c
static member EventsAndUnfolds(x: Tip): IIndexedEvent seq =
Enum.Unfolds x.u

/// Reference to Collection and name that will be used as the location for the stream
type [<NoComparison>] CollectionStream = { collectionUri: System.Uri; name: string } with
Expand Down Expand Up @@ -289,12 +273,12 @@ module Sync =
// NB don't nest in a private module, or serialization will fail miserably ;)
[<CLIMutable; NoEquality; NoComparison; Newtonsoft.Json.JsonObject(ItemRequired=Newtonsoft.Json.Required.AllowNull)>]
type SyncResponse = { etag: string; nextI: int64; conflicts: BatchEvent[] }
let [<Literal>] sprocName = "EquinoxSync-SingleEvents-021" // NB need to renumber for any breaking change
let [<Literal>] sprocName = "EquinoxSync-SingleArray-001" // NB need to renumber for any breaking change
let [<Literal>] sprocBody = """

// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
// 1 Verify no current WIP batch, the incoming `req` becomes the WIP batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO)
// 2 Current WIP batch has space to accommodate the incoming projections (req.c) and events (req.e) - merge them in, replacing any superseded projections
// 2 Current WIP batch has space to accommodate the incoming projections (req.u) and events (req.e) - merge them in, replacing any superseded projections
// 3. Current WIP batch would become too large - remove WIP state from active document by replacing the well known id with a correct one; proceed as per 1
function sync(req, expectedVersion) {
if (!req) throw new Error("Missing req argument");
Expand Down Expand Up @@ -332,7 +316,7 @@ function sync(req, expectedVersion) {
if (current && current.e.length + req.e.length > 10) {
current._i = current._i + current.e.length;
current.e = req.e;
current.c = req.c;
current.u = req.u;

// as we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place
finalize(current);
Expand All @@ -342,7 +326,7 @@ function sync(req, expectedVersion) {
// Append the new events into the current batch
Array.prototype.push.apply(current.e, req.e);
// Replace all the projections
current.c = req.c;
current.u = req.u;
// TODO: should remove only projections being superseded

// as we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place
Expand All @@ -364,10 +348,12 @@ function sync(req, expectedVersion) {
p: req.p,
id: eventI.toString(),
i: eventI,
c: e.c,
t: e.t,
d: e.d,
m: e.m
e: [ {
c: e.c,
t: e.t,
d: e.d,
m: e.m
}]
};
const isAccepted = collection.createDocument(collectionLink, doc, function (err) {
if (err) throw err;
Expand Down Expand Up @@ -407,16 +393,16 @@ function sync(req, expectedVersion) {
let private logged client (stream: CollectionStream) (expectedVersion, req: Tip) (log : ILogger)
: Async<Result> = async {
let verbose = log.IsEnabled Events.LogEventLevel.Debug
let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.c else log
let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log
let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length
let log = log |> Log.prop "bytes" bytes
let writeLog =
log |> Log.prop "stream" stream.name |> Log.prop "expectedVersion" expectedVersion
|> Log.prop "count" req.e.Length |> Log.prop "ucount" req.c.Length
let! t, (ru,result) = run client stream (expectedVersion,req) |> Stopwatch.Time
|> Log.prop "count" req.e.Length |> Log.prop "ucount" req.u.Length
let! t, (ru,result) = run client stream (expectedVersion, req) |> Stopwatch.Time
let resultLog =
let mkMetric ru : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru }
let logConflict () = writeLog.Information("EqxCosmos Sync: Conflict writing {eventTypes}", [| for x in req.e -> x.t |])
let logConflict () = writeLog.Information("EqxCosmos Sync: Conflict writing {eventTypes}", [| for x in req.e -> x.c |])
match result with
| Result.Written pos ->
log |> Log.event (Log.SyncSuccess (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos
Expand All @@ -427,18 +413,18 @@ function sync(req, expectedVersion) {
logConflict ()
let log = if verbose then log |> Log.prop "nextExpectedVersion" pos |> Log.propData "conflicts" xs else log
log |> Log.event (Log.SyncResync(mkMetric ru)) |> Log.prop "conflict" true
resultLog.Information("EqxCosmos {action:l} {count}+{ucount} {ms}ms rc={ru}", "Sync", req.e.Length, req.c.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru)
resultLog.Information("EqxCosmos {action:l} {count}+{ucount} {ms}ms rc={ru}", "Sync", req.e.Length, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru)
return result }

let batch (log : ILogger) retryPolicy client pk batch: Async<Result> =
let call = logged client pk batch
Log.withLoggedRetries retryPolicy "writeAttempt" call log
let mkBatch (stream: Store.CollectionStream) (events: IEvent[]) unfolds: Tip =
{ p = stream.name; id = Store.Tip.WellKnownDocumentId; _i = -1L(*Server-managed*); _etag = null
e = [| for e in events -> { c = DateTimeOffset.UtcNow; t = e.EventType; d = e.Data; m = e.Meta } |]
c = Array.ofSeq unfolds }
e = [| for e in events -> { t = DateTimeOffset.UtcNow; c = e.EventType; d = e.Data; m = e.Meta } |]
u = Array.ofSeq unfolds }
let mkUnfold baseIndex (unfolds: IEvent seq) : Store.Unfold seq =
unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; t = x.EventType; d = x.Data; m = x.Meta } : Store.Unfold)
unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; c = x.EventType; d = x.Data; m = x.Meta } : Store.Unfold)

module Initialization =
open System.Collections.ObjectModel
Expand All @@ -449,7 +435,7 @@ function sync(req, expectedVersion) {

let createCollection (client: IDocumentClient) (dbUri: Uri) collName ru = async {
let pkd = PartitionKeyDefinition()
pkd.Paths.Add(sprintf "/%s" Store.Event.PartitionKeyField)
pkd.Paths.Add(sprintf "/%s" Store.Batch.PartitionKeyField)
let colld = DocumentCollection(Id = collName, PartitionKey = pkd)

colld.IndexingPolicy.IndexingMode <- IndexingMode.Consistent
Expand All @@ -458,7 +444,7 @@ function sync(req, expectedVersion) {
// Given how long and variable the blacklist would be, we whitelist instead
colld.IndexingPolicy.ExcludedPaths <- Collection [|ExcludedPath(Path="/*")|]
// NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors
colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Store.Event.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |]
colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Store.Batch.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |]
let! coll = client.CreateDocumentCollectionIfNotExistsAsync(dbUri, colld, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect
return coll.Resource.Id }

Expand Down Expand Up @@ -493,9 +479,9 @@ module private Tip =
(log 0 0 Log.TipNotFound).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru)
| ReadResult.Found doc ->
let log =
let (Log.BatchLen bytes), count = Enum.Unfolds doc.c, doc.c.Length
let (Log.BatchLen bytes), count = Enum.Unfolds doc.u, doc.u.Length
log bytes count Log.Tip
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataUnfolds doc.c |> Log.prop "etag" doc._etag
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataUnfolds doc.u |> Log.prop "etag" doc._etag
log.Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru)
return ru, res }
type [<RequireQualifiedAccess; NoComparison; NoEquality>] Result = NotModified | NotFound | Found of Position * IIndexedEvent[]
Expand All @@ -518,15 +504,15 @@ module private Tip =
let f = if direction = Direction.Forward then "c.i >= @id ORDER BY c.i ASC" else "c.i < @id ORDER BY c.i DESC"
SqlQuerySpec("SELECT * FROM c WHERE c.i != -1 AND " + f, SqlParameterCollection [SqlParameter("@id", p.index)])
let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems)
client.CreateDocumentQuery<Event>(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery()
client.CreateDocumentQuery<Batch>(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery()

// Unrolls the Batches in a response - note when reading backwards, the events are emitted in reverse order of index
let private handleResponse direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery<Event>) (log: ILogger)
let private handleResponse direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery<Batch>) (log: ILogger)
: Async<IIndexedEvent[] * Position option * float> = async {
let! ct = Async.CancellationToken
let! t, (res : Client.FeedResponse<Event>) = query.ExecuteNextAsync<Event>(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
let! t, (res : Client.FeedResponse<Batch>) = query.ExecuteNextAsync<Batch>(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
let batches, ru = Array.ofSeq res, res.RequestCharge
let events = batches |> Seq.collect Enum.Event |> Array.ofSeq
let events = batches |> Seq.collect Enum.Events |> Array.ofSeq
let (Log.BatchLen bytes), count = events, events.Length
let reqMetric : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru }
// TODO investigate whether there is a way to avoid the potential cost (or whether there is significance to it) of these null responses
Expand All @@ -539,9 +525,9 @@ module private Tip =
let maybePosition = batches |> Array.tryPick (fun x -> x.TryToPosition())
return events, maybePosition, ru }

let private run (log : ILogger) (readSlice: IDocumentQuery<Event> -> ILogger -> Async<IIndexedEvent[] * Position option * float>)
let private run (log : ILogger) (readSlice: IDocumentQuery<Batch> -> ILogger -> Async<IIndexedEvent[] * Position option * float>)
(maxPermittedBatchReads: int option)
(query: IDocumentQuery<Event>)
(query: IDocumentQuery<Batch>)
: AsyncSeq<IIndexedEvent[] * Position option * float> =
let rec loop batchCount : AsyncSeq<IIndexedEvent[] * Position option * float> = asyncSeq {
match maxPermittedBatchReads with
Expand Down
2 changes: 1 addition & 1 deletion tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type Tests(testOutputHelper) =
let! res = Events.append ctx streamName 0L expected
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 12 // was 10, observed 10.57
verifyRequestChargesMax 14 // observed 12.73 // was 10
capture.Clear()

// Try overwriting it (a competing consumer would see the same)
Expand Down
Loading