diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index b88073de4..d94ccef06 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -293,13 +293,14 @@ module Sync = // NB don't nest in a private module, or serialization will fail miserably ;) [] type SyncResponse = { etag: string; n: int64; conflicts: Event[] } - let [] sprocName = "EquinoxSync002" // NB need to renumber for any breaking change + let [] sprocName = "EquinoxNoTipEvents" // NB need to renumber for any breaking change let [] sprocBody = """ // Manages the merging of the supplied Request Batch, fulfilling one of the following end-states -// 1 Verify no current Tip batch, the incoming `req` becomes the Tip batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO) -// 2 Current Tip batch has space to accommodate the incoming unfolds (req.u) and events (req.e) - merge them in, replacing any superseded unfolds -// 3 Current Tip batch would become too large - remove Tip-specific state from active doc by replacing the well known id with a correct one; proceed as per 1 +// 1 perform expectedVersion verification (can request inhibiting of check by supplying -1) +// 2a Verify no current Tip; if so - incoming req.e and defines the 'next' position / unfolds +// 2b If we already have a tip, move position forward, replace unfolds +// 3 insert a new document containing the events as part of the same batch of work function sync(req, expectedVersion, maxEvents) { if (!req) throw new Error("Missing req argument"); const collection = getContext().getCollection(); @@ -316,10 +317,7 @@ function sync(req, expectedVersion, maxEvents) { // If there is no Tip page, the writer has no possible reason for writing at an index other than zero response.setBody({ etag: null, n: 0, conflicts: [] }); } else if (current && expectedVersion !== current.n) { - // Where possible, we extract conflicting events from e and/or c in order to avoid another read cycle - // yielding [] triggers the client to go loading the events itself - const conflicts = expectedVersion < current.i ? [] : current.e.slice(expectedVersion - current.i); - response.setBody({ etag: current._etag, n: current.n, conflicts: conflicts }); + response.setBody({ etag: current._etag, n: current.n, conflicts: [] }); } else { executeUpsert(current); } @@ -331,40 +329,27 @@ function sync(req, expectedVersion, maxEvents) { if (err) throw err; response.setBody({ etag: doc._etag, n: doc.n, conflicts: null }); } - // `i` is established when first written; `n` needs to stay in step with i+batch.e.length - function pos(batch, i) { - batch.i = i - batch.n = batch.i + batch.e.length; - return batch; - } - // If we have hit a sensible limit for a slice, swap to a new one - if (current && current.e.length + req.e.length > maxEvents) { - // remove the well-known `id` value identifying the batch as being the Tip - current.id = current.i.toString(); - // ... As it's no longer a Tip batch, we definitely don't want unfolds taking up space - delete current.u; - - // TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches - - // as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place - const tipUpdateAccepted = collection.replaceDocument(current._self, current, { etag: current._etag }, callback); - if (!tipUpdateAccepted) throw new Error("Unable to remove Tip markings."); - - const isAccepted = collection.createDocument(collectionLink, pos(req,current.n), { disableAutomaticIdGeneration: true }, callback); - if (!isAccepted) throw new Error("Unable to create Tip batch."); - } else if (current) { - // Append the new events into the current batch - Array.prototype.push.apply(current.e, req.e); - // Replace all the unfolds // TODO: should remove only unfolds being superseded - current.u = req.u; + var tip; + if (!current) { + tip = { p: req.p, id: req.id, i: req.e.length, n: req.e.length, e: [], u: req.u }; + const tipAccepted = collection.createDocument(collectionLink, tip, { disableAutomaticIdGeneration: true }, callback); + if (!tipAccepted) throw new Error("Unable to create Tip."); + } else { + // TODO Carry forward `u` items not in `req`, together with supporting catchup events from preceding batches + const n = current.n + req.e.length; + tip = { p: current.p, id: current.id, i: n, n: n, e: [], u: req.u }; // as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place - const isAccepted = collection.replaceDocument(current._self, pos(current, current.i), { etag: current._etag }, callback); - if (!isAccepted) throw new Error("Unable to replace Tip batch."); - } else { - const isAccepted = collection.createDocument(collectionLink, pos(req,0), { disableAutomaticIdGeneration: true }, callback); - if (!isAccepted) throw new Error("Unable to create Tip batch."); + const tipAccepted = collection.replaceDocument(current._self, tip, { etag: current._etag }, callback); + if (!tipAccepted) throw new Error("Unable to replace Tip."); } + // For now, always do an Insert, as Change Feed mechanism does not yet afford us a way to + // a) guarantee an item per write (can be squashed) + // b) with metadata sufficient for us to determine the items added (only etags, no way to convey i/n in feed item) + const i = tip.n - req.e.length; + const batch = { p: tip.p, id: i.toString(), i: i, n: tip.n, e: req.e }; + const batchAccepted = collection.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true }); + if (!batchAccepted) throw new Error("Unable to insert Batch."); } }""" @@ -499,12 +484,13 @@ module internal Tip = open FSharp.Control let private mkQuery (client : IDocumentClient) maxItems (stream: CollectionStream) (direction: Direction) startPos = let querySpec = - let fields = "c.id, c.i, c._etag, c.n, c.e" + let root = sprintf "SELECT c.id, c.i, c._etag, c.n, c.e FROM c WHERE c.id!=\"%s\"" Tip.WellKnownDocumentId + let tail = sprintf "ORDER BY c.i %s" (if direction = Direction.Forward then "ASC" else "DESC") match startPos with - | None -> SqlQuerySpec(sprintf "SELECT %s FROM c ORDER BY c.i " fields + if direction = Direction.Forward then "ASC" else "DESC") + | None -> SqlQuerySpec(sprintf "%s %s" root tail) | Some { index = positionSoExclusiveWhenBackward } -> - let f = if direction = Direction.Forward then "c.n > @startPos ORDER BY c.i ASC" else "c.i < @startPos ORDER BY c.i DESC" - SqlQuerySpec(sprintf "SELECT %s FROM c WHERE " fields + f, SqlParameterCollection [SqlParameter("@startPos", positionSoExclusiveWhenBackward)]) + let cond = if direction = Direction.Forward then "c.n > @startPos" else "c.i < @startPos" + SqlQuerySpec(sprintf "%s AND %s %s" root cond tail, SqlParameterCollection [SqlParameter("@startPos", positionSoExclusiveWhenBackward)]) let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems) client.CreateDocumentQuery(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery() diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index 388fb2d12..1ced8d174 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -46,7 +46,7 @@ type Tests(testOutputHelper) = let! res = Events.append ctx streamName index <| TestEvents.Create(0,1) test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 10 + verifyRequestChargesMax 12 // 11.78 // WAS 10 // Clear the counters capture.Clear() @@ -169,16 +169,24 @@ type Tests(testOutputHelper) = let! res = Events.append ctx streamName 0L expected test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 11 // 10.33 + verifyRequestChargesMax 12 // 11.35 WAS 11 // 10.33 capture.Clear() // Try overwriting it (a competing consumer would see the same) let! res = Events.append ctx streamName 0L <| TestEvents.Create(-42,2) // This time we get passed the conflicting events - we pay a little for that, but that's unavoidable match res with +#if EVENTS_IN_TIP | AppendResult.Conflict (1L, e) -> verifyCorrectEvents 0L expected e +#else + | AppendResult.ConflictUnknown 1L -> () +#endif | x -> x |> failwithf "Unexpected %A" +#if EVENTS_IN_TIP test <@ [EqxAct.Resync] = capture.ExternalCalls @> +#else + test <@ [EqxAct.Conflict] = capture.ExternalCalls @> +#endif verifyRequestChargesMax 5 // 4.02 capture.Clear() } @@ -236,7 +244,7 @@ type Tests(testOutputHelper) = | _ -> None // validate that, despite only requesting max 1 item, we only needed one trip (which contained only one item) [1,1] =! capture.ChooseCalls queryRoundTripsAndItemCounts - verifyRequestChargesMax 3 // 2.97 + verifyRequestChargesMax 4 // 3.02 // WAS 3 // 2.97 } (* Backward *) @@ -256,7 +264,7 @@ type Tests(testOutputHelper) = verifyCorrectEventsBackward 4L expected res test <@ [EqxAct.ResponseBackward; EqxAct.QueryBackward] = capture.ExternalCalls @> - verifyRequestChargesMax 3 + verifyRequestChargesMax 4 // 3.04 // WAS 3 } [] @@ -300,5 +308,5 @@ type Tests(testOutputHelper) = | EqxEvent (Equinox.Cosmos.Store.Log.Event.Query (Equinox.Cosmos.Store.Direction.Backward, responses, { count = c })) -> Some (responses,c) | _ -> None [1,5] =! capture.ChooseCalls queryRoundTripsAndItemCounts - verifyRequestChargesMax 3 // 2.98 + verifyRequestChargesMax 4 // 3.04 // WAS 3 // 2.98 } \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index ef40a1e43..23db4fa03 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -175,9 +175,15 @@ type Tests(testOutputHelper) = && has sku21 21 && has sku22 22 @> // Intended conflicts pertained let conflict = function EqxAct.Conflict | EqxAct.Resync as x -> Some x | _ -> None +#if EVENTS_IN_TIP test <@ let c2 = List.choose conflict capture2.ExternalCalls [EqxAct.Resync] = List.choose conflict capture1.ExternalCalls && [EqxAct.Resync] = c2 @> +#else + test <@ let c2 = List.choose conflict capture2.ExternalCalls + [EqxAct.Conflict] = List.choose conflict capture1.ExternalCalls + && [EqxAct.Conflict] = c2 @> +#endif } let singleBatchBackwards = [EqxAct.ResponseBackward; EqxAct.QueryBackward]