From 2fa22c44addb5d5442524ee3d42264924ffc7393 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 22 Nov 2018 18:06:27 +0000 Subject: [PATCH] Remove Cosmos RollingSnapshots --- cli/Equinox.Cli/Program.fs | 17 +- samples/Store/Domain/Cart.fs | 1 - samples/Store/Domain/ContactPreferences.fs | 4 +- samples/Store/Domain/Favorites.fs | 1 - samples/Store/Integration/CartIntegration.fs | 30 +- .../ContactPreferencesIntegration.fs | 20 +- .../Store/Integration/FavoritesIntegration.fs | 4 +- samples/Store/Integration/LogIntegration.fs | 11 +- src/Equinox.Cosmos/Cosmos.fs | 568 +++++++----------- src/Equinox/Equinox.fs | 2 +- .../CosmosCoreIntegration.fs | 37 +- .../CosmosFixtures.fs | 15 +- .../CosmosIntegration.fs | 84 +-- .../CosmosTokenTests.fs | 80 --- .../Equinox.Cosmos.Integration.fsproj | 1 - 15 files changed, 311 insertions(+), 564 deletions(-) delete mode 100644 tests/Equinox.Cosmos.Integration/CosmosTokenTests.fs diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index 760e4bbc8..1dd951d8e 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -135,7 +135,7 @@ module Test = clients.[clientIndex % clients.Length] let selectClient = async { return async { return selectClient() } } Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest - let fold, initial, compact, index = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact, Domain.Favorites.Folds.index + let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect() let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings) let codec = genCodec() @@ -150,19 +150,16 @@ module Test = let c = Equinox.Cosmos.Builder.Caching.Cache("Cli", sizeMb = 50) Equinox.Cosmos.Builder.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some else None - let resolveStream streamName = + let resolveStream = match store with | Store.Mem store -> - Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName) + Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create | Store.Es gateway -> - GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create(streamName) + GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create | Store.Cosmos (gateway, databaseId, connectionId) -> - if targs.Contains Indexed then - EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.IndexedSearch index, ?caching = eqxCache) - .Create(databaseId, connectionId, streamName) - else - EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact, ?caching = eqxCache) - .Create(databaseId, connectionId, streamName) + let store = EqxStore(gateway, EqxCollections(databaseId, connectionId)) + if targs.Contains Indexed then EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection compact, ?caching = eqxCache).Create + else EqxStreamBuilder(store, codec, fold, initial, ?access=None, ?caching = eqxCache).Create Backend.Favorites.Service(log, resolveStream) let runFavoriteTest (service : Backend.Favorites.Service) clientId = async { let sku = Guid.NewGuid() |> SkuId diff --git a/samples/Store/Domain/Cart.fs b/samples/Store/Domain/Cart.fs index 3c9253b3a..cfa206e61 100644 --- a/samples/Store/Domain/Cart.fs +++ b/samples/Store/Domain/Cart.fs @@ -43,7 +43,6 @@ module Folds = | Events.ItemWaiveReturnsChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with returnsWaived = e.waived } | i -> i)) let fold state = Seq.fold evolve state let compact = Events.Compaction.EventType, fun state -> Events.Compacted (State.toSnapshot state) - let index = (fun et -> et = Events.Compaction.EventType), fun state -> seq [ yield Events.Compacted (State.toSnapshot state) ] type Context = { time: System.DateTime; requestId : RequestId } type Command = | AddItem of Context * SkuId * quantity: int diff --git a/samples/Store/Domain/ContactPreferences.fs b/samples/Store/Domain/ContactPreferences.fs index 45ce4acd0..88cf9ded7 100644 --- a/samples/Store/Domain/ContactPreferences.fs +++ b/samples/Store/Domain/ContactPreferences.fs @@ -7,9 +7,11 @@ module Events = type Preferences = { manyPromotions : bool; littlePromotions : bool; productReview : bool; quickSurveys : bool } type Value = { email : string; preferences : Preferences } + let [] EventTypeName = "contactPreferencesChanged" type Event = - | []Updated of Value + | []Updated of Value interface TypeShape.UnionContract.IUnionContract + let eventTypeNames = System.Collections.Generic.HashSet([EventTypeName]) module Folds = type State = Events.Preferences diff --git a/samples/Store/Domain/Favorites.fs b/samples/Store/Domain/Favorites.fs index 97fd0ed1e..819f016c4 100644 --- a/samples/Store/Domain/Favorites.fs +++ b/samples/Store/Domain/Favorites.fs @@ -38,7 +38,6 @@ module Folds = for e in events do evolve s e s.AsState() let compact = Events.Compaction.EventType, fun state -> Events.Compacted { net = state } - let index = (fun x -> x = Events.Compaction.EventType), fun state -> seq [ Events.Compacted { net = state } ] type Command = | Favorite of date : System.DateTimeOffset * skuIds : SkuId list diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index ecd69d7eb..ae4992a69 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -17,15 +17,15 @@ let createServiceMem log store = let codec = Equinox.EventStore.Integration.EventStoreIntegration.genCodec() -let resolveGesStreamWithCompactionEventType gateway streamName = - GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create(streamName) -let resolveGesStreamWithoutCompactionSemantics gateway streamName = - GesStreamBuilder(gateway, codec, fold, initial).Create(streamName) +let resolveGesStreamWithRollingSnapshots gateway = + GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create +let resolveGesStreamWithoutCustomAccessStrategy gateway = + GesStreamBuilder(gateway, codec, fold, initial).Create -let resolveEqxStreamWithCompactionEventType gateway (StoreCollection args) = - EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args) -let resolveEqxStreamWithoutCompactionSemantics gateway (StoreCollection args) = - EqxStreamBuilder(gateway, codec, fold, initial).Create(args) +let resolveEqxStreamWithProjection gateway = + EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection compact).Create +let resolveEqxStreamWithoutCustomAccessStrategy gateway = + EqxStreamBuilder(gateway, codec, fold, initial).Create let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count = service.FlowAsync(cartId, fun _ctx execute -> @@ -60,24 +60,24 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async { - let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCompactionSemantics + let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy do! act service args } [] - let ``Can roundtrip against EventStore, correctly folding the events with compaction`` args = Async.RunSynchronously <| async { - let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithCompactionEventType + let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async { + let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots do! act service args } [] - let ``Can roundtrip against Cosmos, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async { - let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithoutCompactionSemantics + let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = Async.RunSynchronously <| async { + let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithoutCustomAccessStrategy do! act service args } [] - let ``Can roundtrip against Cosmos, correctly folding the events with compaction`` args = Async.RunSynchronously <| async { - let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithCompactionEventType + let ``Can roundtrip against Cosmos, correctly folding the events with With Projection`` args = Async.RunSynchronously <| async { + let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithProjection do! act service args } \ No newline at end of file diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index b9efe313c..1a3d27be1 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -16,15 +16,15 @@ let createServiceMem log store = Backend.ContactPreferences.Service(log, MemoryStreamBuilder(store, fold, initial).Create) let codec = genCodec() -let resolveStreamGesWithCompactionSemantics gateway streamName = - GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(streamName) -let resolveStreamGesWithoutCompactionSemantics gateway streamName = - GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(streamName) +let resolveStreamGesWithCompactionSemantics gateway = + GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create +let resolveStreamGesWithoutCompactionSemantics gateway = + GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create -let resolveStreamEqxWithCompactionSemantics gateway (StoreCollection args) = - EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args) -let resolveStreamEqxWithoutCompactionSemantics gateway (StoreCollection args) = - EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args) +let resolveStreamEqxWithCompactionSemantics gateway = + EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType Domain.ContactPreferences.Events.eventTypeNames).Create +let resolveStreamEqxWithoutCompactionSemantics gateway = + EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper @@ -63,12 +63,12 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async { - let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics + let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithoutCompactionSemantics do! act service args } [] let ``Can roundtrip against Cosmos, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async { - let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithCompactionSemantics + let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithCompactionSemantics do! act service args } \ No newline at end of file diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index 7b21144ca..9a6b7d9f9 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -20,7 +20,7 @@ let createServiceGes gateway log = Backend.Favorites.Service(log, GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create) let createServiceEqx gateway log = - let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args) + let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection compact).Create Backend.Favorites.Service(log, resolveStream) type Tests(testOutputHelper) = @@ -57,7 +57,7 @@ type Tests(testOutputHelper) = let ``Can roundtrip against Cosmos, correctly folding the events`` args = Async.RunSynchronously <| async { let log = createLog () let! conn = connectToSpecifiedCosmosOrSimulator log - let gateway = createEqxGateway conn defaultBatchSize + let gateway = createEqxStore conn defaultBatchSize let service = createServiceEqx gateway log do! act service args } \ No newline at end of file diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs index 8f72bac45..b683966d3 100644 --- a/samples/Store/Integration/LogIntegration.fs +++ b/samples/Store/Integration/LogIntegration.fs @@ -96,9 +96,10 @@ type Tests() = do! CartIntegration.addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service itemCount let! state = service.Read cartId test <@ itemCount = match state with { items = [{ quantity = quantity }] } -> quantity | _ -> failwith "nope" @> - // Even though we've gone over a page, we only need a single read to read the state (plus the one from the execute) + // Because we're using Access Strategies that enable us to read our state in a single roundtrip... + // (even though we've gone over a page), we only need a single read to read the state (plus the one from the execute) let contains (s : string) (x : string) = x.Contains s - test <@ let reads = buffer |> Seq.filter (fun s -> s |> contains resultTag) + test <@ let reads = buffer |> Seq.filter (contains resultTag) 2 = Seq.length reads @> } // Protip: Debug this test to view standard metrics rendering @@ -109,7 +110,7 @@ type Tests() = let log = createLoggerWithMetricsExtraction buffer.Enqueue let! conn = connectToLocalEventStoreNode log let gateway = createGesGateway conn batchSize - let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithCompactionEventType gateway) + let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithRollingSnapshots gateway) let itemCount = batchSize / 2 + 1 let cartId = Domain.Infrastructure.CartId(System.Guid.NewGuid()) do! act buffer service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration" @@ -121,8 +122,8 @@ type Tests() = let buffer = ConcurrentQueue() let log = createLoggerWithMetricsExtraction buffer.Enqueue let! conn = connectToSpecifiedCosmosOrSimulator log - let gateway = createEqxGateway conn batchSize - let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithCompactionEventType gateway) + let gateway = createEqxStore conn batchSize + let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithProjection gateway) let itemCount = batchSize / 2 + 1 let cartId = Domain.Infrastructure.CartId(System.Guid.NewGuid()) do! act buffer service itemCount context cartId skuId "EqxReadStreamEventsBackwardAsync-Duration" diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 37e8c69bc..79c75db30 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -56,8 +56,6 @@ type Base64ZipUtf8JsonConverter() = namespace Equinox.Cosmos.Events -open Equinox.Store.Infrastructure // Option shims for downlevel frameworks - /// Common form for either a raw Event or a Projection type IEvent = /// The Event Type, used to drive deserialization @@ -88,10 +86,6 @@ type [] Position = { index: int64; etag: string option } with if Array.isEmpty xs then Position.FromKnownEmpty else Position.FromI (1L + Seq.max (seq { for x in xs -> x.Index })) -/// Reference to Storage Partition -type [] Stream = { collectionUri: System.Uri; name: string } with - static member Create(collectionUri, name) = { collectionUri = collectionUri; name = name } - namespace Equinox.Cosmos.Store open Equinox.Cosmos.Events @@ -193,8 +187,7 @@ and [] [] m: byte[] } // optional /// Projection based on the state at a given point in time `i` -and [] - Projection = +and Projection = { /// Base: Max index rolled into this projection i: int64 @@ -251,6 +244,10 @@ type Enum() = // where Index is equal, projections get delivered after the events so the fold semantics can be 'idempotent' |> Seq.sortBy (fun x -> x.Index, x.IsProjection) +/// Reference to Collection and name that will be used as the location for the stream +type [] CollectionStream = { collectionUri: System.Uri; name: string } with + static member Create(collectionUri, name) = { collectionUri = collectionUri; name = name } + namespace Equinox.Cosmos open Equinox @@ -261,18 +258,12 @@ open FSharp.Control open Microsoft.Azure.Documents open Serilog open System +open System.Collections.Generic [] type Direction = Forward | Backward with override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward" -[] -type AccessStrategy<'event,'state> = - | EventsAreState - | //[] - RollingSnapshots of eventType: string * compact: ('state -> 'event) - | IndexedSearch of (string -> bool) * index: ('state -> 'event seq) - module Log = [] type Measurement = { stream: string; interval: StopwatchInterval; bytes: int; count: int; ru: float } @@ -460,7 +451,7 @@ function sync(req, expectedVersion) { | Conflict of Position * events: IOrderedEvent[] | ConflictUnknown of Position - let private run (client: IDocumentClient) (stream: Stream) (expectedVersion: int64 option, req: WipBatch) + let private run (client: IDocumentClient) (stream: CollectionStream) (expectedVersion: int64 option, req: WipBatch) : Async = async { let sprocLink = sprintf "%O/sprocs/%s" stream.collectionUri sprocName let opts = Client.RequestOptions(PartitionKey=PartitionKey(stream.name)) @@ -476,7 +467,7 @@ function sync(req, expectedVersion) { | [||] -> Result.ConflictUnknown newPos | xs -> Result.Conflict (newPos, Enum.Events (ev.index, xs) |> Array.ofSeq) } - let private logged client (stream: Stream) (expectedVersion, req: WipBatch) (log : ILogger) + let private logged client (stream: CollectionStream) (expectedVersion, req: WipBatch) (log : ILogger) : Async = async { let verbose = log.IsEnabled Events.LogEventLevel.Debug let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataProjections req.c else log @@ -505,7 +496,7 @@ function sync(req, expectedVersion) { let batch (log : ILogger) retryPolicy client pk batch: Async = let call = logged client pk batch Log.withLoggedRetries retryPolicy "writeAttempt" call log - let mkBatch (stream: Events.Stream) (events: IEvent[]) projections : WipBatch = + let mkBatch (stream: Store.CollectionStream) (events: IEvent[]) projections: WipBatch = { p = stream.name; id = Store.WipBatch.WellKnownDocumentId; _i = -1L(*Server-managed*); _etag = null e = [| for e in events -> { c = DateTimeOffset.UtcNow; t = e.EventType; d = e.Data; m = e.Meta } |] c = Array.ofSeq projections } @@ -548,15 +539,15 @@ function sync(req, expectedVersion) { //let! _aux = createAux client dbUri collName auxRu return! createProc log client collUri } -module private Read = - let private getIndex (client: IDocumentClient) (stream: Stream, maybePos: Position option) = +module private Index = + let private get (client: IDocumentClient) (stream: CollectionStream, maybePos: Position option) = let coll = DocDbCollection(client, stream.collectionUri) let ac = match maybePos with Some { etag=Some etag } -> Client.AccessCondition(Type=Client.AccessConditionType.IfNoneMatch, Condition=etag) | _ -> null let ro = Client.RequestOptions(PartitionKey=PartitionKey(stream.name), AccessCondition = ac) coll.TryReadDocument(WipBatch.WellKnownDocumentId, ro) - let private loggedGetIndex (getIndex : Stream * Position option -> Async<_>) (stream: Stream, maybePos: Position option) (log: ILogger) = async { + let private loggedGet (get : CollectionStream * Position option -> Async<_>) (stream: CollectionStream, maybePos: Position option) (log: ILogger) = async { let log = log |> Log.prop "stream" stream.name - let! t, (ru, res : ReadResult) = getIndex (stream,maybePos) |> Stopwatch.Time + let! t, (ru, res : ReadResult) = get (stream,maybePos) |> Stopwatch.Time let log count bytes (f : Log.Measurement -> _) = log |> Log.event (f { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru }) match res with | ReadResult.NotModified -> @@ -570,29 +561,30 @@ module private Read = let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataProjections doc.c |> Log.prop "etag" doc._etag log.Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru) return ru, res } - type [] IndexResult = NotModified | NotFound | Found of Position * IOrderedEvent[] + type [] Result = NotModified | NotFound | Found of Position * IOrderedEvent[] /// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with IndexResult.UnChanged - let tryLoadIndex (log : ILogger) retryPolicy client (stream: Stream, maybePos: Position option): Async = async { - let getIndex = getIndex client - let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGetIndex getIndex (stream,maybePos)) log + let tryLoad (log : ILogger) retryPolicy client (stream: CollectionStream) (maybePos: Position option): Async = async { + let get = get client + let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGet get (stream,maybePos)) log match res with - | ReadResult.NotModified -> return IndexResult.NotModified - | ReadResult.NotFound -> return IndexResult.NotFound - | ReadResult.Found doc -> return IndexResult.Found (doc.ToPosition(), Enum.EventsAndProjections doc |> Array.ofSeq) } + | ReadResult.NotModified -> return Result.NotModified + | ReadResult.NotFound -> return Result.NotFound + | ReadResult.Found doc -> return Result.Found (doc.ToPosition(), Enum.EventsAndProjections doc |> Array.ofSeq) } + module private Query = open Microsoft.Azure.Documents.Linq - let private genBatchesQuery (client : IDocumentClient) (stream: Stream, pos: Position option) (direction: Direction) batchSize = + let private mkQuery (client : IDocumentClient) maxItems (stream: CollectionStream) (direction: Direction) (startPos: Position option) = let querySpec = - match pos with + match startPos with | None -> SqlQuerySpec("SELECT * FROM c WHERE c.i!=-1 ORDER BY c.i " + if direction = Direction.Forward then "ASC" else "DESC") | Some p -> let f = if direction = Direction.Forward then "c.i >= @id ORDER BY c.i ASC" else "c.i < @id ORDER BY c.i DESC" SqlQuerySpec("SELECT * FROM c WHERE c.i != -1 AND " + f, SqlParameterCollection [SqlParameter("@id", p.index)]) - let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable batchSize) + let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems) client.CreateDocumentQuery(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery() // Unrolls the Batches in a response - note when reading backawards, the events are emitted in reverse order of index - let private handleSlice (stream: Stream, startPos: Position option) direction (query: IDocumentQuery) (log: ILogger) + let private handleSlice direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery) (log: ILogger) : Async = async { let! ct = Async.CancellationToken let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time @@ -610,7 +602,7 @@ module private Read = let maybePosition = batches |> Array.tryPick (fun x -> x.TryToPosition()) return events, maybePosition, ru } - let private runBatchesQuery (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async) + let private runQuery (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async) (maxPermittedBatchReads: int option) (query: IDocumentQuery) : AsyncSeq = @@ -626,7 +618,7 @@ module private Read = yield! loop (batchCount + 1) } loop 0 - let logBatchRead direction batchSize streamName interval (responsesCount, events : IOrderedEvent []) nextI (ru: float) (log : ILogger) = + let private logBatchRead direction batchSize streamName interval (responsesCount, events : IOrderedEvent []) nextI (ru: float) (log : ILogger) = let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } let action = match direction with Direction.Forward -> "LoadF" | Direction.Backward -> "LoadB" @@ -636,145 +628,75 @@ module private Read = "Eqx {action:l} {stream} v{nextI} {count}/{responses} {ms}ms rc={ru}", action, streamName, nextI, count, responsesCount, (let e = interval.Elapsed in e.TotalMilliseconds), ru) - let loadFrom (log : ILogger) retryPolicy client direction batchSize maxPermittedBatchReads (stream: Stream, pos: Position option) - : Async = async { - let mutable ru = 0.0 - let mutable responses = 0 - let mutable maybeIndexDocument = None - let mergeBatches (batches: AsyncSeq) = async { - let! (events : IOrderedEvent[]) = - batches - |> AsyncSeq.map (fun (events, maybePos, r) -> - if maybeIndexDocument = None then maybeIndexDocument <- maybePos - ru <- ru + r - responses <- responses + 1 - events) - |> AsyncSeq.concatSeq - |> AsyncSeq.toArrayAsync - return events, maybeIndexDocument, ru } - use query = genBatchesQuery client (stream,pos) direction batchSize - let pullSlice = handleSlice (stream,pos) direction - let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query) - let log = log |> Log.prop "batchSize" batchSize |> Log.prop "direction" direction |> Log.prop "stream" stream.name - let slices : AsyncSeq = runBatchesQuery log retryingLoggingReadSlice maxPermittedBatchReads query - let! t, (events, maybePos, ru) = mergeBatches slices |> Stopwatch.Time - query.Dispose() - return t, responses, events, maybePos, ru } - - let inferPosition maybeIndexDocument (events: IOrderedEvent[]): Position = match maybeIndexDocument with Some p -> p | None -> Position.FromMaxIndex events - - let loadForwardsFrom (log : ILogger) retryPolicy client batchSize maxPermittedBatchReads (stream: Stream, pos: Position option) - : Async = async { - let direction = Direction.Forward - let! t, responses, events, maybePos, ru = loadFrom log retryPolicy client direction batchSize maxPermittedBatchReads (stream,pos) - let pos = inferPosition maybePos events - log |> logBatchRead direction batchSize stream.name t (responses,events) pos ru - return pos, events } - - let loadBackwardsFrom (log : ILogger) retryPolicy client batchSize maxPermittedBatchReads (stream: Stream, pos: Position option) - : Async = async { - let direction = Direction.Backward - let! t, responses, events, maybePos, ru = loadFrom log retryPolicy client direction batchSize maxPermittedBatchReads (stream,pos) - let pos = inferPosition maybePos events - log |> logBatchRead direction batchSize stream.name t (responses,events) pos ru - return pos, events } + let private inferPosition maybeIndexDocument (events: IOrderedEvent[]): Position = match maybeIndexDocument with Some p -> p | None -> Position.FromMaxIndex events - let calculateUsedVersusDroppedPayload firstUsedEventIndex (xs: IOrderedEvent[]) : int * int = + let private calculateUsedVersusDroppedPayload stopIndex (xs: IOrderedEvent[]) : int * int = let mutable used, dropped = 0, 0 + let mutable found = false for x in xs do let (Log.EventLen bytes) = x - if x.Index >= firstUsedEventIndex then used <- used + bytes - else dropped <- dropped + bytes + if found then dropped <- dropped + bytes + else used <- used + bytes + if x.Index = stopIndex then found <- true used, dropped - let loadBackwardsUntilCompactionOrStart (log : ILogger) retryPolicy client batchSize maxPermittedBatchReads isCompactionEvent (stream: Stream) + + let walk (log : ILogger) client retryPolicy maxItems maxRequests direction (stream: CollectionStream) startPos predicate : Async = async { - let mutable responseCount = 0 - let mergeFromCompactionPointOrStartFromBackwardsStream (log : ILogger) (batchesBackward : AsyncSeq) + let responseCount = ref 0 + let mergeBatches (log : ILogger) (batchesBackward : AsyncSeq) : Async = async { - let mutable lastSlice = None + let mutable lastResponse = None let mutable maybeIndexDocument = None let mutable ru = 0.0 - let! tempBackward = + let! events = batchesBackward |> AsyncSeq.map (fun (events, maybePos, r) -> if maybeIndexDocument = None then maybeIndexDocument <- maybePos - lastSlice <- Some events; ru <- ru + r - responseCount <- responseCount + 1 + lastResponse <- Some events; ru <- ru + r + incr responseCount events) |> AsyncSeq.concatSeq |> AsyncSeq.takeWhileInclusive (fun x -> - if not (isCompactionEvent x) then true // continue the search + if not (predicate x) then true // continue the search else - match lastSlice with + match lastResponse with | None -> log.Information("Eqx Stop stream={stream} at={index}", stream.name, x.Index) | Some batch -> let used, residual = batch |> calculateUsedVersusDroppedPayload x.Index log.Information("Eqx Stop stream={stream} at={index} used={used} residual={residual}", stream.name, x.Index, used, residual) false) |> AsyncSeq.toArrayAsync - let eventsForward = Array.Reverse(tempBackward); tempBackward // sic - relatively cheap, in-place reverse of something we own - return eventsForward, maybeIndexDocument, ru } - let direction = Direction.Backward - use query = genBatchesQuery client (stream,None) direction batchSize - let pullSlice = handleSlice (stream,None) direction + return events, maybeIndexDocument, ru } + use query = mkQuery client maxItems stream direction startPos + let pullSlice = handleSlice direction stream startPos let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query) - let log = log |> Log.prop "batchSize" batchSize |> Log.prop "stream" stream.name + let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream.name let readlog = log |> Log.prop "direction" direction - let batchesBackward : AsyncSeq = runBatchesQuery readlog retryingLoggingReadSlice maxPermittedBatchReads query - let! t, (events, maybeIndexDocument, ru) = mergeFromCompactionPointOrStartFromBackwardsStream log batchesBackward |> Stopwatch.Time + let batches : AsyncSeq = runQuery readlog retryingLoggingReadSlice maxRequests query + let! t, (events, maybeIndexDocument, ru) = mergeBatches log batches |> Stopwatch.Time query.Dispose() let pos = inferPosition maybeIndexDocument events - log |> logBatchRead direction batchSize stream.name t (responseCount,events) pos.index ru + log |> logBatchRead direction maxItems stream.name t (!responseCount,events) pos.index ru return pos, events } module UnionEncoderAdapters = - let private mkEvent (x : UnionCodec.EncodedUnion) : IEvent = + let encodeEvent (codec : UnionCodec.IUnionEncoder<'event, byte[]>) (x : 'event) : IEvent = + let e = codec.Encode x { new IEvent with - member __.EventType = x.caseName - member __.Data = x.payload + member __.EventType = e.caseName + member __.Data = e.payload member __.Meta = null } - let encodeEvent (codec : UnionCodec.IUnionEncoder<'event, byte[]>) (x : 'event) : IEvent = - codec.Encode x |> mkEvent - let decodeKnownEvents (codec : UnionCodec.IUnionEncoder<'event, byte[]>) (xs : IOrderedEvent seq) : 'event seq = - xs |> Seq.choose (fun x -> codec.TryDecode { caseName = x.EventType; payload = x.Data }) - -type []Token = { stream: Stream; pos: Position; rollingSnapshotEventIndex: int64 option; batchCapacityLimit: int option } + let decodeKnownEvents (codec : UnionCodec.IUnionEncoder<'event, byte[]>): IOrderedEvent seq -> 'event seq = + Seq.choose (fun x -> codec.TryDecode { caseName = x.EventType; payload = x.Data }) +type [] Token = { stream: CollectionStream; pos: Position } module Token = - let private create rollingSnapshotEventIndex batchCapacityLimit stream pos : Storage.StreamToken = - { value = box { stream = stream; pos = pos; rollingSnapshotEventIndex = rollingSnapshotEventIndex; batchCapacityLimit = batchCapacityLimit } } - /// No batching / compaction; we only need to retain the StreamVersion - let ofNonCompacting (stream,pos) : Storage.StreamToken = - create None None stream pos - // headroom before compaction is necessary given the stated knowledge of the last (if known) `rollingSnapshotEventIndexption` - let private batchCapacityLimit maybeSnapshotEventIndex unstoredEventsPending (windowSize: int) (pos: Position) : int = - match maybeSnapshotEventIndex with - | Some (rollingSnapshotEventIndex : int64) -> (windowSize - unstoredEventsPending) - int (pos.index - rollingSnapshotEventIndex + 1L) |> max 0 - | None -> (windowSize - unstoredEventsPending) - int pos.index |> max 0 - let (*private*) ofRollingSnapshotEventIndex maybeSnapshotEventIndex unstoredEventsPending batchSize (stream,pos) : Storage.StreamToken = - let batchCapacityLimit = batchCapacityLimit maybeSnapshotEventIndex unstoredEventsPending batchSize pos - create maybeSnapshotEventIndex (Some batchCapacityLimit) stream pos - /// Assume we have not seen any compaction events; use the batchSize and version to infer headroom - let ofUncompactedVersion batchSize pos : Storage.StreamToken = - ofRollingSnapshotEventIndex None 0 batchSize pos - let (|Unpack|) (token: Storage.StreamToken) : Token = unbox token.value - /// Use previousToken plus the data we are adding and the position we are adding it to infer a headroom - let ofPreviousTokenAndEventsLength (Unpack previousToken) eventsLength batchSize pos : Storage.StreamToken = - let rollingSnapshotEventIndexOption = previousToken.rollingSnapshotEventIndex - ofRollingSnapshotEventIndex rollingSnapshotEventIndexOption eventsLength batchSize pos - let ofPreviousTokenWithUpdatedPosition (Unpack previousToken) batchSize pos : Storage.StreamToken = - let rollingSnapshotEventIndexOption = previousToken.rollingSnapshotEventIndex - ofRollingSnapshotEventIndex rollingSnapshotEventIndexOption 0 batchSize pos - /// Use an event just read from the stream to infer headroom - let ofCompactionResolvedEventAndVersion (compactionEvent: IOrderedEvent) batchSize pos : Storage.StreamToken = - ofRollingSnapshotEventIndex (Some compactionEvent.Index) 0 batchSize pos - /// Use an event we are about to write to the stream to infer headroom - let ofPreviousStreamVersionAndCompactionEventDataIndex prevPos compactionEventDataIndex eventsLength batchSize streamVersion' : Storage.StreamToken = - ofRollingSnapshotEventIndex (Some (prevPos.index + int64 compactionEventDataIndex)) eventsLength batchSize streamVersion' - let supersedes (Unpack current) (Unpack x) = - let currentVersion, newVersion = current.pos.index, x.pos.index - let currentETag, newETag = current.pos.etag, x.pos.etag + let create stream pos : Storage.StreamToken = { value = box { stream = stream; pos = pos } } + let (|Unpack|) (token: Storage.StreamToken) : CollectionStream*Position = let t = unbox token.value in t.stream,t.pos + let supersedes (Unpack (_,currentPos)) (Unpack (_,xPos)) = + let currentVersion, newVersion = currentPos.index, xPos.index + let currentETag, newETag = currentPos.etag, xPos.etag newVersion > currentVersion || currentETag <> newETag namespace Equinox.Cosmos.Builder @@ -787,6 +709,7 @@ open FSharp.Control open Microsoft.Azure.Documents open Serilog open System +open System.Collections.Generic [] module Internal = @@ -796,17 +719,12 @@ module Internal = [] type LoadFromTokenResult = Unchanged | Found of Storage.StreamToken * IOrderedEvent[] - [] - type SearchStrategy<'event> = - | EventType of string - | Predicate of ('event -> bool) - /// Defines the policies in force for retrying with regard to transient failures calling CosmosDb (as opposed to application level concurrency conflicts) type EqxConnection(client: IDocumentClient, ?readRetryPolicy (*: (int -> Async<'T>) -> Async<'T>*), ?writeRetryPolicy) = member __.Client = client member __.ReadRetryPolicy = readRetryPolicy member __.WriteRetryPolicy = writeRetryPolicy - member __.Close = (client :?> Client.DocumentClient).Dispose() + //member __.Close = (client :?> Client.DocumentClient).Dispose() /// Defines the policies in force regarding how to constrain query responses type EqxBatchingPolicy @@ -815,159 +733,86 @@ type EqxBatchingPolicy // Dynamic version of `defaultMaxItems`, allowing one to react to dynamic configuration changes. Default to using `defaultMaxItems` ?getDefaultMaxItems : unit -> int, /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices`. Default: unlimited. - ?maxCalls) = + ?maxRequests) = let getdefaultMaxItems = defaultArg getDefaultMaxItems (fun () -> defaultArg defaultMaxItems 10) /// Limit for Maximum number of `Batch` records in a single query batch response member __.MaxItems = getdefaultMaxItems () /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices` - member __.MaxCalls = maxCalls + member __.MaxRequests = maxRequests type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = - let (|EventTypePredicate|) predicate (x:IOrderedEvent) = predicate x.EventType - let (|IEventDataArray|) events = [| for e in events -> e :> IOrderedEvent |] - member __.LoadForward log batchingOverride maybeRollingSnapshotPredicate (stream: Stream, pos: Position option) - : Async = async { - let batching = defaultArg batchingOverride batching - let! pos, events = Read.loadForwardsFrom log conn.ReadRetryPolicy conn.Client batching.MaxItems batching.MaxCalls (stream,pos) - match maybeRollingSnapshotPredicate with - | None -> return Token.ofNonCompacting (stream,pos), events - | Some (EventTypePredicate isCompactionEvent) -> - match events |> Array.tryFindBack isCompactionEvent with - | None -> return Token.ofUncompactedVersion batching.MaxItems (stream,pos), events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.MaxItems (stream,pos), events } - member __.LoadBackward log batchingOverride (stream: Stream, pos: Position option) - : Async = async { + let eventTypesPredicate resolved = + let acc = HashSet() + fun (x: IOrderedEvent) -> + acc.Add x.EventType |> ignore + resolved acc + let (|Satisfies|_|) predicate (xs:IOrderedEvent[]) = + match Array.tryFindIndexBack predicate xs with + | None -> None + | Some index -> Array.sub xs index (xs.Length - index) |> Some + let loadBackwardsStopping log predicate stream: Async = async { + let! pos, events = Query.walk log conn.Client conn.ReadRetryPolicy batching.MaxItems batching.MaxRequests Direction.Backward stream None predicate + Array.Reverse events + return Token.create stream pos, events } + member __.Read log batchingOverride stream direction startPos predicate: Async = async { let batching = defaultArg batchingOverride batching - return! Read.loadBackwardsFrom log conn.ReadRetryPolicy conn.Client batching.MaxItems batching.MaxCalls (stream,pos) } - member __.LoadBackwardsStoppingAtCompactionEvent log (EventTypePredicate isCompactionEvent) stream - : Async = async { - let! pos, events = Read.loadBackwardsUntilCompactionOrStart log conn.ReadRetryPolicy conn.Client batching.MaxItems batching.MaxCalls isCompactionEvent stream - match Array.tryHead events |> Option.filter isCompactionEvent with - | None -> return Token.ofUncompactedVersion batching.MaxItems (stream,pos), events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.MaxItems (stream,pos), events } - member private __.InterpretIndexOrFallback log (EventTypePredicate isRelevantProjectionOrRollingSnapshot as etp) stream res - : Async = async { + let! pos, events = Query.walk log conn.Client conn.ReadRetryPolicy batching.MaxItems batching.MaxRequests direction stream startPos predicate + return Token.create stream pos, events } + member __.LoadFromProjectionsOrRollingSnapshots log predicate (stream,maybePos): Async = async { + let! res = Index.tryLoad log None(* TODO conn.ReadRetryPolicy*) conn.Client stream maybePos + let predicate = eventTypesPredicate predicate match res with - | Read.IndexResult.NotModified -> return invalidOp "Not handled" - | Read.IndexResult.Found (pos, projectionsAndEvents) when projectionsAndEvents |> Array.exists isRelevantProjectionOrRollingSnapshot -> - return Token.ofNonCompacting (stream,pos), projectionsAndEvents - | _ -> return! __.LoadBackwardsStoppingAtCompactionEvent log etp stream } - member __.IndexedOrBatched log isCompactionEventType (stream,maybePos) - : Async = async { - let! res = Read.tryLoadIndex log None(* TODO conn.ReadRetryPolicy*) conn.Client (stream,maybePos) - return! __.InterpretIndexOrFallback log isCompactionEventType stream res } - member __.GetPosition(log, stream, ?pos) - : Async = async { - let! res = Read.tryLoadIndex log None(* TODO conn.ReadRetryPolicy*) conn.Client (stream,pos) + | Index.Result.NotModified -> return invalidOp "Not handled" + | Index.Result.Found (pos, Satisfies predicate enoughEvents) -> return Token.create stream pos, enoughEvents + | _ -> return! loadBackwardsStopping log predicate stream } + member __.GetPosition(log, stream, ?pos): Async = async { + let! res = Index.tryLoad log None(* TODO conn.ReadRetryPolicy*) conn.Client stream pos match res with - | Read.IndexResult.NotFound -> return Token.ofNonCompacting(stream,Position.FromKnownEmpty) - | Read.IndexResult.NotModified -> return Token.ofNonCompacting (stream, pos.Value) - | Read.IndexResult.Found (pos, _projectionsAndEvents) -> return Token.ofNonCompacting (stream,pos) } - member __.LoadFromToken log (Token.Unpack token as streamToken) maybeRollingSnapshotOrProjectionPredicate tryIndex - : Async = async { - let ok r = LoadFromTokenResult.Found r - if not tryIndex then - let! pos, ((IEventDataArray xs) as events) = - Read.loadForwardsFrom log conn.ReadRetryPolicy conn.Client batching.MaxItems batching.MaxCalls (token.stream,Some token.pos) - let ok t = ok (t,xs) - match maybeRollingSnapshotOrProjectionPredicate with - | None -> return ok (Token.ofNonCompacting (token.stream,token.pos)) - | Some (EventTypePredicate isCompactionEvent) -> - match events |> Array.tryFindBack isCompactionEvent with - | None -> return ok (Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.MaxItems (token.stream,token.pos)) - | Some resolvedEvent -> return ok (Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.MaxItems (token.stream,pos)) - else - let! res = Read.tryLoadIndex log None(* TODO conn.ReadRetryPolicy*) conn.Client (token.stream,Some token.pos) - match res with - | Read.IndexResult.NotModified -> - return LoadFromTokenResult.Unchanged - | _ -> - let! loaded = __.InterpretIndexOrFallback log maybeRollingSnapshotOrProjectionPredicate.Value token.stream res - return ok loaded } - member __.TrySync log (Token.Unpack token as streamToken) maybeRollingSnapshotPredicate (expectedVersion, batch: Store.WipBatch) - : Async = async { - let! wr = Sync.batch log conn.WriteRetryPolicy conn.Client token.stream (expectedVersion,batch) + | Index.Result.NotFound -> return Token.create stream Position.FromKnownEmpty + | Index.Result.NotModified -> return Token.create stream pos.Value + | Index.Result.Found (pos, _projectionsAndEvents) -> return Token.create stream pos } + member __.LoadFromToken log (stream,pos) predicate: Async = async { + let predicate = eventTypesPredicate predicate + let! res = Index.tryLoad log None(* TODO conn.ReadRetryPolicy*) conn.Client stream (Some pos) + match res with + | Index.Result.NotFound -> return LoadFromTokenResult.Found (Token.create stream Position.FromKnownEmpty,Array.empty) + | Index.Result.NotModified -> return LoadFromTokenResult.Unchanged + | Index.Result.Found (pos, Satisfies predicate enoughEvents) -> return LoadFromTokenResult.Found (Token.create stream pos, enoughEvents) + | _ -> + let! res = __.Read log None stream Direction.Forward (Some pos) (fun _ -> false) + return LoadFromTokenResult.Found res } + member __.Sync log stream (expectedVersion, batch: Store.WipBatch): Async = async { + let! wr = Sync.batch log conn.WriteRetryPolicy conn.Client stream (expectedVersion,batch) match wr with - | Sync.Result.Conflict (pos',events) -> - return InternalSyncResult.Conflict (Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.MaxItems (token.stream,pos'),events) - | Sync.Result.ConflictUnknown pos' -> - return InternalSyncResult.ConflictUnknown (Token.ofPreviousTokenWithUpdatedPosition streamToken batching.MaxItems (token.stream,pos')) - | Sync.Result.Written pos' -> - - let token = - match maybeRollingSnapshotPredicate with - | None -> Token.ofNonCompacting (token.stream,pos') - | Some isCompactionEvent -> - match batch.e |> Array.tryFindIndexBack (fun x -> isCompactionEvent x.t) with - | None -> Token.ofPreviousTokenAndEventsLength streamToken batch.e.Length batching.MaxItems (token.stream,pos') - | Some compactionEventIndex -> - Token.ofPreviousStreamVersionAndCompactionEventDataIndex token.pos compactionEventIndex batch.e.Length batching.MaxItems (token.stream,pos') - return InternalSyncResult.Written token } - -type EqxCollection(gateway : EqxGateway, databaseId, collectionId) = - member __.Gateway = gateway - member __.CollectionUri = Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId) - -type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) = - /// Determines whether writing a Compaction event is warranted (based on the existing state and the current `Accumulated` changes) - member __.IsCompactionDue = eventsLen > capacityBeforeCompaction - -type private Category<'event, 'state> - ( coll : EqxCollection, - codec : UnionCodec.IUnionEncoder<'event, byte[]>, - ?access : AccessStrategy<'event,'state>) = - let compactionPredicate = - match access with - | None -> None - | Some (AccessStrategy.IndexedSearch (predicate,_)) -> Some predicate - | Some AccessStrategy.EventsAreState -> Some (fun _ -> true) - | Some (AccessStrategy.RollingSnapshots (et,_)) -> Some ((=) et) - let response (fold: 'state -> 'event seq -> 'state) initial token events = - token, fold initial (UnionEncoderAdapters.decodeKnownEvents codec events) - let load (fold: 'state -> 'event seq -> 'state) initial loadF = async { - let! token, events = loadF - return response fold initial token events } - member __.Load (fold: 'state -> 'event seq -> 'state) (initial: 'state) streamName (log : ILogger) - : Async = - let stream = Stream.Create(coll.CollectionUri, streamName) - let forward = load fold initial (coll.Gateway.LoadForward log None None (stream,None)) - let compacted predicate = load fold initial (coll.Gateway.LoadBackwardsStoppingAtCompactionEvent log predicate stream) - let indexed predicate = load fold initial (coll.Gateway.IndexedOrBatched log predicate (stream,None)) - match access with - | None -> forward - | Some (AccessStrategy.IndexedSearch (predicate,_)) -> indexed predicate - | Some AccessStrategy.EventsAreState -> compacted (fun _ -> true) - | Some (AccessStrategy.RollingSnapshots (et,_)) -> compacted ((=) et) - member __.LoadFromToken (fold: 'state -> 'event seq -> 'state) (initial: 'state) (state: 'state) token (log : ILogger) - : Async = async { - let indexed = match access with Some (AccessStrategy.IndexedSearch _) -> true | _ -> false - let! res = coll.Gateway.LoadFromToken log token compactionPredicate indexed + | Sync.Result.Conflict (pos',events) -> return InternalSyncResult.Conflict (Token.create stream pos',events) + | Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create stream pos') + | Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create stream pos') } + +type private Category<'event, 'state>(gateway : EqxGateway, codec : UnionCodec.IUnionEncoder<'event, byte[]>) = + let respond (fold: 'state -> 'event seq -> 'state) initial events : 'state = + fold initial (UnionEncoderAdapters.decodeKnownEvents codec events) + let handle fold initial load: Async<'t * 'state> = async { + let! token, events = load + return token, respond fold initial events } + member __.Load collectionStream fold initial predicate (log : ILogger): Async = + gateway.LoadFromProjectionsOrRollingSnapshots log predicate (collectionStream,None) |> handle fold initial + member __.LoadFromToken (Token.Unpack streamPos, state: 'state as current) fold predicate (log : ILogger): Async = async { + let! res = gateway.LoadFromToken log streamPos predicate match res with - | LoadFromTokenResult.Unchanged -> return token, state - | LoadFromTokenResult.Found (token,events ) -> return response fold initial token events } - member __.TrySync (fold: 'state -> 'event seq -> 'state) initial (log : ILogger) - (Token.Unpack token as streamToken, expectedVersion : int64 option, state : 'state) - (events : 'event list, state' : 'state) + | LoadFromTokenResult.Unchanged -> return current + | LoadFromTokenResult.Found (token',events) -> return token', respond fold state events } + member __.Sync (Token.Unpack (stream,pos), state as current) (project: 'state -> 'event seq -> 'event seq) (expectedVersion : int64 option, events, state') fold predicate log : Async> = async { - let eventsIncludingSnapshots, projections = - match access with - | None | Some AccessStrategy.EventsAreState -> - Seq.ofList events, Seq.empty - | Some (AccessStrategy.RollingSnapshots (_,f)) -> - let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) - (if cc.IsCompactionDue then Seq.append events [f state'] |> Seq.cache else Seq.ofList events), Seq.empty - | Some (AccessStrategy.IndexedSearch (_,index)) -> - Seq.ofList events, index state' let encode = UnionEncoderAdapters.encodeEvent codec - let encodedEvents, projections = Seq.map encode events |> Array.ofSeq, Seq.map encode projections - let baseIndex = token.pos.index + int64 events.Length - let projections = Sync.mkProjections baseIndex projections - let batch = Sync.mkBatch token.stream encodedEvents projections - let! syncRes = coll.Gateway.TrySync log streamToken compactionPredicate (expectedVersion,batch) - match syncRes with - | InternalSyncResult.Conflict (token',events) -> return Storage.SyncResult.Conflict (async { return response fold initial token' events }) - | InternalSyncResult.ConflictUnknown token' -> return Storage.SyncResult.Conflict (__.LoadFromToken fold initial state token' log) - | InternalSyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state eventsIncludingSnapshots) } + let eventsEncoded, projectionsEncoded = Seq.map encode events |> Array.ofSeq, Seq.map encode (project state' events) + let baseIndex = pos.index + int64 (List.length events) + let projections = Sync.mkProjections baseIndex projectionsEncoded + let batch = Sync.mkBatch stream eventsEncoded projections + let! res = gateway.Sync log stream (expectedVersion,batch) + match res with + | InternalSyncResult.Conflict (token',events') -> return Storage.SyncResult.Conflict (async { return token', respond fold state events' }) + | InternalSyncResult.ConflictUnknown _token' -> return Storage.SyncResult.Conflict (__.LoadFromToken current fold predicate log) + | InternalSyncResult.Written token' -> return Storage.SyncResult.Written (token', state') } module Caching = open System.Runtime.Caching @@ -1011,11 +856,12 @@ module Caching = interface ICategory<'event, 'state> with member __.Load (streamName : string) (log : ILogger) : Async = interceptAsync (inner.Load streamName log) streamName - member __.TrySync (log : ILogger) (Token.Unpack token as streamToken, state) (events : 'event list, state' : 'state) : Async> = async { + member __.TrySync (log : ILogger) (Token.Unpack (stream,_) as streamToken,state) (events : 'event list, state': 'state) + : Async> = async { let! syncRes = inner.TrySync log (streamToken, state) (events,state') match syncRes with - | Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict (interceptAsync resync token.stream.name) - | Storage.SyncResult.Written (token', state') -> return Storage.SyncResult.Written (intercept token.stream.name (token', state')) } + | Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict (interceptAsync resync stream.name) + | Storage.SyncResult.Written (token', state') ->return Storage.SyncResult.Written (intercept stream.name (token', state')) } let applyCacheUpdatesWithSlidingExpiration (cache: Cache) @@ -1027,49 +873,84 @@ module Caching = let addOrUpdateSlidingExpirationCacheEntry streamName = CacheEntry >> cache.UpdateIfNewer policy (prefix + streamName) CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ -type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, ?readCache) = - let loadAlgorithm streamName initial log = - let batched = category.Load fold initial streamName log - let cached token state = category.LoadFromToken fold initial state token log - match readCache with - | None -> batched - | Some (cache : Caching.Cache, prefix : string) -> - match cache.TryGet(prefix + streamName) with - | None -> batched - | Some (token, state) -> cached token state +type private Folder<'event, 'state> + ( category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, + project: ('state -> 'event seq -> 'event seq), predicate : HashSet -> bool, mkCollectionStream : string -> Store.CollectionStream, ?readCache) = interface ICategory<'event, 'state> with - member __.Load streamName (log : ILogger) : Async = - loadAlgorithm streamName initial log - member __.TrySync (log : ILogger) (Token.Unpack token as t, state) (events : 'event list, state': 'state) : Async> = async { - let! syncRes = category.TrySync fold initial log (t, Some token.pos.index, state) (events,state') + member __.Load streamName (log : ILogger): Async = + let collStream = mkCollectionStream streamName + let batched = category.Load collStream fold initial predicate log + let cached tokenAndState = category.LoadFromToken tokenAndState fold predicate log + match readCache with + | None -> batched + | Some (cache : Caching.Cache, prefix : string) -> + match cache.TryGet(prefix + streamName) with + | None -> batched + | Some tokenAndState -> cached tokenAndState + member __.TrySync (log : ILogger) (Token.Unpack (_stream,pos) as streamToken,state) (events : 'event list, state': 'state) + : Async> = async { + let! syncRes = category.Sync (streamToken,state) project (Some pos.index, events, state') fold predicate log match syncRes with - | Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict resync - | Storage.SyncResult.Written (token',state') -> return Storage.SyncResult.Written (token',state') } + | Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict resync + | Storage.SyncResult.Written (token',state') -> return Storage.SyncResult.Written (token',state') } + +/// Defines a process for mapping from a Stream Name to the appropriate storage area, allowing control over segregation / co-locating of data +type EqxCollections(selectDatabaseAndCollection : string -> string*string) = + new (databaseId, collectionId) = EqxCollections(fun _streamName -> databaseId, collectionId) + member __.CollectionForStream streamName = + let databaseId, collectionId = selectDatabaseAndCollection streamName + Store.CollectionStream.Create(Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId), streamName) + +/// Pairs a Gateway, defining the retry policies for CosmosDb with an EqxCollections to +type EqxStore(gateway: EqxGateway, collections: EqxCollections) = + member __.Gateway = gateway + member __.Collections = collections [] type CachingStrategy = + /// Retain a single set of State, together with the associated etags + /// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to + /// track all relevant data in the state, and/or have the `project` function ensure all relevant events get indexed quickly | SlidingWindow of Caching.Cache * window: TimeSpan - /// Prefix is used to distinguish multiple folds per stream - | SlidingWindowPrefixed of Caching.Cache * window: TimeSpan * prefix: string - -type EqxStreamBuilder<'event, 'state>(gateway : EqxGateway, codec, fold, initial, ?access, ?caching) = - member __.Create (databaseId, collectionId, streamName) : Equinox.IStream<'event, 'state> = - let category = Category<'event, 'state>(EqxCollection(gateway, databaseId, collectionId), codec, ?access = access) +[] +type AccessStrategy<'event,'state> = + /// Require a configurable Set of Event Types to have been accumulated from a) projections + b) searching backward in the event stream + /// until `resolved` deems it so; fold foward based on those + /// When saving, `project` the 'state to seed the set of events that `resolved` will see first + | Projections of resolved: (ISet -> bool) * project: ('state -> 'event seq) + /// Simplified version of projection that only has a single Projection Event Type + /// Provides equivalent performance to Projections, just simplified function signatures + | Projection of eventType: string * ('state -> 'event) + /// Simplified version + | AnyKnownEventType of eventTypes: ISet + +type EqxStreamBuilder<'event, 'state>(store : EqxStore, codec, fold, initial, ?access, ?caching) = + member __.Create streamName : Equinox.IStream<'event, 'state> = let readCacheOption = match caching with | None -> None | Some (CachingStrategy.SlidingWindow(cache, _)) -> Some(cache, null) - | Some (CachingStrategy.SlidingWindowPrefixed(cache, _, prefix)) -> Some(cache, prefix) - let folder = Folder<'event, 'state>(category, fold, initial, ?readCache = readCacheOption) + let predicate, project = + match access with + | None -> (fun _ -> false), (fun _ _ -> Seq.empty) + | Some (AccessStrategy.Projections (predicate,project)) -> + predicate, + (fun state _events -> project state) + | Some (AccessStrategy.Projection (et,compact)) -> + (fun (ets: HashSet) -> ets.Contains et), + (fun state _events -> seq [compact state]) + | Some (AccessStrategy.AnyKnownEventType knownEventTypes) -> + (fun (ets: HashSet) -> knownEventTypes.Overlaps ets), + (fun _ events -> Seq.last events |> Seq.singleton) + let category = Category<'event, 'state>(store.Gateway, codec) + let folder = Folder<'event, 'state>(category, fold, initial, project, predicate, store.Collections.CollectionForStream, ?readCache = readCacheOption) let category : ICategory<_,_> = match caching with | None -> folder :> _ | Some (CachingStrategy.SlidingWindow(cache, window)) -> Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder - | Some (CachingStrategy.SlidingWindowPrefixed(cache, window, prefix)) -> - Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder Equinox.Stream.create category streamName @@ -1166,10 +1047,8 @@ type AppendResult<'t> = type EqxContext ( /// Connection to CosmosDb with DocumentDb Transient Read and Write Retry policies conn : EqxConnection, - /// Default Cosmos Database name for this context - databaseId, - /// Default Cosmos Collection name for this context - collectionId, + /// Database + Collection selector + collections: EqxCollections, /// Logger to write to - see https://github.com/serilog/serilog/wiki/Provided-Sinks for how to wire to your logger logger : Serilog.ILogger, /// Optional maximum number of Store.Batch records to retrieve as a set (how many Events are placed therein is controlled by maxEventsPerSlice). @@ -1181,24 +1060,28 @@ type EqxContext let batching = EqxBatchingPolicy(getDefaultMaxItems=getDefaultMaxItems) let gateway = EqxGateway(conn, batching) - member __.CreateStream(streamName,?dbId,?collId) = - let collection = EqxCollection(gateway, defaultArg dbId databaseId, defaultArg collId collectionId) - Stream.Create(collection.CollectionUri, streamName) + let maxCountPredicate count = + let acc = ref (max count 0) + fun _ -> + if !acc = 0 then false else + decr acc + true + let yieldPositionAndData res = async { + let! (Token.Unpack (_,pos')), data = res + return pos', data } + + member __.CreateStream(streamName) = collections.CollectionForStream streamName - member internal __.GetInternal((stream,pos) as streamPos, ?batchSize, ?direction) = async { + member internal __.GetInternal((stream, startPos), ?maxCount, ?direction) = async { let direction = defaultArg direction Direction.Forward - let batching = batchSize |> Option.map (fun max -> EqxBatchingPolicy(defaultMaxItems=max)) - match direction with - | Direction.Backward -> return! gateway.LoadBackward logger batching streamPos - | Direction.Forward -> - let! (Token.Unpack token, data) = gateway.LoadForward logger batching None (stream,pos) - return token.pos,data } + let predicate = match maxCount with Some limit -> maxCountPredicate limit | None -> fun _ -> false + return! gateway.Read logger None stream direction startPos predicate } /// Establishes the current position of the stream in as effficient a manner as possible /// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency state checks) member __.Sync(stream, ?position: Position) : Async = async { //let indexed predicate = load fold initial (coll.Gateway.IndexedOrBatched log predicate (stream,None)) - let! (Token.Unpack { pos = pos' }) = gateway.GetPosition(logger, stream, ?pos=position) + let! (Token.Unpack (_,pos')) = gateway.GetPosition(logger, stream, ?pos=position) return pos' } /// Reads in batches of `batchSize` from the specified `Position`, allowing the reader to efficiently walk away from a running query @@ -1209,19 +1092,18 @@ type EqxContext return AsyncSeq.ofSeq data } /// Reads all Events from a `Position` in a given `direction` - member __.Read(stream, ?position, ?batchSize, ?direction) : Async = - __.GetInternal((stream, position), ?batchSize = batchSize, ?direction=direction) + member __.Read(stream, ?position, ?maxCount, ?direction) : Async = + __.GetInternal((stream, position), ?maxCount=maxCount, ?direction=direction) |> yieldPositionAndData /// Appends the supplied batch of events, subject to a consistency check based on the `position` /// Callers should implement appropriate idempotent handling, or use Equinox.Handler for that purpose member __.Sync(stream, position, events: IEvent[]) : Async> = async { - let token = Token.ofNonCompacting (stream,position) let batch = Sync.mkBatch stream events Seq.empty - let! res = gateway.TrySync logger token None (Some position.index,batch) + let! res = gateway.Sync logger stream (Some position.index,batch) match res with - | InternalSyncResult.Written (Token.Unpack token) -> return AppendResult.Ok token.pos - | InternalSyncResult.Conflict (Token.Unpack token,events) -> return AppendResult.Conflict (token.pos, events) - | InternalSyncResult.ConflictUnknown (Token.Unpack token) -> return AppendResult.ConflictUnknown token.pos } + | Builder.Internal.InternalSyncResult.Written (Token.Unpack (_,pos)) -> return AppendResult.Ok pos + | Builder.Internal.InternalSyncResult.Conflict (Token.Unpack (_,pos),events) -> return AppendResult.Conflict (pos, events) + | Builder.Internal.InternalSyncResult.ConflictUnknown (Token.Unpack (_,pos)) -> return AppendResult.ConflictUnknown pos } /// Low level, non-idempotent call appending events to a stream without a concurrency control mechanism in play /// NB Should be used sparingly; Equinox.Handler enables building equivalent equivalent idempotent handling with minimal code. @@ -1266,8 +1148,8 @@ module Events = /// number of events to read is specified by batchSize /// Returns an empty sequence if the stream is empty or if the sequence number is larger than the largest /// sequence number in the stream. - let get (ctx: EqxContext) (streamName: string) (MinPosition index: int64) (batchSize: int): Async = - ctx.Read(ctx.CreateStream streamName, ?position=index, batchSize=batchSize) |> dropPosition + let get (ctx: EqxContext) (streamName: string) (MinPosition index: int64) (maxCount: int): Async = + ctx.Read(ctx.CreateStream streamName, ?position=index, maxCount=maxCount) |> dropPosition /// Appends a batch of events to a stream at the specified expected sequence number. /// If the specified expected sequence number does not match the stream, the events are not appended @@ -1286,15 +1168,15 @@ module Events = /// reading in batches of the specified size. /// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest /// sequence number in the stream. - let getAllBackwards (ctx: EqxContext) (streamName: string) (MaxPosition index: int64) (batchSize: int): AsyncSeq = - ctx.Walk(ctx.CreateStream streamName, batchSize, ?position=index, direction=Direction.Backward) + let getAllBackwards (ctx: EqxContext) (streamName: string) (MaxPosition index: int64) (maxCount: int): AsyncSeq = + ctx.Walk(ctx.CreateStream streamName, maxCount, ?position=index, direction=Direction.Backward) /// Returns an async array of events in the stream backwards starting from the specified sequence number, /// number of events to read is specified by batchSize /// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest /// sequence number in the stream. - let getBackwards (ctx: EqxContext) (streamName: string) (MaxPosition index: int64) (batchSize: int): Async = - ctx.Read(ctx.CreateStream streamName, ?position=index, batchSize=batchSize, direction=Direction.Backward) |> dropPosition + let getBackwards (ctx: EqxContext) (streamName: string) (MaxPosition index: int64) (maxCount: int): Async = + ctx.Read(ctx.CreateStream streamName, ?position=index, maxCount=maxCount, direction=Direction.Backward) |> dropPosition /// Obtains the `index` from the current write Position let getNextIndex (ctx: EqxContext) (streamName: string) : Async = diff --git a/src/Equinox/Equinox.fs b/src/Equinox/Equinox.fs index 76179b94d..b781dff8b 100644 --- a/src/Equinox/Equinox.fs +++ b/src/Equinox/Equinox.fs @@ -66,7 +66,7 @@ type ICategory<'event, 'state> = /// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store) abstract TrySync : log: ILogger -> token: Storage.StreamToken * originState: 'state - -> events: 'event list * state: 'state + -> events: 'event list * state': 'state -> Async> // Exception yielded by Handler.Decide after `count` attempts have yielded conflicts at the point of syncing with the Store diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index aac6a56cc..3e4b9c3fa 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -34,20 +34,19 @@ type Tests(testOutputHelper) = incr testIterations sprintf "events-%O-%i" name !testIterations let (|TestDbCollStream|) (TestStream streamName) = - let (StoreCollection (dbId,collId,streamName)) = streamName - dbId,collId,streamName - let mkContextWithSliceLimit conn dbId collId maxEventsPerSlice = - EqxContext(conn,dbId,collId,log,defaultMaxItems=defaultBatchSize) - let mkContext conn dbId collId = mkContextWithSliceLimit conn dbId collId None + streamName + let mkContextWithItemLimit conn defaultBatchSize = + EqxContext(conn,collections,log,?defaultMaxItems=defaultBatchSize) + let mkContext conn = mkContextWithItemLimit conn None let verifyRequestChargesMax rus = let tripRequestCharges = [ for e, c in capture.RequestCharges -> sprintf "%A" e, c ] test <@ float rus >= Seq.sum (Seq.map snd tripRequestCharges) @> [] - let append (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { + let append (TestDbCollStream (streamName)) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContext conn dbId collId + let ctx = mkContext conn let event = EventData.Create("test_event") let index = 0L @@ -100,9 +99,9 @@ type Tests(testOutputHelper) = let verifyCorrectEvents = verifyCorrectEventsEx Direction.Forward [] - let ``appendAtEnd and getNextIndex`` (extras, TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { + let ``appendAtEnd and getNextIndex`` (extras, TestDbCollStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContextWithSliceLimit conn dbId collId (Some 1) + let ctx = mkContextWithItemLimit conn (Some 1) // If a fail triggers a rerun, we need to dump the previous log entries captured capture.Clear() @@ -166,9 +165,9 @@ type Tests(testOutputHelper) = } [] - let ``append - fails on non-matching`` (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { + let ``append - fails on non-matching`` (TestDbCollStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContext conn dbId collId + let ctx = mkContext conn // Attempt to write, skipping Index 0 let event = EventData.Create("test_event") @@ -199,9 +198,9 @@ type Tests(testOutputHelper) = } [] - let get (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { + let get (TestDbCollStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContext conn dbId collId + let ctx = mkContext conn // We're going to ignore the first, to prove we can let! expected = add6EventsIn2Batches ctx streamName @@ -216,9 +215,9 @@ type Tests(testOutputHelper) = } [] - let getBackwards (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { + let getBackwards (TestDbCollStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContext conn dbId collId + let ctx = mkContext conn let! expected = add6EventsIn2Batches ctx streamName @@ -238,9 +237,9 @@ type Tests(testOutputHelper) = // TODO 2 batches test [] - let ``get (in 2 batches)`` (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { + let ``get (in 2 batches)`` (TestDbCollStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContextWithSliceLimit conn dbId collId (Some 1) + let ctx = mkContextWithItemLimit conn (Some 1) let! expected = add6EventsIn2Batches ctx streamName let expected = Array.skip 1 expected @@ -255,9 +254,9 @@ type Tests(testOutputHelper) = } [] - let getAll (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { + let getAll (TestDbCollStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContext conn dbId collId + let ctx = mkContext conn let! expected = add6EventsIn2Batches ctx streamName diff --git a/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs b/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs index 4a455308f..467c1bf57 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs @@ -25,10 +25,13 @@ let connectToSpecifiedCosmosOrSimulator (log: Serilog.ILogger) = Discovery.FromConnectionString connectionString |> connectToCosmos log "EQUINOX_COSMOS_CONNECTION" -let (|StoreCollection|) streamName = - let databaseId = read "EQUINOX_COSMOS_DATABASE" |> Option.defaultValue "equinox-test" - let collectionId = read "EQUINOX_COSMOS_COLLECTION" |> Option.defaultValue "equinox-test" - databaseId, collectionId, streamName - let defaultBatchSize = 500 -let createEqxGateway connection batchSize = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=batchSize)) \ No newline at end of file + +let collections = + EqxCollections( + read "EQUINOX_COSMOS_DATABASE" |> Option.defaultValue "equinox-test", + read "EQUINOX_COSMOS_COLLECTION" |> Option.defaultValue "equinox-test") + +let createEqxStore connection batchSize = + let gateway = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=batchSize)) + EqxStore(gateway, collections) \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index 3eba5b936..987e9c7f7 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -1,6 +1,5 @@ module Equinox.Cosmos.Integration.CosmosIntegration -open Equinox open Equinox.Cosmos.Integration.Infrastructure open Equinox.Cosmos.Builder open Swensen.Unquote @@ -12,47 +11,36 @@ let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings) module Cart = - let fold, initial, compact, index = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.compact, Domain.Cart.Folds.index + let fold, initial, project = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.compact let codec = genCodec() let createServiceWithoutOptimization connection batchSize log = - let gateway = createEqxGateway connection batchSize - let resolveStream (StoreCollection args) = - EqxStreamBuilder(gateway, codec, fold, initial).Create(args) + let store = createEqxStore connection batchSize + let resolveStream = EqxStreamBuilder(store, codec, fold, initial).Create Backend.Cart.Service(log, resolveStream) let createServiceWithCompaction connection batchSize log = - let gateway = createEqxGateway connection batchSize - let resolveStream (StoreCollection args) = - EqxStreamBuilder(gateway, codec, fold, initial, Cosmos.AccessStrategy.RollingSnapshots compact).Create(args) + let store = createEqxStore connection batchSize + let resolveStream = EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection project).Create Backend.Cart.Service(log, resolveStream) let createServiceWithCaching connection batchSize log cache = - let gateway = createEqxGateway connection batchSize + let store = createEqxStore connection batchSize let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, caching = sliding20m).Create(args) - Backend.Cart.Service(log, resolveStream) - let createServiceIndexed connection batchSize log = - let gateway = createEqxGateway connection batchSize - let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, Cosmos.AccessStrategy.IndexedSearch index).Create(args) - Backend.Cart.Service(log, resolveStream) - let createServiceWithCachingIndexed connection batchSize log cache = - let gateway = createEqxGateway connection batchSize - let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, Cosmos.AccessStrategy.IndexedSearch index, caching=sliding20m).Create(args) + let resolveStream = EqxStreamBuilder(store, codec, fold, initial, caching = sliding20m).Create Backend.Cart.Service(log, resolveStream) let createServiceWithCompactionAndCaching connection batchSize log cache = - let gateway = createEqxGateway connection batchSize + let store = createEqxStore connection batchSize let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, Cosmos.AccessStrategy.RollingSnapshots compact, sliding20m).Create(args) + let resolveStream = EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection project, sliding20m).Create Backend.Cart.Service(log, resolveStream) module ContactPreferences = - let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial + let fold, initial, eventTypes = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial, Domain.ContactPreferences.Events.eventTypeNames let codec = genCodec() let createServiceWithoutOptimization createGateway defaultBatchSize log _ignoreWindowSize _ignoreCompactionPredicate = let gateway = createGateway defaultBatchSize - let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial).Create(args) + let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial).Create Backend.ContactPreferences.Service(log, resolveStream) let createService createGateway log = - let resolveStream (StoreCollection args) = EqxStreamBuilder(createGateway 1, codec, fold, initial, Cosmos.AccessStrategy.EventsAreState).Create(args) + let resolveStream = EqxStreamBuilder(createGateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType eventTypes).Create Backend.ContactPreferences.Service(log, resolveStream) #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) @@ -222,7 +210,7 @@ type Tests(testOutputHelper) = [] let ``Can correctly read and update against Cosmos with EventsAreState Access Strategy`` value = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let service = ContactPreferences.createService (createEqxGateway conn) log + let service = ContactPreferences.createService (createEqxStore conn) log let email = let g = System.Guid.NewGuid() in g.ToString "N" //let (Domain.ContactPreferences.Id email) = id () @@ -277,7 +265,7 @@ type Tests(testOutputHelper) = let! conn = connectToSpecifiedCosmosOrSimulator log let batchSize = 10 let cache = Caching.Cache("cart", sizeMb = 50) - let createServiceCached () = Cart.createServiceWithCachingIndexed conn batchSize log cache + let createServiceCached () = Cart.createServiceWithCompactionAndCaching conn batchSize log cache let service1, service2 = createServiceCached (), createServiceCached () // Trigger 10 events, then reload @@ -305,7 +293,7 @@ type Tests(testOutputHelper) = let ``Can roundtrip against Cosmos, correctly using the index to avoid redundant reads`` context skuId cartId = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log let batchSize = 10 - let createServiceIndexed () = Cart.createServiceIndexed conn batchSize log + let createServiceIndexed () = Cart.createServiceWithCompaction conn batchSize log let service1, service2 = createServiceIndexed (), createServiceIndexed () // Trigger 10 events, then reload @@ -325,45 +313,3 @@ type Tests(testOutputHelper) = let! _ = service2.Read cartId test <@ [EqxAct.Indexed] = capture.ExternalCalls @> } - - [] - let ``Can combine compaction with caching against Cosmos`` context skuId = Async.RunSynchronously <| async { - let cartId = Domain.Infrastructure.CartId(System.Guid.NewGuid()) - let! conn = connectToSpecifiedCosmosOrSimulator log - let batchSize = 10 - let service1 = Cart.createServiceWithCompaction conn batchSize log - let cache = Caching.Cache("cart", sizeMb = 50) - let service2 = Cart.createServiceWithCompactionAndCaching conn batchSize log cache - - // Trigger 10 events, then reload - do! addAndThenRemoveItemsManyTimes context cartId skuId service1 5 - let! _ = service2.Read cartId - - // ... should see a single read as we are inside the batch threshold - test <@ batchBackwardsAndAppend @ singleBatchBackwards = capture.ExternalCalls @> - - // Add two more, which should push it over the threshold and hence trigger inclusion of a snapshot event (but not incurr extra roundtrips) - capture.Clear() - do! addAndThenRemoveItemsManyTimes context cartId skuId service1 1 - test <@ batchBackwardsAndAppend = capture.ExternalCalls @> - - // While we now have 13 events, we whould be able to read them backwards with a single call - capture.Clear() - let! _ = service1.Read cartId - test <@ singleBatchBackwards = capture.ExternalCalls @> - - // Add 8 more; total of 21 should not trigger snapshotting as Event Number 12 (the 13th one) is a shapshot - capture.Clear() - do! addAndThenRemoveItemsManyTimes context cartId skuId service1 4 - test <@ batchBackwardsAndAppend = capture.ExternalCalls @> - - // While we now have 21 events, we should be able to read them with a single call - capture.Clear() - let! _ = service1.Read cartId - // ... and trigger a second snapshotting (inducing a single additional read + write) - do! addAndThenRemoveItemsManyTimes context cartId skuId service1 1 - // and we _could_ reload the 24 events with a single read if reading backwards. However we are using the cache, which last saw it with 10 events, which necessitates two reads - let! _ = service2.Read cartId - let suboptimalExtraSlice = [singleSliceForward] - test <@ singleBatchBackwards @ batchBackwardsAndAppend @ suboptimalExtraSlice @ singleBatchForward = capture.ExternalCalls @> - } \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosTokenTests.fs b/tests/Equinox.Cosmos.Integration/CosmosTokenTests.fs deleted file mode 100644 index 257397f6f..000000000 --- a/tests/Equinox.Cosmos.Integration/CosmosTokenTests.fs +++ /dev/null @@ -1,80 +0,0 @@ -module Equinox.Cosmos.Tests.Tokens - -open Equinox -open Equinox.Cosmos -open FsCheck.Xunit -open Swensen.Unquote.Assertions -open Xunit - -let unpack (Token.Unpack token : Storage.StreamToken) = - token.pos.index, token.rollingSnapshotEventIndex, token.batchCapacityLimit - -let (|TokenFromIndex|) index : Token = - { stream = { collectionUri = null; name = null } - pos = { index = index; etag = None } - rollingSnapshotEventIndex = Some 0L - batchCapacityLimit = None } -let (|StreamPos|) (TokenFromIndex token) = token.stream, token.pos -[] -let ``ofUncompactedVersion - batchCapacityLimit`` (StreamPos streamPos) batchSize expectedCapacity = - let _, _, batchCapacityLimit = Token.ofUncompactedVersion batchSize streamPos |> unpack - test <@ Some expectedCapacity = batchCapacityLimit @> - -[] -let ``ofPreviousTokenAndEventsLength - batchCapacityLimit`` (previousCompactionEventNumber : System.Nullable) (StreamPos pos) eventsLength batchSize expectedCapacity = - let previousToken = - if not previousCompactionEventNumber.HasValue then Token.ofRollingSnapshotEventIndex None 0 -84 ((|StreamPos|) -42L) - else Token.ofRollingSnapshotEventIndex (Some previousCompactionEventNumber.Value) 0 -84 ((|StreamPos|) -42L) - let _, _, batchCapacityLimit = unpack <| Token.ofPreviousTokenAndEventsLength previousToken eventsLength batchSize pos - test <@ Some expectedCapacity = batchCapacityLimit @> - -[] -let ``Properties of tokens based on various generation mechanisms `` (StreamPos (stream,pos)) (previousCompactionEventNumber : int64 option) eventsLength batchSize = - let ovStreamVersion, ovCompactionEventNumber, ovBatchCapacityLimit = - unpack <| Token.ofNonCompacting (stream,pos) - let uvStreamVersion, uvCompactionEventNumber, uvBatchCapacityLimit = - unpack <| Token.ofUncompactedVersion batchSize (stream,pos) - let (StreamPos sp0, StreamPos sp) = 0L, pos.index - let previousToken = Token.ofRollingSnapshotEventIndex previousCompactionEventNumber 0 -84 sp0 - let peStreamVersion, peCompactionEventNumber, peBatchCapacityLimit = - unpack <| Token.ofPreviousTokenAndEventsLength previousToken eventsLength batchSize sp - - // StreamVersion - test <@ pos.index = ovStreamVersion @> - test <@ pos.index = uvStreamVersion @> - test <@ pos.index = peStreamVersion @> - - // CompactionEventNumber - test <@ None = ovCompactionEventNumber @> - test <@ None = uvCompactionEventNumber @> - test <@ previousCompactionEventNumber = peCompactionEventNumber @> - - // BatchCapacityLimit - // TODO REWRITE THIS - //test <@ None = ovBatchCapacityLimit @> - //let rawUncompactedBatchCapacityLimit batchSize (pos : Store.Position) = batchSize - (int pos.index - 1) - 2 - //let rawCompactedBatchCapacityLimit compactionEventNumber batchSize (pos : Store.Position) = batchSize - int (pos.index - 1L - compactionEventNumber - 1L) - //test <@ Some (rawUncompactedBatchCapacityLimit batchSize pos |> max 0) = uvBatchCapacityLimit @> - //let rawExpectedFromPreviousCompactionEventNumber = - // match previousCompactionEventNumber with - // | None -> rawUncompactedBatchCapacityLimit batchSize pos - // | Some pci -> rawCompactedBatchCapacityLimit pci batchSize pos - //test <@ Some (max 0 (rawExpectedFromPreviousCompactionEventNumber - eventsLength)) = peBatchCapacityLimit @> \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj index df3ea5b58..796526db0 100644 --- a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj +++ b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj @@ -9,7 +9,6 @@ -