From 25ebc33739d3b8bba2a72fb25da279657a1139b2 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 21 Oct 2020 10:38:11 +0100 Subject: [PATCH] Events in Tip Optimization (#251) --- CHANGELOG.md | 1 + DOCUMENTATION.md | 27 ++-- README.md | 8 +- samples/Infrastructure/Storage.fs | 12 +- src/Equinox.CosmosStore/CosmosStore.fs | 72 ++++++---- .../CosmosCoreIntegration.fs | 109 ++++++++------- .../CosmosFixtures.fs | 17 ++- .../CosmosIntegration.fs | 127 ++++++++++-------- 8 files changed, 210 insertions(+), 163 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c617429fb..8d64d2bd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - Reorganized `QueryRetryPolicy` to handle `IAsyncEnumerable` coming in Cosmos SDK V4 [#246](https://github.com/jet/equinox/pull/246) :pray: [@ylibrach](https://github.com/ylibrach) - Added Secondary store fallback for Event loading, enabling Streams to be hot-migrated (archived to a secondary/clone, then pruned from the primary/active) between Primary and Secondary stores [#247](https://github.com/jet/equinox/pull/247) - Replaced `BatchingPolicy`, `RetryPolicy` with `TipOptions`, `QueryOptions` to better align with Cosmos SDK V4 [#253](https://github.com/jet/equinox/pull/253) + - Added support for accumulating events in Tip [#251](https://github.com/jet/equinox/pull/251) see also [#110](https://github.com/jet/equinox/pull/110) - target `EventStore.Client` v `20.6` (instead of v `5.0.x`) [#224](https://github.com/jet/equinox/pull/224) - Retarget `netcoreapp2.1` apps to `netcoreapp3.1` with `SystemTextJson` - Retarget Todobackend to `aspnetcore` v `3.1` diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 6c5c918ba..e0e8bd76a 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -1740,11 +1740,10 @@ based on the events presented. ## Sync stored procedure -This covers what the most complete possible implementation of the JS Stored +This covers the V3 implementation of the JS Stored Procedure (see -[source](https://github.com/jet/equinox/blob/tip-isa-batch/src/Equinox.Cosmos/Cosmos.fs#L302)) -does when presented with a batch to be written. (NB The present implementation -is slightly simplified; see [source](src/Equinox.CosmosStore/CosmosStore.fs#L404). +[source](https://github.com/jet/equinox/blob/master/src/Equinox.CosmosStore/CosmosStore.fs#L396)) +does when presented with a batch to be written. The `sync` stored procedure takes as input, a document that is almost identical to the format of the _`Tip`_ batch (in fact, if the stream is found to be @@ -1762,15 +1761,14 @@ stream). The request includes the following elements: expectedVersion check is fulfilled - `u`: array of `unfold`ed events (aka snapshots) that supersede items with equivalent `c`ase values -- `maxEvents`: the maximum number of events permitted to be retained in the Tip (subject to that not exceeding the `maxStringifyLen` rule). For example: - - - if `e` contains 2 events, the _tip_ document's `e` has 2 events and the - `maxEvents` is `5`, the events get appended onto the tip's `e`vents - - if the total length including the new `e`vents would exceed `maxEvents`, the Tip is 'renamed' (gets its `id` set to `i.toString()`) to become a - batch (with the new `e`vents included in that calved Batch), and the new Tip - has a zero-length `e`vents array, and a set of `u`nfolds - (as an atomic transaction on the server side) - +- `maxEventsInTip`: the maximum number of events permitted to be retained in the Tip (subject to that not exceeding the `maxStringifyLen` rule). For example: + + - if `e` contains 2 events, the _tip_ document's `e` has 2 events and the `maxEventsInTip` is `5`, the events get appended onto the tip's `e`vents + - if the total length including the new `e`vents would exceed `maxEventsInTip`, the Tip is 'renamed' (gets its `id` set to `i.toString()`) to become a + batch (with the new `e`vents included in that calved Batch), and the new Tip has a zero-length `e`vents array + as a `Batch`, and a set of `u`nfolds (as an atomic + transaction on the server side) +- `maxStringifyLen`: secondary constraint on the events retained in the tip (in addition to `maxEventsInTip` constraint) - constrains the maximum length of the events being buffered in the Tip by applying a size limit in characters (as computed via `JSON.stringify(events).length`) - (PROPOSAL/FUTURE) `thirdPartyUnfoldRetention`: how many events to keep before the base (`i`) of the batch if required by lagging `u`nfolds which would otherwise fall out of scope as a result of the appends in this batch (this @@ -2256,9 +2254,6 @@ rich relative to the need of consumers to date. Some things remain though: services and for blue/green deploy scenarios); TBD how we decide when a union that's no longer in use gets removed) [#108](https://github.com/jet/equinox/issues/108) -- performance, efficiency and concurrency improvements based on - [`tip-isa-batch`](https://github.com/jet/equinox/tree/tip-isa-batch) schema - generalization [#109](https://github.com/jet/equinox/issues/109) - low level performance improvements in loading logic (reducing allocations etc) ## Wouldn't it be nice - `Equinox.DynamoDb` diff --git a/README.md b/README.md index adc5db6a5..762d596d7 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ Some aspects of the implementation are distilled from [`Jet.com` systems dating - **`Equinox.CosmosStore` 'Tip with Unfolds' schema**: (In contrast to `Equinox.EventStore`'s `AccessStrategy.RollingSnapshots`) when using `Equinox.CosmosStore`, optimized loading and command processing is managed via the `Tip`; a document per stream that facilitates Syncing via a single point-read as the 'Tip' maintains: a) the present Position of the stream - i.e. the index at which the next events will be appended, which is used for [optimistic concurrency control](https://en.wikipedia.org/wiki/Optimistic_concurrency_control) b) ephemeral (`deflate+base64` compressed by default, in order to optimize RU costs) [_unfolds_](DOCUMENTATION.md#Cosmos-Storage-Model) - c) (potentially in future) a holding buffer for events since those unfolded events ([presently removed](https://github.com/jet/equinox/pull/58), but [should return](DOCUMENTATION.md#Roadmap), see [#109](https://github.com/jet/equinox/pull/109)) + c) pending events, up to a specified count (or `JSON.stringify` length). When the events in tip accumulation limit is reached, they are shifted out to a thereafter-immutable `Batch`. This yields many of the benefits of the in-stream Rolling Snapshots approach while reducing latency and RU provisioning requirements due to meticulously tuned Request Charge costs:- - when the stream is empty, the initial `Load` operation involves a single point read that yields a `404 NotFound` response, costing 1.0 RU @@ -49,11 +49,9 @@ Some aspects of the implementation are distilled from [`Jet.com` systems dating - when coupled with snapshots/unfolds mechanism, a cache miss on `Reload` only triggers paying the cost for reading of the compressed snapshot stored in the Tip (i.e. instead of a `302`, the `IfNoneMatch` yields a `200` and returns all relevant information in the same roundtrip) - writes are via a single invocation of the `Sync` stored procedure which: a) does a point read - b) performs a concurrency check and then either... - c) applies the write OR returns the conflicting unfolds + b) performs a concurrency check and then either + c) applies the write OR returns the conflicting events and unfolds - no additional round trips to the store needed at either the `Load`, `Reload` or `Sync` points in the flow - -It should be noted that from a querying perspective, the `Tip` shares the same structure as `Batch` documents (a potential future extension would be to carry some events in the `Tip` as [some interim versions of the implementation once did](https://github.com/jet/equinox/pull/58), see also [#109](https://github.com/jet/equinox/pull/109). - **`Equinox.CosmosStore` `RollingState` and `Custom` 'non-event-sourced' modes**: Uses 'Tip with Unfolds' encoding to avoid having to write event documents at all - this enables one to build, reason about and test your aggregates in the normal manner, but inhibit event documents from being generated. This enables one to benefit from the caching and consistency management mechanisms without having to bear the cost of writing and storing the events themselves (and/or dealing with an ever-growing store size). Search for `transmute` or `RollingState` in the `samples` and/or see [the `Checkpoint` Aggregate in Propulsion](https://github.com/jet/propulsion/blob/master/src/Propulsion.EventStore/Checkpoint.fs). One chief use of this mechanism is for tracking Summary Event feeds in [the `dotnet-templates` `summaryConsumer` template](https://github.com/jet/dotnet-templates/tree/master/propulsion-summary-consumer). ## Components diff --git a/samples/Infrastructure/Storage.fs b/samples/Infrastructure/Storage.fs index 0bec93904..dfbfabd34 100644 --- a/samples/Infrastructure/Storage.fs +++ b/samples/Infrastructure/Storage.fs @@ -45,6 +45,8 @@ module Cosmos = | [] Connection2 of string | [] Database2 of string | [] Container2 of string + | [] TipMaxEvents of int + | [] TipMaxJsonLength of int | [] QueryMaxItems of int interface IArgParserTemplate with member a.Usage = @@ -60,6 +62,8 @@ module Cosmos = | Connection2 _ -> "specify a connection string for Secondary Cosmos account. Default: use same as Primary Connection" | Database2 _ -> "specify a database name for Secondary store. Default: use same as Primary Database" | Container2 _ -> "specify a container name for store. Default: use same as Primary Container" + | TipMaxEvents _ -> "specify maximum number of events to hold in Tip before calving off to a frozen Batch. Default: 256" + | TipMaxJsonLength _ -> "specify maximum length of JSON (as measured by JSON.stringify) to hold in Tip before calving off to a frozen Batch. Default: 30,000" | QueryMaxItems _ -> "specify maximum number of batches of events to retrieve in per query response. Default: 10" type Info(args : ParseResults) = member __.Mode = args.GetResult(ConnectionMode,Microsoft.Azure.Cosmos.ConnectionMode.Direct) @@ -76,6 +80,8 @@ module Cosmos = member __.Timeout = args.GetResult(Timeout,5.) |> TimeSpan.FromSeconds member __.Retries = args.GetResult(Retries,1) member __.MaxRetryWaitTime = args.GetResult(RetriesWaitTimeS, 5.) |> TimeSpan.FromSeconds + member __.TipMaxEvents = args.GetResult(TipMaxEvents, 256) + member __.TipMaxJsonLength = args.GetResult(TipMaxJsonLength, 30000) member __.QueryMaxItems = args.GetResult(QueryMaxItems, 10) /// Standing up an Equinox instance is necessary to run for test purposes; You'll need to either: @@ -106,9 +112,9 @@ module Cosmos = CosmosStoreConnection(client, databaseId, containerId) | (client, databaseId, containerId), Some (client2, db2, cont2) -> CosmosStoreConnection(client, databaseId, containerId, client2 = client2, databaseId2 = db2, containerId2 = cont2) - log.Information("CosmosStore Max Items in Query: {queryMaxItems}", - a.QueryMaxItems) - let ctx = CosmosStoreContext(conn, queryMaxItems = a.QueryMaxItems) + log.Information("CosmosStore Max Events in Tip: {maxTipEvents}e {maxTipJsonLength}b Items in Query: {queryMaxItems}", + a.TipMaxEvents, a.TipMaxJsonLength, a.QueryMaxItems) + let ctx = CosmosStoreContext(conn, queryMaxItems = a.QueryMaxItems, tipMaxEvents = a.TipMaxEvents, tipMaxJsonLength = a.TipMaxJsonLength) let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching StorageConfig.Cosmos (ctx, cacheStrategy, unfolds) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 1dfbeaa35..4a8cb6c4a 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -389,12 +389,12 @@ module private MicrosoftAzureCosmosWrappers = // NB don't nest in a private module, or serialization will fail miserably ;) [] -type SyncResponse = { etag: string; n: int64; conflicts: Unfold[] } +type SyncResponse = { etag: string; n: int64; conflicts: Unfold[]; e : Event[] } module internal SyncStoredProc = - let [] name = "EquinoxRollingUnfolds5" // NB need to rename/number for any breaking change + let [] name = "EquinoxEventsInTip4" // NB need to rename/number for any breaking change let [] body = """ -// Manages the merging of the supplied Request Batch into the stream +// Manages the merging of the supplied Request Batch into the stream, potentially storing events in the Tip // 0 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index) // High level end-states: @@ -402,7 +402,8 @@ module internal SyncStoredProc = // 1b if there is a Tip, but incoming request includes an event -> generate a batch document + create empty Tip // 2a if stream empty, but incoming request includes an event -> generate a batch document + create empty Tip // 2b if no current Tip, and no events being written -> the incoming `req` becomes the Tip batch -function sync(req, expIndex, expEtag) { + +function sync(req, expIndex, expEtag, maxEventsInTip, maxStringifyLen) { if (!req) throw new Error("Missing req argument"); const collectionLink = __.getSelfLink(); const response = getContext().getResponse(); @@ -415,12 +416,13 @@ function sync(req, expIndex, expEtag) { executeUpsert(current); } else if (!current && ((expIndex === -2 && expEtag !== null) || expIndex > 0)) { // If there is no Tip page, the writer has no possible reason for writing at an index other than zero, and an etag exp must be fulfilled - response.setBody({ etag: null, n: 0, conflicts: [] }); + response.setBody({ etag: null, n: 0, conflicts: [], e: [] }); } else if (current && ((expIndex === -2 && expEtag !== current._etag) || (expIndex !== -2 && expIndex !== current.n))) { // Where possible, we extract conflicting events from e and/or u in order to avoid another read cycle; // yielding [] triggers the client to go loading the events itself // if we're working based on etags, the `u`nfolds likely bear relevant info as state-bearing unfolds - response.setBody({ etag: current._etag, n: current.n, conflicts: current.u || [] }); + const recentEvents = expIndex < current.i ? [] : current.e.slice(expIndex - current.i); + response.setBody({ etag: current._etag, n: current.n, conflicts: current.u || [], e: recentEvents }); } else { executeUpsert(current); } @@ -429,10 +431,10 @@ function sync(req, expIndex, expEtag) { function executeUpsert(tip) { function callback(err, doc) { if (err) throw err; - response.setBody({ etag: doc._etag, n: doc.n, conflicts: null }); + response.setBody({ etag: doc._etag, n: doc.n, conflicts: null, e: [] }); } function shouldCalveBatch(events) { - return events.length > 0; + return events.length > maxEventsInTip || JSON.stringify(events).length > maxStringifyLen; } if (tip) { Array.prototype.push.apply(tip.e, req.e); @@ -481,7 +483,7 @@ module internal Sync = | Conflict of Position * events: ITimelineEvent[] | ConflictUnknown of Position - let private run (container : Container, stream : string) (exp, req: Tip) + let private run (container : Container, stream : string) (maxEventsInTip, maxStringifyLen) (exp, req: Tip) : Async = async { let ep = match exp with @@ -489,21 +491,22 @@ module internal Sync = | SyncExp.Etag et -> Position.fromEtag et | SyncExp.Any -> Position.fromAppendAtEnd let! ct = Async.CancellationToken - let args = [| box req; box ep.index; box (Option.toObj ep.etag)|] + let args = [| box req; box ep.index; box (Option.toObj ep.etag); box maxEventsInTip; box maxStringifyLen |] let! (res : Scripts.StoredProcedureExecuteResponse) = container.Scripts.ExecuteStoredProcedureAsync(SyncStoredProc.name, PartitionKey stream, args, cancellationToken = ct) |> Async.AwaitTaskCorrect let newPos = { index = res.Resource.n; etag = Option.ofObj res.Resource.etag } match res.Resource.conflicts with | null -> return res.RequestCharge, Result.Written newPos - | [||] when newPos.index = 0L -> return res.RequestCharge, Result.Conflict (newPos, Array.empty) - | [||] -> return res.RequestCharge, Result.ConflictUnknown newPos - | unfolds -> // stored proc only returns unfolds with index >= req.i - no need to trim to a minIndex - let events = Enum.Unfolds unfolds |> Array.ofSeq + // ConflictUnknown is to be yielded if we believe querying is going to be necessary (as there are no unfolds, and no relevant events in the Tip) + | [||] when res.Resource.e.Length = 0 && newPos.index > ep.index -> + return res.RequestCharge, Result.ConflictUnknown newPos + | unfolds -> // stored proc only returns events and unfolds with index >= req.i - no need to trim to a minIndex + let events = (Enum.Events(ep.index, res.Resource.e), Enum.Unfolds unfolds) ||> Seq.append |> Array.ofSeq return res.RequestCharge, Result.Conflict (newPos, events) } - let private logged (container,stream) (exp : SyncExp, req: Tip) (log : ILogger) + let private logged (container,stream) (maxEventsInTip, maxStringifyLen) (exp : SyncExp, req: Tip) (log : ILogger) : Async = async { - let! t, (ru, result) = run (container,stream) (exp, req) |> Stopwatch.Time + let! t, (ru, result) = run (container,stream) (maxEventsInTip, maxStringifyLen) (exp, req) |> Stopwatch.Time let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length let log = let inline mkMetric ru : Log.Measurement = { stream = stream; interval = t; bytes = bytes; count = count; ru = ru } @@ -526,8 +529,8 @@ module internal Sync = "Sync", stream, count, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru, bytes, exp) return result } - let batch (log : ILogger) retryPolicy containerStream expBatch: Async = - let call = logged containerStream expBatch + let batch (log : ILogger) (retryPolicy, maxEventsInTip, maxStringifyLen) containerStream expBatch: Async = + let call = logged containerStream (maxEventsInTip, maxStringifyLen) expBatch Log.withLoggedRetries retryPolicy "writeAttempt" call log let private mkEvent (e : IEventData<_>) = @@ -675,7 +678,7 @@ module internal Tip = match minIndex with None -> () | Some x -> yield "c.n > @minPos", fun (q : QueryDefinition) -> q.WithParameter("@minPos", x) match maxIndex with None -> () | Some x -> yield "c.i < @maxPos", fun (q : QueryDefinition) -> q.WithParameter("@maxPos", x) ] let whereClause = - let notTip = sprintf "c.id!=\"%s\"" Tip.WellKnownDocumentId // until tip-isa-batch, we have a guarantee there are no events in Tip + let notTip = sprintf "c.id!=\"%s\"" Tip.WellKnownDocumentId let conditions = Seq.map fst args if List.isEmpty args && includeTip then null else "WHERE " + String.Join(" AND ", if includeTip then conditions else Seq.append conditions (Seq.singleton notTip)) @@ -793,7 +796,7 @@ module internal Tip = (tryDecode : ITimelineEvent -> 'event option, isOrigin: 'event -> bool) (direction, minIndex, maxIndex) : AsyncSeq<'event[]> = asyncSeq { - let query = mkQuery log (container,stream) false maxItems (direction, minIndex, maxIndex) + let query = mkQuery log (container,stream) true maxItems (direction, minIndex, maxIndex) let readPage = mapPage direction stream (minIndex, maxIndex) maxRequests let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream @@ -1020,10 +1023,20 @@ type QueryOptions member __.MaxRequests = maxRequests /// Defines the policies in force regarding +/// - accumulation/retention of Events in Tip /// - retrying read and write operations for the Tip type TipOptions - ( []?readRetryPolicy, + ( /// Maximum number of events permitted in Tip. When this is exceeded, events are moved out to a standalone Batch. Default: 0. + []?maxEvents, + /// Maximum serialized size (length of JSON.stringify representation) to permit to accumulate in Tip before they get moved out to a standalone Batch. Default: 30_000. + []?maxJsonLength, + []?readRetryPolicy, []?writeRetryPolicy) = + let maxEvents, maxJsonLength = defaultArg maxEvents 0, defaultArg maxJsonLength 30_000 + /// Maximum number of events permitted in Tip. When this is exceeded, events are moved out to a standalone Batch. Default: 0 + member __.MaxEvents = maxEvents + /// Maximum serialized size (length of JSON.stringify representation) to permit to accumulate in Tip before they get moved out to a standalone Batch. Default: 30_000. + member __.MaxJsonLength = maxJsonLength member __.ReadRetryPolicy = readRetryPolicy member __.WriteRetryPolicy = writeRetryPolicy @@ -1032,9 +1045,9 @@ type StoreClient(container : Container, fallback : Container option, query : Que let loadTip log stream pos = Tip.tryLoad log tip.ReadRetryPolicy (container, stream) (pos, None) // Always yields events forward, regardless of direction - member internal __.Read(log, stream, direction, (tryDecode, isOrigin), ?minIndex, ?maxIndex, ?tip, ?forceExcludeTip): Async = async { + member internal __.Read(log, stream, direction, (tryDecode, isOrigin), ?minIndex, ?maxIndex, ?tip): Async = async { let tip = tip |> Option.map (Query.scanTip (tryDecode,isOrigin)) - let includeTip = Option.isNone tip && forceExcludeTip <> Some true + let includeTip = Option.isNone tip let walk log gateway = Query.scan log (gateway,stream) includeTip query.MaxItems query.MaxRequests direction (tryDecode, isOrigin) let walkFallback = match fallback with @@ -1048,7 +1061,7 @@ type StoreClient(container : Container, fallback : Container option, query : Que Query.walkLazy log (container,stream) batching.MaxItems batching.MaxRequests (tryDecode,isOrigin) (direction, minIndex, maxIndex) member con.Load(log, (stream, maybePos), (tryDecode, isOrigin), checkUnfolds): Async = - if not checkUnfolds then con.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), forceExcludeTip = true) + if not checkUnfolds then con.Read(log, stream, Direction.Backward, (tryDecode, isOrigin)) else async { match! loadTip log stream maybePos with | Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty, Array.empty @@ -1073,7 +1086,7 @@ type StoreClient(container : Container, fallback : Container option, query : Que member internal __.Sync(log, stream, exp, batch: Tip): Async = async { if Array.isEmpty batch.e && Array.isEmpty batch.u then invalidOp "Must write either events or unfolds." - match! Sync.batch log tip.WriteRetryPolicy (container, stream) (exp, batch) with + match! Sync.batch log (tip.WriteRetryPolicy, tip.MaxEvents, tip.MaxJsonLength) (container, stream) (exp, batch) with | Sync.Result.Conflict (pos',events) -> return InternalSyncResult.Conflict (pos',events) | Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create stream pos') | Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create stream pos') } @@ -1205,8 +1218,13 @@ type CosmosStoreContext(connection : CosmosStoreConnection, ?queryOptions, ?tipO new(connection : CosmosStoreConnection, ?defaultMaxItems, ?getDefaultMaxItems, ?maxRequests, ?tipOptions) = let queryOptions = QueryOptions(?defaultMaxItems = defaultMaxItems, ?getDefaultMaxItems = getDefaultMaxItems, ?maxRequests = maxRequests) CosmosStoreContext(connection, queryOptions, ?tipOptions = tipOptions) - new(connection : CosmosStoreConnection, ?queryMaxItems) = - let tipOptions = TipOptions() + new(connection : CosmosStoreConnection, ?queryMaxItems, + /// Maximum number of events permitted in Tip. When this is exceeded, events are moved out to a standalone Batch. Default: 0 + /// NOTE Equinox.Cosmos versions <= 3.0.0 cannot read events in Tip, hence using a non-zero value will not be interoperable. + ?tipMaxEvents, + /// Maximum serialized size (length of JSON.stringify representation) permitted in Tip before they get moved out to a standalone Batch. Default: 30_000. + ?tipMaxJsonLength) = + let tipOptions = TipOptions(?maxEvents = tipMaxEvents, ?maxJsonLength = tipMaxJsonLength) CosmosStoreContext(connection, tipOptions = tipOptions, ?defaultMaxItems = queryMaxItems) member val QueryOptions = queryOptions |> Option.defaultWith QueryOptions member val TipOptions = tipOptions |> Option.defaultWith TipOptions diff --git a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs index 39f0db349..fb93fef5f 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs @@ -34,16 +34,18 @@ type Tests(testOutputHelper) = let tripRequestCharges = [ for e, c in capture.RequestCharges -> sprintf "%A" e, c ] test <@ float rus >= Seq.sum (Seq.map snd tripRequestCharges) @> - [] - let append (TestStream streamName) = Async.RunSynchronously <| async { + [] + let append (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { capture.Clear() - let ctx = createPrimaryEventsContext log defaultQueryMaxItems + let ctx = createPrimaryEventsContext log defaultQueryMaxItems (if eventsInTip then 1 else 0) let index = 0L let! res = Events.append ctx streamName index <| TestEvents.Create(0,1) test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 34 // 33.07 + if eventsInTip then verifyRequestChargesMax 21 // 20.42 + else verifyRequestChargesMax 34 // 33.07 + // Clear the counters capture.Clear() @@ -51,14 +53,15 @@ type Tests(testOutputHelper) = test <@ AppendResult.Ok 6L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> // We didnt request small batches or splitting so it's not dramatically more expensive to write N events - verifyRequestChargesMax 41 // 40.68 + if eventsInTip then verifyRequestChargesMax 40 // 39.13 + else verifyRequestChargesMax 41 // 40.68 } // It's conceivable that in the future we might allow zero-length batches as long as a sync mechanism leveraging the etags and unfolds update mechanisms // As it stands with the NoTipEvents stored proc, permitting empty batches a) yields an invalid state b) provides no conceivable benefit [] let ``append Throws when passed an empty batch`` (TestStream streamName) = Async.RunSynchronously <| async { - let ctx = createPrimaryEventsContext log defaultQueryMaxItems + let ctx = createPrimaryEventsContext log defaultQueryMaxItems 10 let index = 0L let! res = Events.append ctx streamName index (TestEvents.Create(0,0)) |> Async.Catch @@ -100,12 +103,12 @@ type Tests(testOutputHelper) = let verifyCorrectEventsBackward = verifyCorrectEventsEx Equinox.CosmosStore.Core.Direction.Backward let verifyCorrectEvents = verifyCorrectEventsEx Equinox.CosmosStore.Core.Direction.Forward - [] - let ``appendAtEnd and getNextIndex`` (extras, TestStream streamName) = Async.RunSynchronously <| async { + [] + let ``appendAtEnd and getNextIndex`` (eventsInTip, extras, TestStream streamName) = Async.RunSynchronously <| async { // If a fail triggers a rerun, we need to dump the previous log entries captured capture.Clear() - let ctx = createPrimaryEventsContext log 1 + let ctx = createPrimaryEventsContext log 1 (if eventsInTip then 1 else 0) let! pos = Events.getNextIndex ctx streamName test <@ [EqxAct.TipNotFound] = capture.ExternalCalls @> @@ -119,7 +122,8 @@ type Tests(testOutputHelper) = test <@ [EqxAct.Append] = capture.ExternalCalls @> pos <- pos + int64 appendBatchSize pos =! res - verifyRequestChargesMax 46 // 45.16 + if eventsInTip then verifyRequestChargesMax 50 // 49.58 + else verifyRequestChargesMax 46 // 45.16 capture.Clear() let! res = Events.getNextIndex ctx streamName @@ -132,7 +136,8 @@ type Tests(testOutputHelper) = pos <- pos + 42L pos =! res test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 50 // 49.74 + if eventsInTip then verifyRequestChargesMax 45 // was 44.65 + else verifyRequestChargesMax 50 // 49.74 capture.Clear() let! res = Events.getNextIndex ctx streamName @@ -160,10 +165,10 @@ type Tests(testOutputHelper) = verifyRequestChargesMax 1 // for a 302 by definition - when an etag IfNotMatch is honored, you only pay one RU } - [] - let ``append - fails on non-matching`` (TestStream streamName) = Async.RunSynchronously <| async { + [] + let ``append - fails on non-matching`` (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { capture.Clear() - let ctx = createPrimaryEventsContext log 10 + let ctx = createPrimaryEventsContext log 10 (if eventsInTip then 1 else 0) // Attempt to write, skipping Index 0 let! res = Events.append ctx streamName 1L <| TestEvents.Create(0,1) @@ -171,7 +176,8 @@ type Tests(testOutputHelper) = test <@ [EqxAct.Resync] = capture.ExternalCalls @> // The response aligns with a normal conflict in that it passes the entire set of conflicting events () test <@ AppendResult.Conflict (0L,[||]) = res @> - verifyRequestChargesMax 12 // 11.18 + if eventsInTip then verifyRequestChargesMax 12 // 11.49 + else verifyRequestChargesMax 12 // 11.18 capture.Clear() // Now write at the correct position @@ -179,23 +185,27 @@ type Tests(testOutputHelper) = let! res = Events.append ctx streamName 0L expected test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 40 // 39.39 + if eventsInTip then verifyRequestChargesMax 27 // 26.4 + else verifyRequestChargesMax 40 // 39.39 capture.Clear() // Try overwriting it (a competing consumer would see the same) let! res = Events.append ctx streamName 0L <| TestEvents.Create(-42,2) // This time we get passed the conflicting events - we pay a little for that, but that's unavoidable - match res, capture.ExternalCalls with - | AppendResult.ConflictUnknown 1L, [EqxAct.Conflict] -> + match eventsInTip, res, capture.ExternalCalls with + | true, AppendResult.Conflict (1L, e), [EqxAct.Resync] -> + verifyCorrectEvents 0L expected e + verifyRequestChargesMax 10 // 9.93 + | false, AppendResult.ConflictUnknown 1L, [EqxAct.Conflict] -> verifyRequestChargesMax 12 // 11.9 | x -> x |> failwithf "Unexpected %A" } (* Forward *) - [] - let get (TestStream streamName) = Async.RunSynchronously <| async { - let ctx = createPrimaryEventsContext log 3 + [] + let get (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { + let ctx = createPrimaryEventsContext log 3 (if eventsInTip then 10 else 0) // We're going to ignore the first, to prove we can let! expected = add6EventsIn2Batches ctx streamName @@ -206,12 +216,13 @@ type Tests(testOutputHelper) = verifyCorrectEvents 1L expected res test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> - verifyRequestChargesMax 5 // (4.15) // WAS 13 with SDK bugs// 12.81 + if eventsInTip then verifyRequestChargesMax 3 + else verifyRequestChargesMax 5 // (4.15) // WAS 13 with SDK bugs// 12.81 } - [] - let ``get in 2 batches`` (TestStream streamName) = Async.RunSynchronously <| async { - let ctx = createPrimaryEventsContext log 1 + [] + let ``get in 2 batches`` (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { + let ctx = createPrimaryEventsContext log 1 (if eventsInTip then 1 else 0) let! expected = add6EventsIn2BatchesEx ctx streamName 2 let expected = expected |> Array.take 3 @@ -225,9 +236,9 @@ type Tests(testOutputHelper) = verifyRequestChargesMax 7 // 6.01 } - [] - let ``get Lazy`` (TestStream streamName) = Async.RunSynchronously <| async { - let ctx = createPrimaryEventsContext log 1 + [] + let ``get Lazy`` (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { + let ctx = createPrimaryEventsContext log 1 (if eventsInTip then 3 else 0) let! expected = add6EventsIn2BatchesEx ctx streamName 4 @@ -241,14 +252,14 @@ type Tests(testOutputHelper) = | _ -> None // validate that, because we stopped after 1 item, we only needed one trip (which contained 4 events) [1,4] =! capture.ChooseCalls queryRoundTripsAndItemCounts - verifyRequestChargesMax 4 // 3.06 + verifyRequestChargesMax 3 // 2.97 } (* Backward *) - [] - let getBackwards (TestStream streamName) = Async.RunSynchronously <| async { - let ctx = createPrimaryEventsContext log 1 + [] + let getBackwards (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { + let ctx = createPrimaryEventsContext log 1 (if eventsInTip then 1 else 0) let! expected = add6EventsIn2Batches ctx streamName @@ -263,9 +274,9 @@ type Tests(testOutputHelper) = verifyRequestChargesMax 3 } - [] - let ``getBackwards in 2 batches`` (TestStream streamName) = Async.RunSynchronously <| async { - let ctx = createPrimaryEventsContext log 1 + [] + let ``getBackwards in 2 batches`` (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { + let ctx = createPrimaryEventsContext log 1 (if eventsInTip then 1 else 0) let! expected = add6EventsIn2BatchesEx ctx streamName 2 @@ -280,9 +291,9 @@ type Tests(testOutputHelper) = verifyRequestChargesMax 7 // 6.01 } - [] - let ``getBackwards Lazy`` (TestStream streamName) = Async.RunSynchronously <| async { - let ctx = createPrimaryEventsContext log 1 + [] + let ``getBackwards Lazy`` (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { + let ctx = createPrimaryEventsContext log 1 (if eventsInTip then 3 else 0) let! expected = add6EventsIn2BatchesEx ctx streamName 4 @@ -295,7 +306,7 @@ type Tests(testOutputHelper) = verifyCorrectEventsBackward 5L expected res // only 1 request of 1 item triggered - let pages = 1 // in V2 (and V3 master for now), Tip gets filtered out of the querying explicitly + let pages = if eventsInTip then 1 else 2 // in V2, the query excluded the Tip; in V3 with tipMaxEvents, we return the tip but it has no data test <@ [yield! Seq.replicate pages EqxAct.ResponseBackward; EqxAct.QueryBackward] = capture.ExternalCalls @> // validate that, despite only requesting max 1 item, we only needed one trip, bearing 5 items (from which one item was omitted) let queryRoundTripsAndItemCounts = function @@ -303,14 +314,17 @@ type Tests(testOutputHelper) = | _ -> None let expectedPagesAndEvents = [pages, 2] expectedPagesAndEvents =! capture.ChooseCalls queryRoundTripsAndItemCounts - verifyRequestChargesMax 6 // 5.66 + if eventsInTip then verifyRequestChargesMax 3 // 2.98 + else verifyRequestChargesMax 6 // 5.66 } (* Prune *) - [] - let prune (TestStream streamName) = Async.RunSynchronously <| async { - let ctx = createPrimaryEventsContext log 10 + [] + let prune (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { + if eventsInTip then () else // TODO + + let ctx = createPrimaryEventsContext log 10 (if eventsInTip then 1 else 0) let! expected = add6EventsIn2Batches ctx streamName // Trigger deletion of first batch @@ -352,10 +366,11 @@ type Tests(testOutputHelper) = (* Fallback *) - [] - let fallback (TestStream streamName) = Async.RunSynchronously <| async { + [] + let fallback (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { + if eventsInTip then () else // TODO - let ctx1 = createPrimaryEventsContext log defaultQueryMaxItems + let ctx1 = createPrimaryEventsContext log defaultQueryMaxItems 0 let ctx2 = createSecondaryEventsContext log defaultQueryMaxItems let ctx12 = createFallbackEventsContext log defaultQueryMaxItems @@ -400,7 +415,7 @@ type Tests(testOutputHelper) = test <@ [||] = res @> verifyRequestChargesMax 3 // 2.99 - // Fallback queries secondary + // Fallback queries secondary (unless we actually delete the Tip too) // TODO demonstrate Primary read is only of Tip when using snapshots capture.Clear() let! res = Events.get ctx12 streamName 0L Int32.MaxValue diff --git a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs index 30e102bde..54886ba04 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs @@ -43,22 +43,27 @@ let connectWithFallback log = let client = createClient log name discovery CosmosStoreConnection(client, databaseId, containerId, containerId2 = containerId2) -let createPrimaryContext log queryMaxItems = +let createPrimaryContextEx log queryMaxItems tipMaxEvents = let conn = connectPrimary log - CosmosStoreContext(conn, queryMaxItems = queryMaxItems) + CosmosStoreContext(conn, queryMaxItems = queryMaxItems, tipMaxEvents = tipMaxEvents) + +let defaultTipMaxEvents = 10 + +let createPrimaryContext log queryMaxItems = + createPrimaryContextEx log queryMaxItems defaultTipMaxEvents let createSecondaryContext log queryMaxItems = let conn = connectSecondary log - CosmosStoreContext(conn, queryMaxItems = queryMaxItems) + CosmosStoreContext(conn, queryMaxItems = queryMaxItems, tipMaxEvents = defaultTipMaxEvents) let createFallbackContext log queryMaxItems = let conn = connectWithFallback log - CosmosStoreContext(conn, queryMaxItems = queryMaxItems) + CosmosStoreContext(conn, queryMaxItems = queryMaxItems, tipMaxEvents = defaultTipMaxEvents) let defaultQueryMaxItems = 10 -let createPrimaryEventsContext log queryMaxItems = - let context = createPrimaryContext log queryMaxItems +let createPrimaryEventsContext log queryMaxItems tipMaxItems = + let context = createPrimaryContextEx log queryMaxItems tipMaxItems Equinox.CosmosStore.Core.EventsContext(context, log) let createSecondaryEventsContext log queryMaxItems = diff --git a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs index 97c6d7dbd..50a2e1be9 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs @@ -70,17 +70,17 @@ type Tests(testOutputHelper) = let tripRequestCharges = [ for e, c in capture.RequestCharges -> sprintf "%A" e, c ] test <@ float rus >= Seq.sum (Seq.map snd tripRequestCharges) @> - [] - let ``Can roundtrip against Cosmos, correctly batching the reads (without special-casing tip)`` (cartContext, skuId) = Async.RunSynchronously <| async { + [] + let ``Can roundtrip against Cosmos, correctly batching the reads (without special-casing tip)`` (eventsInTip, cartContext, skuId) = Async.RunSynchronously <| async { capture.Clear() // for re-runs of the test let addRemoveCount = 40 let eventsPerAction = addRemoveCount * 2 - 1 let queryMaxItems = 3 - let context = createPrimaryContext log queryMaxItems + let context = createPrimaryContextEx log queryMaxItems (if eventsInTip then eventsPerAction else 0) let service = Cart.createServiceWithoutOptimization log context let expectedResponses n = - let expectedBatches = 1 + n + let expectedBatches = 1 + if eventsInTip then n / 2 else n max 1 (int (ceil (float expectedBatches / float queryMaxItems))) let cartId = % Guid.NewGuid() @@ -89,7 +89,8 @@ type Tests(testOutputHelper) = for i in [1..transactions] do do! addAndThenRemoveItemsManyTimesExceptTheLastOne cartContext cartId skuId service addRemoveCount test <@ i = i && List.replicate (expectedResponses (i-1)) EqxAct.ResponseBackward @ [EqxAct.QueryBackward; EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 79 // 78.37 [3.15; 75.22] + if eventsInTip then verifyRequestChargesMax 76 // 76.0 [3.72; 72.28] + else verifyRequestChargesMax 79 // 78.37 [3.15; 75.22] capture.Clear() // Validate basic operation; Key side effect: Log entries will be emitted to `capture` @@ -98,15 +99,16 @@ type Tests(testOutputHelper) = test <@ addRemoveCount = match state with { items = [{ quantity = quantity }] } -> quantity | _ -> failwith "nope" @> test <@ List.replicate (expectedResponses transactions) EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @> - verifyRequestChargesMax 15 // 14.01 + if eventsInTip then verifyRequestChargesMax 8 // 7.46 + else verifyRequestChargesMax 15 // 14.01 } - [] - let ``Can roundtrip against Cosmos, managing sync conflicts by retrying`` (ctx, initialState) = Async.RunSynchronously <| async { + [] + let ``Can roundtrip against Cosmos, managing sync conflicts by retrying`` (eventsInTip, ctx, initialState) = Async.RunSynchronously <| async { capture.Clear() let log1, capture1 = log, capture let queryMaxItems = 3 - let context = createPrimaryContext log1 queryMaxItems + let context = createPrimaryContextEx log1 queryMaxItems (if eventsInTip then 10 else 0) // Ensure batching is included at some point in the proceedings let cartContext, (sku11, sku12, sku21, sku22) = ctx @@ -173,17 +175,22 @@ type Tests(testOutputHelper) = && has sku21 21 && has sku22 22 @> // Intended conflicts arose let conflict = function EqxAct.Conflict | EqxAct.Resync as x -> Some x | _ -> None - test <@ let c2 = List.choose conflict capture2.ExternalCalls - [EqxAct.Conflict] = List.choose conflict capture1.ExternalCalls - && [EqxAct.Conflict] = c2 @> + if eventsInTip then + test <@ let c2 = List.choose conflict capture2.ExternalCalls + [EqxAct.Resync] = List.choose conflict capture1.ExternalCalls + && [EqxAct.Resync] = c2 @> + else + test <@ let c2 = List.choose conflict capture2.ExternalCalls + [EqxAct.Conflict] = List.choose conflict capture1.ExternalCalls + && [EqxAct.Conflict] = c2 @> } let singleBatchBackwards = [EqxAct.ResponseBackward; EqxAct.QueryBackward] let batchBackwardsAndAppend = singleBatchBackwards @ [EqxAct.Append] - [] - let ``Can correctly read and update against Cosmos with LatestKnownEvent Access Strategy`` value = Async.RunSynchronously <| async { - let context = createPrimaryContext log 1 + [] + let ``Can correctly read and update against Cosmos with LatestKnownEvent Access Strategy`` (eventsInTip, value) = Async.RunSynchronously <| async { + let context = createPrimaryContextEx log 1 (if eventsInTip then 1 else 0) let service = ContactPreferences.createService log context let id = ContactPreferences.Id (let g = System.Guid.NewGuid() in g.ToString "N") @@ -199,35 +206,36 @@ type Tests(testOutputHelper) = test <@ [EqxAct.Tip; EqxAct.Append; EqxAct.Tip] = capture.ExternalCalls @> - (* Verify pruning does not affect the copies of the events maintained as Unfolds *) + if not eventsInTip then + (* Verify pruning does not affect the copies of the events maintained as Unfolds *) - // Needs to share the same context (with inner CosmosClient) for the session token to be threaded through - // If we run on an independent context, we won't see (and hence prune) the full set of events - let ctx = Core.EventsContext(context, log) - let streamName = ContactPreferences.streamName id |> FsCodec.StreamName.toString + // Needs to share the same context (with inner CosmosClient) for the session token to be threaded through + // If we run on an independent context, we won't see (and hence prune) the full set of events + let ctx = Core.EventsContext(context, log) + let streamName = ContactPreferences.streamName id |> FsCodec.StreamName.toString - // Prune all the events - let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 14L - test <@ deleted = 14 && deferred = 0 && trimmedPos = 14L @> + // Prune all the events + let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 14L + test <@ deleted = 14 && deferred = 0 && trimmedPos = 14L @> - // Prove they're gone - capture.Clear() - let! res = Core.Events.get ctx streamName 0L Int32.MaxValue - test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> - test <@ [||] = res @> - verifyRequestChargesMax 3 // 2.99 + // Prove they're gone + capture.Clear() + let! res = Core.Events.get ctx streamName 0L Int32.MaxValue + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + test <@ [||] = res @> + verifyRequestChargesMax 3 // 2.99 - // But we can still read (there's no cache so we'll definitely be reading) - capture.Clear() - let! _ = service.Read id - test <@ value = result @> - test <@ [EqxAct.Tip] = capture.ExternalCalls @> - verifyRequestChargesMax 1 + // But we can still read (there's no cache so we'll definitely be reading) + capture.Clear() + let! _ = service.Read id + test <@ value = result @> + test <@ [EqxAct.Tip] = capture.ExternalCalls @> + verifyRequestChargesMax 1 } [] let ``Can correctly read and update Contacts against Cosmos with RollingUnfolds Access Strategy`` value = Async.RunSynchronously <| async { - let context = createPrimaryContext log 1 + let context = createPrimaryContextEx log 1 10 let service = ContactPreferences.createServiceWithLatestKnownEvent context log CachingStrategy.NoCaching let id = ContactPreferences.Id (let g = System.Guid.NewGuid() in g.ToString "N") @@ -244,11 +252,11 @@ type Tests(testOutputHelper) = test <@ [EqxAct.Tip; EqxAct.Append; EqxAct.Tip] = capture.ExternalCalls @> } - [] + [] let ``Can roundtrip Cart against Cosmos with RollingUnfolds, detecting conflicts based on _etag`` (ctx, initialState) = Async.RunSynchronously <| async { let log1, capture1 = log, capture capture1.Clear() - let context = createPrimaryContext log1 1 + let context = createPrimaryContextEx log1 1 10 let cartContext, (sku11, sku12, sku21, sku22) = ctx let cartId = % Guid.NewGuid() @@ -322,7 +330,7 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip against Cosmos, using Snapshotting to avoid queries`` (cartContext, skuId) = Async.RunSynchronously <| async { let queryMaxItems = 10 - let context = createPrimaryContext log queryMaxItems + let context = createPrimaryContextEx log queryMaxItems 10 let createServiceIndexed () = Cart.createServiceWithSnapshotStrategy log context let service1, service2 = createServiceIndexed (), createServiceIndexed () capture.Clear() @@ -367,9 +375,9 @@ type Tests(testOutputHelper) = verifyRequestChargesMax 1 } - [] - let ``Can roundtrip against Cosmos, correctly using Snapshotting and Cache to avoid redundant reads`` (cartContext, skuId) = Async.RunSynchronously <| async { - let context = createPrimaryContext log 10 + [] + let ``Can roundtrip against Cosmos, correctly using Snapshotting and Cache to avoid redundant reads`` (eventsInTip, cartContext, skuId) = Async.RunSynchronously <| async { + let context = createPrimaryContextEx log 10 (if eventsInTip then 1 else 0) let cache = Equinox.Cache("cart", sizeMb = 50) let createServiceCached () = Cart.createServiceWithSnapshotStrategyAndCaching log context cache let service1, service2 = createServiceCached (), createServiceCached () @@ -403,24 +411,25 @@ type Tests(testOutputHelper) = do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne cartContext cartId skuId service1 1 test <@ [EqxAct.Append] = capture.ExternalCalls @> - (* Verify pruning does not affect snapshots, and does not touch the Tip *) + if not eventsInTip then + (* Verify pruning does not affect snapshots, and does not touch the Tip *) - let ctx = Core.EventsContext(context, log) - let streamName = Cart.streamName cartId |> FsCodec.StreamName.toString - // Prune all the events - let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 13L - test <@ deleted = 13 && deferred = 0 && trimmedPos = 13L @> + let ctx = Core.EventsContext(context, log) + let streamName = Cart.streamName cartId |> FsCodec.StreamName.toString + // Prune all the events + let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 13L + test <@ deleted = 13 && deferred = 0 && trimmedPos = 13L @> - // Prove they're gone - capture.Clear() - let! res = Core.Events.get ctx streamName 0L Int32.MaxValue - test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> - test <@ [||] = res @> - verifyRequestChargesMax 3 // 2.99 + // Prove they're gone + capture.Clear() + let! res = Core.Events.get ctx streamName 0L Int32.MaxValue + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + test <@ [||] = res @> + verifyRequestChargesMax 3 // 2.99 - // But we can still read (service2 shares the cache so is aware of the last writes, and pruning does not invalidate the Tip) - capture.Clear() - let! _ = service2.Read cartId - test <@ [EqxAct.TipNotModified] = capture.ExternalCalls @> - verifyRequestChargesMax 1 + // But we can still read (service2 shares the cache so is aware of the last writes, and pruning does not invalidate the Tip) + capture.Clear() + let! _ = service2.Read cartId + test <@ [EqxAct.TipNotModified] = capture.ExternalCalls @> + verifyRequestChargesMax 1 }