Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Tip isa Batch semantics #58

Merged
merged 1 commit into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 29 additions & 43 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,14 @@ module Sync =
// NB don't nest in a private module, or serialization will fail miserably ;)
[<CLIMutable; NoEquality; NoComparison; Newtonsoft.Json.JsonObject(ItemRequired=Newtonsoft.Json.Required.AllowNull)>]
type SyncResponse = { etag: string; n: int64; conflicts: Event[] }
let [<Literal>] sprocName = "EquinoxSync002" // NB need to renumber for any breaking change
let [<Literal>] sprocName = "EquinoxNoTipEvents" // NB need to renumber for any breaking change
let [<Literal>] sprocBody = """

// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
// 1 Verify no current Tip batch, the incoming `req` becomes the Tip batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO)
// 2 Current Tip batch has space to accommodate the incoming unfolds (req.u) and events (req.e) - merge them in, replacing any superseded unfolds
// 3 Current Tip batch would become too large - remove Tip-specific state from active doc by replacing the well known id with a correct one; proceed as per 1
// 1 perform expectedVersion verification (can request inhibiting of check by supplying -1)
// 2a Verify no current Tip; if so - incoming req.e and defines the 'next' position / unfolds
// 2b If we already have a tip, move position forward, replace unfolds
// 3 insert a new document containing the events as part of the same batch of work
function sync(req, expectedVersion, maxEvents) {
if (!req) throw new Error("Missing req argument");
const collection = getContext().getCollection();
Expand All @@ -316,10 +317,7 @@ function sync(req, expectedVersion, maxEvents) {
// If there is no Tip page, the writer has no possible reason for writing at an index other than zero
response.setBody({ etag: null, n: 0, conflicts: [] });
} else if (current && expectedVersion !== current.n) {
// Where possible, we extract conflicting events from e and/or c in order to avoid another read cycle
// yielding [] triggers the client to go loading the events itself
const conflicts = expectedVersion < current.i ? [] : current.e.slice(expectedVersion - current.i);
response.setBody({ etag: current._etag, n: current.n, conflicts: conflicts });
response.setBody({ etag: current._etag, n: current.n, conflicts: [] });
} else {
executeUpsert(current);
}
Expand All @@ -331,40 +329,27 @@ function sync(req, expectedVersion, maxEvents) {
if (err) throw err;
response.setBody({ etag: doc._etag, n: doc.n, conflicts: null });
}
// `i` is established when first written; `n` needs to stay in step with i+batch.e.length
function pos(batch, i) {
batch.i = i
batch.n = batch.i + batch.e.length;
return batch;
}
// If we have hit a sensible limit for a slice, swap to a new one
if (current && current.e.length + req.e.length > maxEvents) {
// remove the well-known `id` value identifying the batch as being the Tip
current.id = current.i.toString();
// ... As it's no longer a Tip batch, we definitely don't want unfolds taking up space
delete current.u;

// TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches

// as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place
const tipUpdateAccepted = collection.replaceDocument(current._self, current, { etag: current._etag }, callback);
if (!tipUpdateAccepted) throw new Error("Unable to remove Tip markings.");

const isAccepted = collection.createDocument(collectionLink, pos(req,current.n), { disableAutomaticIdGeneration: true }, callback);
if (!isAccepted) throw new Error("Unable to create Tip batch.");
} else if (current) {
// Append the new events into the current batch
Array.prototype.push.apply(current.e, req.e);
// Replace all the unfolds // TODO: should remove only unfolds being superseded
current.u = req.u;
var tip;
if (!current) {
tip = { p: req.p, id: req.id, i: req.e.length, n: req.e.length, e: [], u: req.u };
const tipAccepted = collection.createDocument(collectionLink, tip, { disableAutomaticIdGeneration: true }, callback);
if (!tipAccepted) throw new Error("Unable to create Tip.");
} else {
// TODO Carry forward `u` items not in `req`, together with supporting catchup events from preceding batches
const n = current.n + req.e.length;
tip = { p: current.p, id: current.id, i: n, n: n, e: [], u: req.u };

// as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place
const isAccepted = collection.replaceDocument(current._self, pos(current, current.i), { etag: current._etag }, callback);
if (!isAccepted) throw new Error("Unable to replace Tip batch.");
} else {
const isAccepted = collection.createDocument(collectionLink, pos(req,0), { disableAutomaticIdGeneration: true }, callback);
if (!isAccepted) throw new Error("Unable to create Tip batch.");
const tipAccepted = collection.replaceDocument(current._self, tip, { etag: current._etag }, callback);
if (!tipAccepted) throw new Error("Unable to replace Tip.");
}
// For now, always do an Insert, as Change Feed mechanism does not yet afford us a way to
// a) guarantee an item per write (can be squashed)
// b) with metadata sufficient for us to determine the items added (only etags, no way to convey i/n in feed item)
const i = tip.n - req.e.length;
const batch = { p: tip.p, id: i.toString(), i: i, n: tip.n, e: req.e };
const batchAccepted = collection.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
if (!batchAccepted) throw new Error("Unable to insert Batch.");
}
}"""

Expand Down Expand Up @@ -499,12 +484,13 @@ module internal Tip =
open FSharp.Control
let private mkQuery (client : IDocumentClient) maxItems (stream: CollectionStream) (direction: Direction) startPos =
let querySpec =
let fields = "c.id, c.i, c._etag, c.n, c.e"
let root = sprintf "SELECT c.id, c.i, c._etag, c.n, c.e FROM c WHERE c.id!=\"%s\"" Tip.WellKnownDocumentId
let tail = sprintf "ORDER BY c.i %s" (if direction = Direction.Forward then "ASC" else "DESC")
match startPos with
| None -> SqlQuerySpec(sprintf "SELECT %s FROM c ORDER BY c.i " fields + if direction = Direction.Forward then "ASC" else "DESC")
| None -> SqlQuerySpec(sprintf "%s %s" root tail)
| Some { index = positionSoExclusiveWhenBackward } ->
let f = if direction = Direction.Forward then "c.n > @startPos ORDER BY c.i ASC" else "c.i < @startPos ORDER BY c.i DESC"
SqlQuerySpec(sprintf "SELECT %s FROM c WHERE " fields + f, SqlParameterCollection [SqlParameter("@startPos", positionSoExclusiveWhenBackward)])
let cond = if direction = Direction.Forward then "c.n > @startPos" else "c.i < @startPos"
SqlQuerySpec(sprintf "%s AND %s %s" root cond tail, SqlParameterCollection [SqlParameter("@startPos", positionSoExclusiveWhenBackward)])
let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems)
client.CreateDocumentQuery<Batch>(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery()

Expand Down
18 changes: 13 additions & 5 deletions tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Tests(testOutputHelper) =
let! res = Events.append ctx streamName index <| TestEvents.Create(0,1)
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 10
verifyRequestChargesMax 12 // 11.78 // WAS 10
// Clear the counters
capture.Clear()

Expand Down Expand Up @@ -169,16 +169,24 @@ type Tests(testOutputHelper) =
let! res = Events.append ctx streamName 0L expected
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 11 // 10.33
verifyRequestChargesMax 12 // 11.35 WAS 11 // 10.33
capture.Clear()

// Try overwriting it (a competing consumer would see the same)
let! res = Events.append ctx streamName 0L <| TestEvents.Create(-42,2)
// This time we get passed the conflicting events - we pay a little for that, but that's unavoidable
match res with
#if EVENTS_IN_TIP
| AppendResult.Conflict (1L, e) -> verifyCorrectEvents 0L expected e
#else
| AppendResult.ConflictUnknown 1L -> ()
#endif
| x -> x |> failwithf "Unexpected %A"
#if EVENTS_IN_TIP
test <@ [EqxAct.Resync] = capture.ExternalCalls @>
#else
test <@ [EqxAct.Conflict] = capture.ExternalCalls @>
#endif
verifyRequestChargesMax 5 // 4.02
capture.Clear()
}
Expand Down Expand Up @@ -236,7 +244,7 @@ type Tests(testOutputHelper) =
| _ -> None
// validate that, despite only requesting max 1 item, we only needed one trip (which contained only one item)
[1,1] =! capture.ChooseCalls queryRoundTripsAndItemCounts
verifyRequestChargesMax 3 // 2.97
verifyRequestChargesMax 4 // 3.02 // WAS 3 // 2.97
}

(* Backward *)
Expand All @@ -256,7 +264,7 @@ type Tests(testOutputHelper) =
verifyCorrectEventsBackward 4L expected res

test <@ [EqxAct.ResponseBackward; EqxAct.QueryBackward] = capture.ExternalCalls @>
verifyRequestChargesMax 3
verifyRequestChargesMax 4 // 3.04 // WAS 3
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
Expand Down Expand Up @@ -300,5 +308,5 @@ type Tests(testOutputHelper) =
| EqxEvent (Equinox.Cosmos.Store.Log.Event.Query (Equinox.Cosmos.Store.Direction.Backward, responses, { count = c })) -> Some (responses,c)
| _ -> None
[1,5] =! capture.ChooseCalls queryRoundTripsAndItemCounts
verifyRequestChargesMax 3 // 2.98
verifyRequestChargesMax 4 // 3.04 // WAS 3 // 2.98
}
6 changes: 6 additions & 0 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,15 @@ type Tests(testOutputHelper) =
&& has sku21 21 && has sku22 22 @>
// Intended conflicts pertained
let conflict = function EqxAct.Conflict | EqxAct.Resync as x -> Some x | _ -> None
#if EVENTS_IN_TIP
test <@ let c2 = List.choose conflict capture2.ExternalCalls
[EqxAct.Resync] = List.choose conflict capture1.ExternalCalls
&& [EqxAct.Resync] = c2 @>
#else
test <@ let c2 = List.choose conflict capture2.ExternalCalls
[EqxAct.Conflict] = List.choose conflict capture1.ExternalCalls
&& [EqxAct.Conflict] = c2 @>
#endif
}

let singleBatchBackwards = [EqxAct.ResponseBackward; EqxAct.QueryBackward]
Expand Down