From 35d6b7e87451cf934256b29896ad3d148daa1d56 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 29 Nov 2018 12:18:01 +0000 Subject: [PATCH] Multi-event Batches (#48) * Multi-event batches; Tip now a Batch too * Handle startIndex within multi-item batches --- cli/Equinox.Cli/Program.fs | 11 +- src/Equinox.Cosmos/Cosmos.fs | 221 +++++++++--------- .../CosmosCoreIntegration.fs | 106 +++++---- .../CosmosFixtures.fs | 3 + .../CosmosIntegration.fs | 12 +- .../JsonConverterTests.fs | 2 +- 6 files changed, 191 insertions(+), 164 deletions(-) diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index 5ad3884c3..772798580 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -77,6 +77,7 @@ and [] CosmosArguments = | [] Database of string | [] Collection of string | [] RetriesWaitTime of int + | [] PageSize of int | [] Provision of ParseResults | [] Run of ParseResults @@ -89,6 +90,7 @@ and [] CosmosArguments = | Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)." | Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" + | PageSize _ -> "Specify maximum number of events to record on a page before switching to a new one (default: 1)" | Provision _ -> "Initialize a store collection." | Run _ -> "Run a load test." 
and CosmosProvisionArguments = @@ -119,7 +121,7 @@ module Cosmos = let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) = EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime) .Connect("equinox-cli", discovery) - let createGateway connection maxItems = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems)) + let createGateway connection (maxItems,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems, maxEventsPerSlice=maxEvents)) [] type Store = @@ -306,9 +308,10 @@ let main argv = let collName = sargs.GetResult(Collection, defaultArg (read "EQUINOX_COSMOS_COLLECTION") "equinox-test") let timeout = sargs.GetResult(Timeout,5.) |> float |> TimeSpan.FromSeconds let (retries, maxRetryWaitTime) as operationThrottling = sargs.GetResult(Retries, 1), sargs.GetResult(RetriesWaitTime, 5) - log.Information("Using CosmosDb Connection {connection} Database: {database} Collection: {collection}. " + + let pageSize = sargs.GetResult(PageSize,1) + log.Information("Using CosmosDb Connection {connection} Database: {database} Collection: {collection} with page size: {pageSize}. 
" + "Request timeout: {timeout} with {retries} retries; throttling MaxRetryWaitTime {maxRetryWaitTime}", - connUri, dbName, collName, timeout, retries, maxRetryWaitTime) + connUri, dbName, collName, pageSize, timeout, retries, maxRetryWaitTime) let conn = Cosmos.connect log discovery timeout operationThrottling |> Async.RunSynchronously match sargs.TryGetSubCommand() with | Some (Provision args) -> @@ -317,7 +320,7 @@ let main argv = Equinox.Cosmos.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously 0 | Some (Run targs) -> - let conn = Store.Cosmos (Cosmos.createGateway conn defaultBatchSize, dbName, collName) + let conn = Store.Cosmos (Cosmos.createGateway conn (defaultBatchSize,pageSize), dbName, collName) let res = runTest log conn targs let stats = [ "Read", RuCounterSink.Read diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index f85bc7c26..ce4093ba3 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -40,6 +40,7 @@ type [] Batch = { /// DocDb-mandated Partition Key, must be maintained within the document /// Not actually required if running in single partition mode, but for simplicity, we always write it + [] // Not requested in queries p: string // "{streamName}" /// DocDb-mandated unique row key; needs to be unique within any partition it is maintained; must be string @@ -52,24 +53,24 @@ type [] /// as it will do: 1. read 2. merge 3. 
write merged version contingent on the _etag not having changed [] _etag: string - /// When we encounter the Tip, we're interested in the 'real i', which is kept in -_i for now - [] - _i: int64 /// base 'i' value for the Events held herein i: int64 // {index} + // `i` value for successor batch (to facilitate identifying which Batch a given startPos is within) + n: int64 // {index} + /// The events at this offset in the stream e: BatchEvent[] } /// Unless running in single partion mode (which would restrict us to 10GB per collection) /// we need to nominate a partition key that will be in every document static member PartitionKeyField = "p" /// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use - static member IndexedFields = [Batch.PartitionKeyField; "i"] + static member IndexedFields = [Batch.PartitionKeyField; "i"; "n"] /// If we encounter the tip (id=-1) doc, we're interested in its etag so we can re-sync for 1 RU member x.TryToPosition() = if x.id <> Tip.WellKnownDocumentId then None - else Some { index = x._i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x } + else Some { index = x.n; etag = match x._etag with null -> None | x -> Some x } /// A single event from the array held in a batch and [] BatchEvent = @@ -87,6 +88,7 @@ and [] [)>] [] m: byte[] } // optional + /// The Special 'Pending' Batch Format /// NB this Type does double duty as /// a) transport for when we read it @@ -96,7 +98,8 @@ and [] /// b) contains unfolds (`u`) and [] Tip = - { /// Partition key, as per Batch + { [] // Not requested in queries + /// Partition key, as per Batch p: string // "{streamName}" /// Document Id within partition, as per Batch id: string // "{-1}" - Well known IdConstant used while this remains the pending batch @@ -108,7 +111,10 @@ and [] _etag: string /// base 'i' value for the Events held herein - _i: int64 + i: int64 + + /// `i` value for successor batch (to facilitate identifying which 
Batch a given startPos is within) + n: int64 // {index} /// Events e: BatchEvent[] @@ -118,7 +124,7 @@ and [] /// arguably this should be a high nember to reflect fact it is the freshest ? static member WellKnownDocumentId = "-1" /// Create Position from Tip record context (facilitating 1 RU reads) - member x.ToPosition() = { index = x._i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x } + member x.ToPosition() = { index = x.n; etag = match x._etag with null -> None | x -> Some x } /// Compaction/Snapshot/Projection Event based on the state at a given point in time `i` and Unfold = { /// Base: Stream Position (Version) of State from which this Unfold Event was generated @@ -140,21 +146,31 @@ type Enum() = static member Events(b: Tip) = b.e |> Seq.mapi (fun offset x -> { new IIndexedEvent with - member __.Index = b._i + int64 offset + member __.Index = b.i + int64 offset member __.IsUnfold = false member __.EventType = x.c member __.Data = x.d member __.Meta = x.m }) - static member Events(i: int64, e: BatchEvent[]) = - e |> Seq.mapi (fun offset x -> - { new IIndexedEvent with - member __.Index = i + int64 offset - member __.IsUnfold = false - member __.EventType = x.c - member __.Data = x.d - member __.Meta = x.m }) - static member Events(b: Batch) = - Enum.Events (b.i, b.e) + static member Events(i: int64, e: BatchEvent[], startIndex, backward) = seq { + let isValidGivenStartIndex backward si i = + match si with + | Some si when backward -> i < si + | Some si -> i >= si + | _ -> true + for offset in 0..e.Length-1 do + let index = i + int64 offset + if isValidGivenStartIndex backward startIndex index then + let x = e.[offset] + yield { + new IIndexedEvent with + member __.Index = index + member __.IsUnfold = false + member __.EventType = x.c + member __.Data = x.d + member __.Meta = x.m } } + static member Events(b: Batch, startIndex, backward) = + Enum.Events(b.i, b.e, startIndex, backward) + |> if backward then System.Linq.Enumerable.Reverse 
else id static member Unfolds (xs: Unfold[]) = seq { for x in xs -> { new IIndexedEvent with member __.Index = x.i @@ -163,7 +179,10 @@ type Enum() = member __.Data = x.d member __.Meta = x.m } } static member EventsAndUnfolds(x: Tip): IIndexedEvent seq = - Enum.Unfolds x.u + Enum.Events x + |> Seq.append (Enum.Unfolds x.u) + // where Index is equal, unfolds get delivered after the events so the fold semantics can be 'idempotent' + |> Seq.sortBy (fun x -> x.Index, x.IsUnfold) /// Reference to Collection and name that will be used as the location for the stream type [] CollectionStream = { collectionUri: System.Uri; name: string } with @@ -272,35 +291,34 @@ module private DocDb = module Sync = // NB don't nest in a private module, or serialization will fail miserably ;) [] - type SyncResponse = { etag: string; nextI: int64; conflicts: BatchEvent[] } - let [] sprocName = "EquinoxSync-SingleArray-001" // NB need to renumber for any breaking change + type SyncResponse = { etag: string; n: int64; conflicts: BatchEvent[] } + let [] sprocName = "EquinoxSync002" // NB need to renumber for any breaking change let [] sprocBody = """ // Manages the merging of the supplied Request Batch, fulfilling one of the following end-states -// 1 Verify no current WIP batch, the incoming `req` becomes the WIP batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO) -// 2 Current WIP batch has space to accommodate the incoming projections (req.u) and events (req.e) - merge them in, replacing any superseded projections -// 3. 
Current WIP batch would become too large - remove WIP state from active document by replacing the well known id with a correct one; proceed as per 1 -function sync(req, expectedVersion) { +// 1 Verify no current Tip batch, the incoming `req` becomes the Tip batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO) +// 2 Current Tip batch has space to accommodate the incoming unfolds (req.u) and events (req.e) - merge them in, replacing any superseded unfolds +// 3 Current Tip batch would become too large - remove Tip-specific state from active doc by replacing the well known id with a correct one; proceed as per 1 +function sync(req, expectedVersion, maxEvents) { if (!req) throw new Error("Missing req argument"); const collection = getContext().getCollection(); const collectionLink = collection.getSelfLink(); const response = getContext().getResponse(); - // Locate the WIP (-1) batch (which may not exist) - const wipDocId = collection.getAltLink() + "/docs/" + req.id; - const isAccepted = collection.readDocument(wipDocId, {}, function (err, current) { + // Locate the Tip (-1) batch (which may not exist) + const tipDocId = collection.getAltLink() + "/docs/" + req.id; + const isAccepted = collection.readDocument(tipDocId, {}, function (err, current) { // Verify we dont have a conflicting write if (expectedVersion === -1) { executeUpsert(current); } else if (!current && expectedVersion !== 0) { - // If there is no WIP page, the writer has no possible reason for writing at an index other than zero - response.setBody({ etag: null, nextI: 0, conflicts: [] }); - } else if (current && expectedVersion !== current._i + current.e.length) { + // If there is no Tip page, the writer has no possible reason for writing at an index other than zero + response.setBody({ etag: null, n: 0, conflicts: [] }); + } else if (current && expectedVersion !== current.n) { // Where possible, we extract conflicting events from e and/or c in order to avoid another 
read cycle // yielding [] triggers the client to go loading the events itself - const conflicts = expectedVersion < current._i ? [] : current.e.slice(expectedVersion - current._i); - const nextI = current._i + current.e.length; - response.setBody({ etag: current._etag, nextI: nextI, conflicts: conflicts }); + const conflicts = expectedVersion < current.i ? [] : current.e.slice(expectedVersion - current.i); + response.setBody({ etag: current._etag, n: current.n, conflicts: conflicts }); } else { executeUpsert(current); } @@ -310,62 +328,43 @@ function sync(req, expectedVersion) { function executeUpsert(current) { function callback(err, doc) { if (err) throw err; - response.setBody({ etag: doc._etag, nextI: doc._i + doc.e.length, conflicts: null }); + response.setBody({ etag: doc._etag, n: doc.n, conflicts: null }); } - // If we have hit a sensible limit for a slice in the WIP document, trim the events - if (current && current.e.length + req.e.length > 10) { - current._i = current._i + current.e.length; - current.e = req.e; - current.u = req.u; + // `i` is established when first written; `n` needs to stay in step with i+batch.e.length + function pos(batch, i) { + batch.i = i + batch.n = batch.i + batch.e.length; + return batch; + } + // If we have hit a sensible limit for a slice, swap to a new one + if (current && current.e.length + req.e.length > maxEvents) { + // remove the well-known `id` value identifying the batch as being the Tip + current.id = current.i.toString(); + // ... 
As it's no longer a Tip batch, we definitely don't want unfolds taking up space + delete current.u; + + // TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches // as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place - finalize(current); - const isAccepted = collection.replaceDocument(current._self, current, { etag: current._etag }, callback); - if (!isAccepted) throw new Error("Unable to restart WIP batch."); + const tipUpdateAccepted = collection.replaceDocument(current._self, current, { etag: current._etag }, callback); + if (!tipUpdateAccepted) throw new Error("Unable to remove Tip markings."); + + const isAccepted = collection.createDocument(collectionLink, pos(req,current.n), { disableAutomaticIdGeneration: true }, callback); + if (!isAccepted) throw new Error("Unable to create Tip batch."); } else if (current) { // Append the new events into the current batch Array.prototype.push.apply(current.e, req.e); - // Replace all the projections + // Replace all the unfolds // TODO: should remove only unfolds being superseded current.u = req.u; - // TODO: should remove only projections being superseded // as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place - finalize(current); - const isAccepted = collection.replaceDocument(current._self, current, { etag: current._etag }, callback); - if (!isAccepted) throw new Error("Unable to replace WIP batch."); + const isAccepted = collection.replaceDocument(current._self, pos(current, current.i), { etag: current._etag }, callback); + if (!isAccepted) throw new Error("Unable to replace Tip batch."); } else { - current = req; - current._i = 0; - // concurrency control is by virtue of fact that any conflicting writer will encounter a primary key violation (which will 
result in a retry) - finalize(current); - const isAccepted = collection.createDocument(collectionLink, current, { disableAutomaticIdGeneration: true }, callback); - if (!isAccepted) throw new Error("Unable to create WIP batch."); - } - for (i = 0; i < req.e.length; i++) { - const e = req.e[i]; - const eventI = current._i + current.e.length - req.e.length + i; - const doc = { - p: req.p, - id: eventI.toString(), - i: eventI, - e: [ { - c: e.c, - t: e.t, - d: e.d, - m: e.m - }] - }; - const isAccepted = collection.createDocument(collectionLink, doc, function (err) { - if (err) throw err; - }); - if (!isAccepted) throw new Error("Unable to add event " + doc.i); + const isAccepted = collection.createDocument(collectionLink, pos(req,0), { disableAutomaticIdGeneration: true }, callback); + if (!isAccepted) throw new Error("Unable to create Tip batch."); } } - - function finalize(current) { - current.i = -1; - current.id = current.i.toString(); - } }""" [] @@ -374,23 +373,23 @@ function sync(req, expectedVersion) { | Conflict of Position * events: IIndexedEvent[] | ConflictUnknown of Position - let private run (client: IDocumentClient) (stream: CollectionStream) (expectedVersion: int64 option, req: Tip) + let private run (client: IDocumentClient) (stream: CollectionStream) (expectedVersion: int64 option, req: Tip, maxEvents: int) : Async = async { let sprocLink = sprintf "%O/sprocs/%s" stream.collectionUri sprocName let opts = Client.RequestOptions(PartitionKey=PartitionKey(stream.name)) let! ct = Async.CancellationToken let ev = match expectedVersion with Some ev -> Position.FromI ev | None -> Position.FromAppendAtEnd let! 
(res : Client.StoredProcedureResponse) = - client.ExecuteStoredProcedureAsync(sprocLink, opts, ct, box req, box ev.index) |> Async.AwaitTaskCorrect + client.ExecuteStoredProcedureAsync(sprocLink, opts, ct, box req, box ev.index, box maxEvents) |> Async.AwaitTaskCorrect - let newPos = { index = res.Response.nextI; etag = Option.ofObj res.Response.etag } + let newPos = { index = res.Response.n; etag = Option.ofObj res.Response.etag } return res.RequestCharge, res.Response.conflicts |> function | null -> Result.Written newPos | [||] when newPos.index = 0L -> Result.Conflict (newPos, Array.empty) | [||] -> Result.ConflictUnknown newPos - | xs -> Result.Conflict (newPos, Enum.Events (ev.index, xs) |> Array.ofSeq) } + | xs -> Result.Conflict (newPos, Enum.Events(ev.index, xs, None, false) |> Array.ofSeq) } - let private logged client (stream: CollectionStream) (expectedVersion, req: Tip) (log : ILogger) + let private logged client (stream: CollectionStream) (expectedVersion, req: Tip, maxEvents) (log : ILogger) : Async = async { let verbose = log.IsEnabled Events.LogEventLevel.Debug let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.u else log @@ -399,7 +398,7 @@ function sync(req, expectedVersion) { let writeLog = log |> Log.prop "stream" stream.name |> Log.prop "expectedVersion" expectedVersion |> Log.prop "count" req.e.Length |> Log.prop "ucount" req.u.Length - let! t, (ru,result) = run client stream (expectedVersion, req) |> Stopwatch.Time + let! 
t, (ru,result) = run client stream (expectedVersion, req, maxEvents) |> Stopwatch.Time let resultLog = let mkMetric ru : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru } let logConflict () = writeLog.Information("EqxCosmos Sync: Conflict writing {eventTypes}", [| for x in req.e -> x.c |]) @@ -420,7 +419,7 @@ function sync(req, expectedVersion) { let call = logged client pk batch Log.withLoggedRetries retryPolicy "writeAttempt" call log let mkBatch (stream: Store.CollectionStream) (events: IEvent[]) unfolds: Tip = - { p = stream.name; id = Store.Tip.WellKnownDocumentId; _i = -1L(*Server-managed*); _etag = null + { p = stream.name; id = Store.Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null e = [| for e in events -> { t = DateTimeOffset.UtcNow; c = e.EventType; d = e.Data; m = e.Meta } |] u = Array.ofSeq unfolds } let mkUnfold baseIndex (unfolds: IEvent seq) : Store.Unfold seq = @@ -498,11 +497,12 @@ module private Tip = open Microsoft.Azure.Documents.Linq let private mkQuery (client : IDocumentClient) maxItems (stream: CollectionStream) (direction: Direction) (startPos: Position option) = let querySpec = + let fields = "c.id, c.i, c._etag, c.n, c.e" match startPos with - | None -> SqlQuerySpec("SELECT * FROM c WHERE c.i!=-1 ORDER BY c.i " + if direction = Direction.Forward then "ASC" else "DESC") + | None -> SqlQuerySpec(sprintf "SELECT %s FROM c ORDER BY c.i " fields + if direction = Direction.Forward then "ASC" else "DESC") | Some p -> - let f = if direction = Direction.Forward then "c.i >= @id ORDER BY c.i ASC" else "c.i < @id ORDER BY c.i DESC" - SqlQuerySpec("SELECT * FROM c WHERE c.i != -1 AND " + f, SqlParameterCollection [SqlParameter("@id", p.index)]) + let f = if direction = Direction.Forward then "c.n > @id ORDER BY c.i ASC" else "c.i < @id ORDER BY c.i DESC" + SqlQuerySpec(sprintf "SELECT %s FROM c WHERE " fields + f, SqlParameterCollection [SqlParameter("@id", 
p.index)]) let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems) client.CreateDocumentQuery(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery() @@ -512,14 +512,14 @@ module private Tip = let! ct = Async.CancellationToken let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time let batches, ru = Array.ofSeq res, res.RequestCharge - let events = batches |> Seq.collect Enum.Events |> Array.ofSeq + let startIndex = match startPos with Some { index = i } -> Some i | _ -> None + let events = batches |> Seq.collect (fun b -> Enum.Events(b, startIndex, backward=(direction = Direction.Backward))) |> Array.ofSeq let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru } - // TODO investigate whether there is a way to avoid the potential cost (or whether there is significance to it) of these null responses - let log = if batches.Length = 0 && count = 0 && ru = 0. 
then log else let evt = Log.Response (direction, reqMetric) in log |> Log.event evt + let log = let evt = Log.Response (direction, reqMetric) in log |> Log.event evt let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEvents events let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i }) - (log |> Log.prop "startIndex" (match startPos with Some { index = i } -> Nullable i | _ -> Nullable()) |> Log.prop "bytes" bytes) + (log |> Log.prop "startIndex" (match startIndex with Some i -> Nullable i | None -> Nullable()) |> Log.prop "bytes" bytes) .Information("EqxCosmos {action:l} {count}/{batches} {direction} {ms}ms i={index} rc={ru}", "Response", count, batches.Length, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru) let maybePosition = batches |> Array.tryPick (fun x -> x.TryToPosition()) @@ -541,15 +541,13 @@ module private Tip = yield! loop (batchCount + 1) } loop 0 - let private logQuery direction batchSize streamName interval (responsesCount, events : IIndexedEvent []) nextI (ru: float) (log : ILogger) = + let private logQuery direction batchSize streamName interval (responsesCount, events : IIndexedEvent []) n (ru: float) (log : ILogger) = let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } let action = match direction with Direction.Forward -> "QueryF" | Direction.Backward -> "QueryB" - // TODO investigate whether there is a way to avoid the potential cost (or whether there is significance to it) of these null responses - let log = if count = 0 && ru = 0. 
then log else let evt = Log.Event.Query (direction, responsesCount, reqMetric) in log |> Log.event evt - (log |> Log.prop "bytes" bytes |> Log.prop "batchSize" batchSize).Information( - "EqxCosmos {action:l} {stream} v{nextI} {count}/{responses} {ms}ms rc={ru}", - action, streamName, nextI, count, responsesCount, (let e = interval.Elapsed in e.TotalMilliseconds), ru) + (log |> Log.prop "bytes" bytes |> Log.prop "batchSize" batchSize |> Log.event (Log.Event.Query (direction, responsesCount, reqMetric))).Information( + "EqxCosmos {action:l} {stream} v{n} {count}/{responses} {ms}ms rc={ru}", + action, streamName, n, count, responsesCount, (let e = interval.Elapsed in e.TotalMilliseconds), ru) let private calculateUsedVersusDroppedPayload stopIndex (xs: IIndexedEvent[]) : int * int = let mutable used, dropped = 0, 0 @@ -676,25 +674,29 @@ module Internal = type LoadFromTokenResult<'event> = Unchanged | Found of Storage.StreamToken * 'event[] /// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts) -type EqxConnection(client: IDocumentClient, ?readRetryPolicy: IRetryPolicy, ?writeRetryPolicy: IRetryPolicy) = +type EqxConnection(client: IDocumentClient, ?readRetryPolicy: IRetryPolicy, ?writeRetryPolicy) = member __.Client = client member __.TipRetryPolicy = readRetryPolicy member __.QueryRetryPolicy = readRetryPolicy member __.WriteRetryPolicy = writeRetryPolicy -/// Defines the policies in force regarding how to constrain query responses +/// Defines the policies in force regarding how to a) split up calls b) limit the number of events per slice type EqxBatchingPolicy ( // Max items to request in query response. Defaults to 10. ?defaultMaxItems : int, // Dynamic version of `defaultMaxItems`, allowing one to react to dynamic configuration changes. 
Default to using `defaultMaxItems` ?getDefaultMaxItems : unit -> int, /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices`. Default: unlimited. - ?maxRequests) = + ?maxRequests, + /// Maximum number of events to accumulate within the `Tip` before switching to a new one when adding Events. Defaults to 10. + ?maxEventsPerSlice) = let getdefaultMaxItems = defaultArg getDefaultMaxItems (fun () -> defaultArg defaultMaxItems 10) /// Limit for Maximum number of `Batch` records in a single query batch response member __.MaxItems = getdefaultMaxItems () - /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices` + /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxItems` member __.MaxRequests = maxRequests + /// Maximum number of events to accumulate within the `Tip` before switching to a new one when adding Events + member __.MaxEventsPerSlice = defaultArg maxEventsPerSlice 10 type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = let (|FromUnfold|_|) (tryDecode: #IEvent -> 'event option) (isOrigin: 'event -> bool) (xs:#IEvent[]) : Option<'event[]> = @@ -732,7 +734,7 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = | _ -> let! res = __.Read log stream Direction.Forward (Some pos) (tryDecode,isOrigin) return LoadFromTokenResult.Found res } member __.Sync log stream (expectedVersion, batch: Store.Tip): Async = async { - let! wr = Sync.batch log conn.WriteRetryPolicy conn.Client stream (expectedVersion,batch) + let! 
wr = Sync.batch log conn.WriteRetryPolicy conn.Client stream (expectedVersion,batch,batching.MaxEventsPerSlice) match wr with | Sync.Result.Conflict (pos',events) -> return InternalSyncResult.Conflict (Token.create stream pos',events) | Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create stream pos') @@ -1006,9 +1008,13 @@ type EqxContext /// Defaults to 10 ?defaultMaxItems, /// Alternate way of specifying defaultMaxItems which facilitates reading it from a cached dynamic configuration - ?getDefaultMaxItems) = + ?getDefaultMaxItems, + /// Threshold defining the number of events a slice is allowed to hold before switching to a new Batch is triggered. + /// Defaults to 1 + ?maxEventsPerSlice) = let getDefaultMaxItems = match getDefaultMaxItems with Some f -> f | None -> fun () -> defaultArg defaultMaxItems 10 - let batching = EqxBatchingPolicy(getDefaultMaxItems=getDefaultMaxItems) + let maxEventsPerSlice = defaultArg maxEventsPerSlice 1 + let batching = EqxBatchingPolicy(getDefaultMaxItems=getDefaultMaxItems, maxEventsPerSlice=maxEventsPerSlice) let gateway = EqxGateway(conn, batching) let maxCountPredicate count = @@ -1026,7 +1032,8 @@ type EqxContext member internal __.GetLazy((stream, startPos), ?batchSize, ?direction) : AsyncSeq = let direction = defaultArg direction Direction.Forward - let batching = EqxBatchingPolicy(defaultArg batchSize 10) + let batchSize = defaultArg batchSize batching.MaxItems * maxEventsPerSlice + let batching = EqxBatchingPolicy(if batchSize < maxEventsPerSlice then 1 else batchSize/maxEventsPerSlice) gateway.ReadLazy batching logger stream direction startPos (Some,fun _ -> false) member internal __.GetInternal((stream, startPos), ?maxCount, ?direction) = async { diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index f7002401a..29864744e 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ 
b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -32,7 +32,7 @@ type Tests(testOutputHelper) = incr testIterations sprintf "events-%O-%i" name !testIterations let mkContextWithItemLimit conn defaultBatchSize = - EqxContext(conn,collections,log,?defaultMaxItems=defaultBatchSize) + EqxContext(conn,collections,log,?defaultMaxItems=defaultBatchSize,maxEventsPerSlice=10) let mkContext conn = mkContextWithItemLimit conn None let verifyRequestChargesMax rus = @@ -48,7 +48,7 @@ type Tests(testOutputHelper) = let! res = Events.append ctx streamName index <| EventData.Create(0,1) test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 14 // observed 12.03 // was 10 + verifyRequestChargesMax 10 // Clear the counters capture.Clear() @@ -85,7 +85,6 @@ type Tests(testOutputHelper) = let xs, baseIndex = if direction = Direction.Forward then xs, baseIndex else Array.rev xs, baseIndex - int64 (Array.length expected) + 1L - test <@ expected.Length = xs.Length @> test <@ [for i in 0..expected.Length - 1 -> baseIndex + int64 i] = [for r in xs -> r.Index] @> test <@ [for e in expected -> e.EventType] = [ for r in xs -> r.EventType ] @> for i,x,y in Seq.mapi2 (fun i x y -> i,x,y) [for e in expected -> e.Data] [for r in xs -> r.Data] do @@ -107,28 +106,25 @@ type Tests(testOutputHelper) = capture.Clear() let mutable pos = 0L - let ae = false // TODO fix bug for appendBatchSize in [4; 5; 9] do - if ae then - let! res = Events.appendAtEnd ctx streamName <| EventData.Create (int pos,appendBatchSize) - pos <- pos + int64 appendBatchSize - //let! res = Events.append ctx streamName pos (Array.replicate appendBatchSize event) - test <@ [EqxAct.Append] = capture.ExternalCalls @> - pos =! res - else - let! res = Events.append ctx streamName pos <| EventData.Create (int pos,appendBatchSize) - pos <- pos + int64 appendBatchSize - //let! 
res = Events.append ctx streamName pos (Array.replicate appendBatchSize event) - test <@ [EqxAct.Append] = capture.ExternalCalls @> - AppendResult.Ok pos =! res - verifyRequestChargesMax 50 // was 20, observed 41.64 // 15.59 observed + let! res = Events.appendAtEnd ctx streamName <| EventData.Create (int pos,appendBatchSize) + test <@ [EqxAct.Append] = capture.ExternalCalls @> + pos <- pos + int64 appendBatchSize + pos =! res + verifyRequestChargesMax 20 // 15.59 observed + capture.Clear() + + let! res = Events.getNextIndex ctx streamName + test <@ [EqxAct.Tip] = capture.ExternalCalls @> + verifyRequestChargesMax 2 + pos =! res capture.Clear() let! res = Events.appendAtEnd ctx streamName <| EventData.Create (int pos,42) pos <- pos + 42L pos =! res test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 180 // observed 167.32 // was 20 + verifyRequestChargesMax 20 capture.Clear() let! res = Events.getNextIndex ctx streamName @@ -140,11 +136,10 @@ type Tests(testOutputHelper) = // Demonstrate benefit/mechanism for using the Position-based API to avail of the etag tracking let stream = ctx.CreateStream streamName - let max = 2000 // observed to time out server side // WAS 5000 - let extrasCount = match extras with x when x * 100 > max -> max | x when x < 1 -> 1 | x -> x*100 + let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100 let! _pos = ctx.NonIdempotentAppend(stream, EventData.Create (int pos,extrasCount)) test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 7000 // 6867.7 observed // was 300 // 278 observed + verifyRequestChargesMax 300 // 278 observed capture.Clear() let! pos = ctx.Sync(stream,?position=None) @@ -176,7 +171,7 @@ type Tests(testOutputHelper) = let! 
res = Events.append ctx streamName 0L expected test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 14 // observed 12.73 // was 10 + verifyRequestChargesMax 10 capture.Clear() // Try overwriting it (a competing consumer would see the same) @@ -186,7 +181,7 @@ type Tests(testOutputHelper) = | AppendResult.Conflict (1L, e) -> verifyCorrectEvents 0L expected e | x -> x |> failwithf "Unexpected %A" test <@ [EqxAct.Resync] = capture.ExternalCalls @> - verifyRequestChargesMax 5 // observed 4.21 // was 4 + verifyRequestChargesMax 5 // 4.02 capture.Clear() } @@ -205,29 +200,28 @@ type Tests(testOutputHelper) = verifyCorrectEvents 1L expected res - test <@ List.replicate 2 EqxAct.ResponseForward @ [EqxAct.QueryForward] = capture.ExternalCalls @> - verifyRequestChargesMax 8 // observed 6.14 // was 3 + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + verifyRequestChargesMax 4 // 3.14 // was 3 before introduction of multi-event batches } [] let ``get (in 2 batches)`` (TestStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContextWithItemLimit conn (Some 2) + let ctx = mkContextWithItemLimit conn (Some 1) let! expected = add6EventsIn2Batches ctx streamName - let expected = Array.tail expected |> Array.take 3 + let expected = expected |> Array.take 3 - let! res = Events.get ctx streamName 1L 3 + let! res = Events.get ctx streamName 0L 3 - verifyCorrectEvents 1L expected res + verifyCorrectEvents 0L expected res // 2 items atm test <@ [EqxAct.ResponseForward; EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> - verifyRequestChargesMax 7 // observed 6.14 // was 6 - } + verifyRequestChargesMax 6 } // 5.77 [] - let getAll (TestStream streamName) = Async.RunSynchronously <| async { + let ``get Lazy`` (TestStream streamName) = Async.RunSynchronously <| async { let! 
conn = connectToSpecifiedCosmosOrSimulator log let ctx = mkContextWithItemLimit conn (Some 1) @@ -242,7 +236,7 @@ type Tests(testOutputHelper) = let queryRoundTripsAndItemCounts = function EqxEvent (Log.Query (Direction.Forward, responses, { count = c })) -> Some (responses,c) | _ -> None // validate that, despite only requesting max 1 item, we only needed one trip (which contained only one item) [1,1] =! capture.ChooseCalls queryRoundTripsAndItemCounts - verifyRequestChargesMax 4 // 3.07 // was 3 // 2.94 + verifyRequestChargesMax 3 // 2.97 } (* Backward *) @@ -250,39 +244,55 @@ type Tests(testOutputHelper) = [] let getBackwards (TestStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContextWithItemLimit conn (Some 2) + let ctx = mkContextWithItemLimit conn (Some 1) let! expected = add6EventsIn2Batches ctx streamName // We want to skip reading the last - let expected = Array.take 5 expected + let expected = Array.take 5 expected |> Array.tail - let! res = Events.getBackwards ctx streamName 4L 5 + let! res = Events.getBackwards ctx streamName 4L 4 verifyCorrectEventsBackward 4L expected res - test <@ List.replicate 3 EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @> - verifyRequestChargesMax 10 // observed 8.98 // was 3 + test <@ [EqxAct.ResponseBackward; EqxAct.QueryBackward] = capture.ExternalCalls @> + verifyRequestChargesMax 3 } - // TODO 2 batches backward test + [] + let ``getBackwards (2 batches)`` (TestStream streamName) = Async.RunSynchronously <| async { + let! conn = connectToSpecifiedCosmosOrSimulator log + let ctx = mkContextWithItemLimit conn (Some 1) + + let! expected = add6EventsIn2Batches ctx streamName + + // We want to skip reading the last two, which means getting both, but disregarding some of the second batch + let expected = Array.take 4 expected + + let! 
res = Events.getBackwards ctx streamName 3L 4 + + verifyCorrectEventsBackward 3L expected res + + test <@ List.replicate 2 EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @> + verifyRequestChargesMax 6 // 5.77 + } [] - let getAllBackwards (TestStream streamName) = Async.RunSynchronously <| async { + let ``getBackwards Lazy`` (TestStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContextWithItemLimit conn (Some 2) + let ctx = mkContextWithItemLimit conn (Some 1) let! expected = add6EventsIn2Batches ctx streamName capture.Clear() - let! res = Events.getAllBackwards ctx streamName 10L 2 |> AsyncSeq.concatSeq |> AsyncSeq.takeWhileInclusive (fun x -> x.Index <> 2L) |> AsyncSeq.toArrayAsync - let expected = expected |> Array.skip 2 + let! res = Events.getAllBackwards ctx streamName 10L 1 |> AsyncSeq.concatSeq |> AsyncSeq.takeWhileInclusive (fun x -> x.Index <> 2L) |> AsyncSeq.toArrayAsync + let expected = expected |> Array.skip 2 // omit index 0, 1 as we vote to finish at 2L verifyCorrectEventsBackward 5L expected res - // only 2 batches of 2 items triggered - test <@ List.replicate 2 EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @> - // validate that we didnt trigger loading of the last item + // only 1 request of 1 item triggered + test <@ [EqxAct.ResponseBackward; EqxAct.QueryBackward] = capture.ExternalCalls @> + // validate that, despite only requesting max 1 item, we only needed one trip, bearing 5 items (from which one item was omitted) let queryRoundTripsAndItemCounts = function EqxEvent (Log.Query (Direction.Backward, responses, { count = c })) -> Some (responses,c) | _ -> None - [2,4] =! capture.ChooseCalls queryRoundTripsAndItemCounts - verifyRequestChargesMax 7 // observed 6.03 // was 3 // 2.95 + [1,5] =! 
capture.ChooseCalls queryRoundTripsAndItemCounts + verifyRequestChargesMax 3 // 2.98 } \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs b/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs index 467c1bf57..b6515d81d 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs @@ -34,4 +34,7 @@ let collections = let createEqxStore connection batchSize = let gateway = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=batchSize)) + EqxStore(gateway, collections) +let createEqxStoreWithMaxEventsPerSlice connection batchSize maxEventsPerSlice = + let gateway = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=batchSize, maxEventsPerSlice=maxEventsPerSlice)) EqxStore(gateway, collections) \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index 49a21caf0..1ca91754f 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -18,6 +18,10 @@ module Cart = let store = createEqxStore connection batchSize let resolveStream = EqxStreamBuilder(store, codec, fold, initial).Create Backend.Cart.Service(log, resolveStream) + let createServiceWithoutOptimizationAndMaxItems connection batchSize maxEventsPerSlice log = + let store = createEqxStoreWithMaxEventsPerSlice connection batchSize maxEventsPerSlice + let resolveStream = EqxStreamBuilder(store, codec, fold, initial).Create + Backend.Cart.Service(log, resolveStream) let projection = "Compacted",snd snapshot let createServiceWithProjection connection batchSize log = let store = createEqxStore connection batchSize @@ -66,8 +70,8 @@ type Tests(testOutputHelper) = let! 
conn = connectToSpecifiedCosmosOrSimulator log let maxItemsPerRequest = 2 - let maxEventsPerBatch = 1 - let service = Cart.createServiceWithoutOptimization conn maxItemsPerRequest log + let maxEventsPerBatch = 3 + let service = Cart.createServiceWithoutOptimizationAndMaxItems conn maxItemsPerRequest maxEventsPerBatch log capture.Clear() // for re-runs of the test let cartId = Guid.NewGuid() |> CartId @@ -82,7 +86,7 @@ type Tests(testOutputHelper) = | 1 -> 1 // it does cost a single trip to determine there are 0 items | i -> ceil(float (i-1) * float eventsPerAction / float maxItemsPerRequest / float maxEventsPerBatch) |> int test <@ List.replicate expectedBatchesOf2Items EqxAct.ResponseBackward @ [EqxAct.QueryBackward; EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 39 // 37.15 + verifyRequestChargesMax 25 // 20.59 capture.Clear() // Validate basic operation; Key side effect: Log entries will be emitted to `capture` @@ -93,7 +97,7 @@ type Tests(testOutputHelper) = // Need 6 trips of 2 maxItemsPerRequest to read 12 events test <@ let expectedResponses = ceil(float expectedEventCount/float maxItemsPerRequest/float maxEventsPerBatch) |> int List.replicate expectedResponses EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @> - verifyRequestChargesMax 20 // 18.47 + verifyRequestChargesMax 7 // 5.93 } [] diff --git a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs index 5d99c734e..24e207fb9 100644 --- a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs +++ b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs @@ -22,7 +22,7 @@ type VerbatimUtf8Tests() = let ``encodes correctly`` () = let encoded = mkUnionEncoder().Encode(A { embed = "\"" }) let e : Store.Batch = - { p = "streamName"; id = string 0; i = 0L; _i = 0L; _etag = null + { p = "streamName"; id = string 0; i = -1L; n = -1L; _etag = null e = [| { t = DateTimeOffset.MinValue; c = encoded.caseName; d = 
encoded.payload; m = null } |] } let res = JsonConvert.SerializeObject(e) test <@ res.Contains """"d":{"embed":"\""}""" @>