Skip to content

Commit

Permalink
Remove tip isa Batch semantics (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 14, 2018
1 parent ef16f17 commit 5e95bf0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 48 deletions.
72 changes: 29 additions & 43 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,14 @@ module Sync =
// NB don't nest in a private module, or serialization will fail miserably ;)
[<CLIMutable; NoEquality; NoComparison; Newtonsoft.Json.JsonObject(ItemRequired=Newtonsoft.Json.Required.AllowNull)>]
type SyncResponse = { etag: string; n: int64; conflicts: Event[] }
let [<Literal>] sprocName = "EquinoxSync002" // NB need to renumber for any breaking change
let [<Literal>] sprocName = "EquinoxNoTipEvents" // NB need to renumber for any breaking change
let [<Literal>] sprocBody = """
// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
// 1 Verify no current Tip batch, the incoming `req` becomes the Tip batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO)
// 2 Current Tip batch has space to accommodate the incoming unfolds (req.u) and events (req.e) - merge them in, replacing any superseded unfolds
// 3 Current Tip batch would become too large - remove Tip-specific state from active doc by replacing the well known id with a correct one; proceed as per 1
// 1 perform expectedVersion verification (can request inhibiting of check by supplying -1)
// 2a Verify no current Tip; if so - incoming req.e and defines the 'next' position / unfolds
// 2b If we already have a tip, move position forward, replace unfolds
// 3 insert a new document containing the events as part of the same batch of work
function sync(req, expectedVersion, maxEvents) {
if (!req) throw new Error("Missing req argument");
const collection = getContext().getCollection();
Expand All @@ -316,10 +317,7 @@ function sync(req, expectedVersion, maxEvents) {
// If there is no Tip page, the writer has no possible reason for writing at an index other than zero
response.setBody({ etag: null, n: 0, conflicts: [] });
} else if (current && expectedVersion !== current.n) {
// Where possible, we extract conflicting events from e and/or c in order to avoid another read cycle
// yielding [] triggers the client to go loading the events itself
const conflicts = expectedVersion < current.i ? [] : current.e.slice(expectedVersion - current.i);
response.setBody({ etag: current._etag, n: current.n, conflicts: conflicts });
response.setBody({ etag: current._etag, n: current.n, conflicts: [] });
} else {
executeUpsert(current);
}
Expand All @@ -331,40 +329,27 @@ function sync(req, expectedVersion, maxEvents) {
if (err) throw err;
response.setBody({ etag: doc._etag, n: doc.n, conflicts: null });
}
// `i` is established when first written; `n` needs to stay in step with i+batch.e.length
function pos(batch, i) {
batch.i = i
batch.n = batch.i + batch.e.length;
return batch;
}
// If we have hit a sensible limit for a slice, swap to a new one
if (current && current.e.length + req.e.length > maxEvents) {
// remove the well-known `id` value identifying the batch as being the Tip
current.id = current.i.toString();
// ... As it's no longer a Tip batch, we definitely don't want unfolds taking up space
delete current.u;
// TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches
// as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place
const tipUpdateAccepted = collection.replaceDocument(current._self, current, { etag: current._etag }, callback);
if (!tipUpdateAccepted) throw new Error("Unable to remove Tip markings.");
const isAccepted = collection.createDocument(collectionLink, pos(req,current.n), { disableAutomaticIdGeneration: true }, callback);
if (!isAccepted) throw new Error("Unable to create Tip batch.");
} else if (current) {
// Append the new events into the current batch
Array.prototype.push.apply(current.e, req.e);
// Replace all the unfolds // TODO: should remove only unfolds being superseded
current.u = req.u;
var tip;
if (!current) {
tip = { p: req.p, id: req.id, i: req.e.length, n: req.e.length, e: [], u: req.u };
const tipAccepted = collection.createDocument(collectionLink, tip, { disableAutomaticIdGeneration: true }, callback);
if (!tipAccepted) throw new Error("Unable to create Tip.");
} else {
// TODO Carry forward `u` items not in `req`, together with supporting catchup events from preceding batches
const n = current.n + req.e.length;
tip = { p: current.p, id: current.id, i: n, n: n, e: [], u: req.u };
// as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place
const isAccepted = collection.replaceDocument(current._self, pos(current, current.i), { etag: current._etag }, callback);
if (!isAccepted) throw new Error("Unable to replace Tip batch.");
} else {
const isAccepted = collection.createDocument(collectionLink, pos(req,0), { disableAutomaticIdGeneration: true }, callback);
if (!isAccepted) throw new Error("Unable to create Tip batch.");
const tipAccepted = collection.replaceDocument(current._self, tip, { etag: current._etag }, callback);
if (!tipAccepted) throw new Error("Unable to replace Tip.");
}
// For now, always do an Insert, as Change Feed mechanism does not yet afford us a way to
// a) guarantee an item per write (can be squashed)
// b) with metadata sufficient for us to determine the items added (only etags, no way to convey i/n in feed item)
const i = tip.n - req.e.length;
const batch = { p: tip.p, id: i.toString(), i: i, n: tip.n, e: req.e };
const batchAccepted = collection.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
if (!batchAccepted) throw new Error("Unable to insert Batch.");
}
}"""

Expand Down Expand Up @@ -499,12 +484,13 @@ module internal Tip =
open FSharp.Control
let private mkQuery (client : IDocumentClient) maxItems (stream: CollectionStream) (direction: Direction) startPos =
let querySpec =
let fields = "c.id, c.i, c._etag, c.n, c.e"
let root = sprintf "SELECT c.id, c.i, c._etag, c.n, c.e FROM c WHERE c.id!=\"%s\"" Tip.WellKnownDocumentId
let tail = sprintf "ORDER BY c.i %s" (if direction = Direction.Forward then "ASC" else "DESC")
match startPos with
| None -> SqlQuerySpec(sprintf "SELECT %s FROM c ORDER BY c.i " fields + if direction = Direction.Forward then "ASC" else "DESC")
| None -> SqlQuerySpec(sprintf "%s %s" root tail)
| Some { index = positionSoExclusiveWhenBackward } ->
let f = if direction = Direction.Forward then "c.n > @startPos ORDER BY c.i ASC" else "c.i < @startPos ORDER BY c.i DESC"
SqlQuerySpec(sprintf "SELECT %s FROM c WHERE " fields + f, SqlParameterCollection [SqlParameter("@startPos", positionSoExclusiveWhenBackward)])
let cond = if direction = Direction.Forward then "c.n > @startPos" else "c.i < @startPos"
SqlQuerySpec(sprintf "%s AND %s %s" root cond tail, SqlParameterCollection [SqlParameter("@startPos", positionSoExclusiveWhenBackward)])
let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems)
client.CreateDocumentQuery<Batch>(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery()

Expand Down
18 changes: 13 additions & 5 deletions tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Tests(testOutputHelper) =
let! res = Events.append ctx streamName index <| TestEvents.Create(0,1)
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 10
verifyRequestChargesMax 12 // 11.78 // WAS 10
// Clear the counters
capture.Clear()

Expand Down Expand Up @@ -169,16 +169,24 @@ type Tests(testOutputHelper) =
let! res = Events.append ctx streamName 0L expected
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 11 // 10.33
verifyRequestChargesMax 12 // 11.35 WAS 11 // 10.33
capture.Clear()

// Try overwriting it (a competing consumer would see the same)
let! res = Events.append ctx streamName 0L <| TestEvents.Create(-42,2)
// This time we get passed the conflicting events - we pay a little for that, but that's unavoidable
match res with
#if EVENTS_IN_TIP
| AppendResult.Conflict (1L, e) -> verifyCorrectEvents 0L expected e
#else
| AppendResult.ConflictUnknown 1L -> ()
#endif
| x -> x |> failwithf "Unexpected %A"
#if EVENTS_IN_TIP
test <@ [EqxAct.Resync] = capture.ExternalCalls @>
#else
test <@ [EqxAct.Conflict] = capture.ExternalCalls @>
#endif
verifyRequestChargesMax 5 // 4.02
capture.Clear()
}
Expand Down Expand Up @@ -236,7 +244,7 @@ type Tests(testOutputHelper) =
| _ -> None
// validate that, despite only requesting max 1 item, we only needed one trip (which contained only one item)
[1,1] =! capture.ChooseCalls queryRoundTripsAndItemCounts
verifyRequestChargesMax 3 // 2.97
verifyRequestChargesMax 4 // 3.02 // WAS 3 // 2.97
}

(* Backward *)
Expand All @@ -256,7 +264,7 @@ type Tests(testOutputHelper) =
verifyCorrectEventsBackward 4L expected res

test <@ [EqxAct.ResponseBackward; EqxAct.QueryBackward] = capture.ExternalCalls @>
verifyRequestChargesMax 3
verifyRequestChargesMax 4 // 3.04 // WAS 3
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
Expand Down Expand Up @@ -300,5 +308,5 @@ type Tests(testOutputHelper) =
| EqxEvent (Equinox.Cosmos.Store.Log.Event.Query (Equinox.Cosmos.Store.Direction.Backward, responses, { count = c })) -> Some (responses,c)
| _ -> None
[1,5] =! capture.ChooseCalls queryRoundTripsAndItemCounts
verifyRequestChargesMax 3 // 2.98
verifyRequestChargesMax 4 // 3.04 // WAS 3 // 2.98
}
6 changes: 6 additions & 0 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,15 @@ type Tests(testOutputHelper) =
&& has sku21 21 && has sku22 22 @>
// Intended conflicts pertained
let conflict = function EqxAct.Conflict | EqxAct.Resync as x -> Some x | _ -> None
#if EVENTS_IN_TIP
test <@ let c2 = List.choose conflict capture2.ExternalCalls
[EqxAct.Resync] = List.choose conflict capture1.ExternalCalls
&& [EqxAct.Resync] = c2 @>
#else
test <@ let c2 = List.choose conflict capture2.ExternalCalls
[EqxAct.Conflict] = List.choose conflict capture1.ExternalCalls
&& [EqxAct.Conflict] = c2 @>
#endif
}

let singleBatchBackwards = [EqxAct.ResponseBackward; EqxAct.QueryBackward]
Expand Down

0 comments on commit 5e95bf0

Please sign in to comment.