diff --git a/cli/Equinox.Cli/Equinox.Cli.fsproj b/cli/Equinox.Cli/Equinox.Cli.fsproj index 96fc171b3..bdf6c1223 100644 --- a/cli/Equinox.Cli/Equinox.Cli.fsproj +++ b/cli/Equinox.Cli/Equinox.Cli.fsproj @@ -8,6 +8,9 @@ false true true + + + $(TargetsForTfmSpecificBuildOutput);CopyProjectReferencesToPackage @@ -21,11 +24,11 @@ + - @@ -44,14 +47,9 @@ - - $(TargetsForTfmSpecificBuildOutput);IncludeNonPackagedDlls - - + - - - + diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index 68e7cae71..131b18fc5 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -92,7 +92,7 @@ module Test = clients.[clientIndex % clients.Length] let selectClient = async { return async { return selectClient() } } Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest - let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact + let fold, initial, snapshot = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.snapshot let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect() let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings) let codec = genCodec() @@ -102,12 +102,12 @@ module Test = let c = Caching.Cache("Cli", sizeMb = 50) CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some else None - let resolveStream streamName = + let resolveStream = match store with | Store.Mem store -> - Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName) + Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create | Store.Es gateway -> - GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create(streamName) + GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot, ?caching = esCache).Create Backend.Favorites.Service(log, resolveStream) let runFavoriteTest (service : Backend.Favorites.Service) clientId = async { let sku = Guid.NewGuid() |> SkuId diff --git a/samples/Store/Domain/Cart.fs b/samples/Store/Domain/Cart.fs index 0509fd96c..5a82ff355 100644 --- a/samples/Store/Domain/Cart.fs +++ b/samples/Store/Domain/Cart.fs @@ -11,13 +11,11 @@ module Events = type ItemWaiveReturnsInfo = { context: ContextInfo; skuId: SkuId; waived: bool } module Compaction = - let [] EventType = "compact/1" type StateItemInfo = { skuId: SkuId; quantity: int; returnsWaived: bool } type State = { items: StateItemInfo[] } type Event = - | [] - Compacted of Compaction.State + | Compacted of Compaction.State | ItemAdded of ItemAddInfo | ItemRemoved of ItemRemoveInfo | ItemQuantityChanged of ItemQuantityChangeInfo @@ -42,8 +40,8 @@ module Folds = | Events.ItemQuantityChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with quantity = e.quantity } | i -> i)) | Events.ItemWaiveReturnsChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with returnsWaived = e.waived } | i -> i)) let fold state = Seq.fold evolve state - let compact = Events.Compaction.EventType, fun state -> Events.Compacted (State.toSnapshot state) - + let isOrigin = function Events.Compacted _ -> true | _ -> false + let snapshot = isOrigin, State.toSnapshot >> Events.Compacted type Context = { time: System.DateTime; requestId : RequestId } type Command = | AddItem of Context * SkuId * quantity: int diff --git a/samples/Store/Domain/Favorites.fs b/samples/Store/Domain/Favorites.fs index 819f016c4..eecef2b9c 100644 --- a/samples/Store/Domain/Favorites.fs +++ b/samples/Store/Domain/Favorites.fs @@ -5,12 +5,10 @@ module Events = type Favorited = { date: System.DateTimeOffset; skuId: SkuId } type Unfavorited = { skuId: SkuId } module Compaction = - let [] EventType = "compacted/1" type Compacted = { net: Favorited[] } type Event = - | [] - Compacted of Compaction.Compacted + | Compacted of Compaction.Compacted | Favorited of Favorited | Unfavorited of Unfavorited interface TypeShape.UnionContract.IUnionContract @@ -37,7 +35,8 @@ module Folds = let s = InternalState state for e in events do evolve s e s.AsState() - let compact = Events.Compaction.EventType, fun state -> Events.Compacted { net = state } + let isOrigin = function Events.Compacted _ -> true | _ -> false + let snapshot = isOrigin, fun state -> Events.Compacted { net = state } type Command = | Favorite of date : System.DateTimeOffset * skuIds : SkuId list diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index 676b8db31..e25dbdad6 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -6,7 +6,7 @@ open Swensen.Unquote #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) -let fold, initial, compact = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.compact +let fold, initial, snapshot = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.snapshot let createMemoryStore () = new VolatileStore () @@ -15,10 +15,10 @@ let createServiceMem log store = let codec = Equinox.EventStore.Integration.EventStoreIntegration.genCodec() -let resolveGesStreamWithCompactionEventType gateway streamName = - GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create(streamName) -let resolveGesStreamWithoutCompactionSemantics gateway streamName = - GesStreamBuilder(gateway, codec, fold, initial).Create(streamName) +let resolveGesStreamWithRollingSnapshots gateway = + GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Create +let resolveGesStreamWithoutCustomAccessStrategy gateway = + GesStreamBuilder(gateway, codec, fold, initial).Create let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count = service.FlowAsync(cartId, fun _ctx execute -> @@ -53,12 +53,12 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async { - let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCompactionSemantics + let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy do! act service args } [] - let ``Can roundtrip against EventStore, correctly folding the events with compaction`` args = Async.RunSynchronously <| async { - let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithCompactionEventType + let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async { + let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots do! act service args } diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index 5bc490c60..879952242 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -14,10 +14,10 @@ let createServiceMem log store = Backend.ContactPreferences.Service(log, MemoryStreamBuilder(store, fold, initial).Create) let codec = genCodec() -let resolveStreamGesWithCompactionSemantics gateway streamName = - GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(streamName) -let resolveStreamGesWithoutCompactionSemantics gateway streamName = - GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(streamName) +let resolveStreamGesWithOptimizedStorageSemantics gateway = + GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create +let resolveStreamGesWithoutAccessStrategy gateway = + GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper @@ -44,12 +44,12 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip against EventStore, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async { - let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithoutCompactionSemantics + let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithoutAccessStrategy do! act service args } [] let ``Can roundtrip against EventStore, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async { - let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithCompactionSemantics + let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithOptimizedStorageSemantics do! act service args } \ No newline at end of file diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index 84624bb6a..1100b1e01 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -6,7 +6,7 @@ open Swensen.Unquote #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) -let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact +let fold, initial, snapshot = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.snapshot let createMemoryStore () = new VolatileStore() @@ -15,7 +15,8 @@ let createServiceMem log store = let codec = genCodec() let createServiceGes gateway log = - Backend.Favorites.Service(log, GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create) + let resolveStream = GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Create + Backend.Favorites.Service(log, resolveStream) type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs index 65388a364..a980fcfce 100644 --- a/samples/Store/Integration/LogIntegration.fs +++ b/samples/Store/Integration/LogIntegration.fs @@ -1,7 +1,10 @@ module Samples.Store.Integration.LogIntegration +open Domain open Equinox.Store open Swensen.Unquote +open System +open System.Collections.Concurrent module EquinoxEsInterop = open Equinox.EventStore @@ -58,31 +61,31 @@ type SerilogMetricsExtractor(emit : string -> unit) = let createLoggerWithMetricsExtraction emit = let capture = SerilogMetricsExtractor emit - createLogger capture, capture + createLogger capture #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) type Tests() = - let act buffer capture (service : Backend.Cart.Service) itemCount context cartId skuId resultTag = async { + let act buffer (service : Backend.Cart.Service) itemCount context cartId skuId resultTag = async { do! CartIntegration.addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service itemCount let! state = service.Read cartId test <@ itemCount = match state with { items = [{ quantity = quantity }] } -> quantity | _ -> failwith "nope" @> - - // Even though we've gone over a page, we only need a single read to read the state (plus the one from the execute) - let contains (s : string) (x : string) = x.IndexOf s <> -1 - test <@ let reads = buffer |> Seq.filter (fun s -> s |> contains resultTag) - 2 = Seq.length reads - && not (obj.ReferenceEquals(capture, null)) @> } + // Because we're using Access Strategies that enable us to read our state in a single roundtrip... + // (even though we've gone over a page), we only need a single read to read the state (plus the one from the execute) + let contains (s : string) (x : string) = x.Contains s + test <@ let reads = buffer |> Seq.filter (contains resultTag) + 2 = Seq.length reads @> } // Protip: Debug this test to view standard metrics rendering [] - let ``Can roundtrip against EventStore, hooking, extracting and substituting metrics in the logging information`` context cartId skuId = Async.RunSynchronously <| async { - let buffer = ResizeArray() + let ``Can roundtrip against EventStore, hooking, extracting and substituting metrics in the logging information`` context skuId = Async.RunSynchronously <| async { let batchSize = defaultBatchSize - let (log,capture) = createLoggerWithMetricsExtraction buffer.Add + let buffer = ConcurrentQueue() + let log = createLoggerWithMetricsExtraction buffer.Enqueue let! conn = connectToLocalEventStoreNode log let gateway = createGesGateway conn batchSize - let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithCompactionEventType gateway) - let itemCount, cartId = batchSize / 2 + 1, cartId () - do! act buffer capture service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration" + let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithRollingSnapshots gateway) + let itemCount = batchSize / 2 + 1 + let cartId = Guid.NewGuid() |> CartId + do! act buffer service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration" } diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index eed1d311d..9b9a8ed88 100644 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -163,26 +163,28 @@ module private Read = let partitionPayloadFrom firstUsedEventNumber : ResolvedEvent[] -> int * int = let acc (tu,tr) ((ResolvedEventLen bytes) as y) = if y.Event.EventNumber < firstUsedEventNumber then tu, tr + bytes else tu + bytes, tr Array.fold acc (0,0) - let loadBackwardsUntilCompactionOrStart (log : ILogger) retryPolicy conn batchSize maxPermittedBatchReads streamName isCompactionEvent - : Async = async { + let loadBackwardsUntilCompactionOrStart (log : ILogger) retryPolicy conn batchSize maxPermittedBatchReads streamName (tryDecode,isOrigin) + : Async = async { let mergeFromCompactionPointOrStartFromBackwardsStream (log : ILogger) (batchesBackward : AsyncSeq) - : Async = async { + : Async = async { let versionFromStream, lastBatch = ref None, ref None let! tempBackward = batchesBackward - |> AsyncSeq.map (function + |> AsyncSeq.map (fun batch -> + match batch with | None, events -> lastBatch := Some events; events - | (Some _) as reportedVersion, events -> versionFromStream := reportedVersion; lastBatch := Some events; events) + | (Some _) as reportedVersion, events -> versionFromStream := reportedVersion; lastBatch := Some events; events + |> Array.map (fun e -> e, tryDecode e)) |> AsyncSeq.concatSeq - |> AsyncSeq.takeWhileInclusive (fun x -> - if not (isCompactionEvent x) then true // continue the search - else + |> AsyncSeq.takeWhileInclusive (function + | x, Some e when isOrigin e -> match !lastBatch with | None -> log.Information("GesStop stream={stream} at={eventNumber}", streamName, x.Event.EventNumber) | Some batch -> let used, residual = batch |> partitionPayloadFrom x.Event.EventNumber log.Information("GesStop stream={stream} at={eventNumber} used={used} residual={residual}", streamName, x.Event.EventNumber, used, residual) - false) + false + | _ -> true) // continue the search |> AsyncSeq.toArrayAsync let eventsForward = Array.Reverse(tempBackward); tempBackward // sic - relatively cheap, in-place reverse of something we own let version = match !versionFromStream with Some version -> version | None -> invalidOp "no version encountered in event batch stream" @@ -195,11 +197,11 @@ module private Read = let readlog = log |> Log.prop "direction" direction let batchesBackward : AsyncSeq = readBatches readlog retryingLoggingReadSlice maxPermittedBatchReads startPosition let! t, (version, events) = mergeFromCompactionPointOrStartFromBackwardsStream log batchesBackward |> Stopwatch.Time - log |> logBatchRead direction streamName t events batchSize version + log |> logBatchRead direction streamName t (Array.map fst events) batchSize version return version, events } module UnionEncoderAdapters = - let private encodedEventOfResolvedEvent (x : ResolvedEvent) : UnionCodec.EncodedUnion = + let encodedEventOfResolvedEvent (x : ResolvedEvent) : UnionCodec.EncodedUnion = { caseName = x.Event.EventType; payload = x.Event.Data } let private eventDataOfEncodedEvent (x : UnionCodec.EncodedUnion) = EventData(Guid.NewGuid(), x.caseName, (*isJson*) true, x.payload, [||]) @@ -262,35 +264,34 @@ type GesBatchingPolicy(getMaxBatchSize : unit -> int, ?batchCountLimit) = type GatewaySyncResult = Written of Storage.StreamToken | Conflict type GesGateway(conn : GesConnection, batching : GesBatchingPolicy) = - let isResolvedEventEventType predicate (x:ResolvedEvent) = predicate x.Event.EventType + let isResolvedEventEventType (tryDecode,predicate) (x:ResolvedEvent) = predicate (tryDecode (x.Event.Data)) let tryIsResolvedEventEventType predicateOption = predicateOption |> Option.map isResolvedEventEventType - member __.LoadBatched streamName log isCompactionEventType: Async = async { + member __.LoadBatched streamName log (tryDecode,isCompactionEventType): Async = async { let! version, events = Read.loadForwardsFrom log conn.ReadRetryPolicy conn.ReadConnection batching.BatchSize batching.MaxBatches streamName 0L match tryIsResolvedEventEventType isCompactionEventType with - | None -> return Token.ofNonCompacting streamName version, events + | None -> return Token.ofNonCompacting streamName version, Array.choose tryDecode events | Some isCompactionEvent -> match events |> Array.tryFindBack isCompactionEvent with - | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, events } - member __.LoadBackwardsStoppingAtCompactionEvent streamName log isCompactionEventType: Async = async { - let isCompactionEvent = isResolvedEventEventType isCompactionEventType + | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, Array.choose tryDecode events + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose tryDecode events } + member __.LoadBackwardsStoppingAtCompactionEvent streamName log (tryDecode,isOrigin): Async = async { let! version, events = - Read.loadBackwardsUntilCompactionOrStart log conn.ReadRetryPolicy conn.ReadConnection batching.BatchSize batching.MaxBatches streamName isCompactionEvent - match Array.tryHead events |> Option.filter isCompactionEvent with - | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, events } - member __.LoadFromToken useWriteConn streamName log (Token.Unpack token as streamToken) isCompactionEventType - : Async = async { + Read.loadBackwardsUntilCompactionOrStart log conn.ReadRetryPolicy conn.ReadConnection batching.BatchSize batching.MaxBatches streamName (tryDecode,isOrigin) + match Array.tryHead events |> Option.filter (function _, Some e -> isOrigin e | _ -> false) with + | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, Array.choose snd events + | Some (resolvedEvent,_) -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose snd events } + member __.LoadFromToken useWriteConn streamName log (Token.Unpack token as streamToken) (tryDecode,isCompactionEventType) + : Async = async { let streamPosition = token.pos.streamVersion + 1L let connToUse = if useWriteConn then conn.WriteConnection else conn.ReadConnection let! version, events = Read.loadForwardsFrom log conn.ReadRetryPolicy connToUse batching.BatchSize batching.MaxBatches streamName streamPosition - match tryIsResolvedEventEventType isCompactionEventType with - | None -> return Token.ofNonCompacting streamName version, events + match isCompactionEventType with + | None -> return Token.ofNonCompacting streamName version, Array.choose tryDecode events | Some isCompactionEvent -> - match events |> Array.tryFindBack isCompactionEvent with - | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize version, events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, events } - member __.TrySync log (Token.Unpack token as streamToken) (encodedEvents: EventData array) isCompactionEventType : Async = async { + match events |> Array.tryFindBack (fun re -> match tryDecode re with Some e -> isCompactionEvent e | _ -> false) with + | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize version, Array.choose tryDecode events + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose tryDecode events } + member __.TrySync log (Token.Unpack token as streamToken) (events, encodedEvents: EventData array) (isCompactionEventType) : Async = async { let streamVersion = token.pos.streamVersion let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection token.stream.name streamVersion encodedEvents match wr with @@ -302,8 +303,7 @@ type GesGateway(conn : GesConnection, batching : GesBatchingPolicy) = match isCompactionEventType with | None -> Token.ofNonCompacting token.stream.name version' | Some isCompactionEvent -> - let isEventDataEventType predicate (x:EventData) = predicate x.Type - match encodedEvents |> Array.tryFindIndexBack (isEventDataEventType isCompactionEvent) with + match events |> Array.ofList |> Array.tryFindIndexBack isCompactionEvent with | None -> Token.ofPreviousTokenAndEventsLength streamToken encodedEvents.Length batching.BatchSize version' | Some compactionEventIndex -> Token.ofPreviousStreamVersionAndCompactionEventDataIndex streamToken compactionEventIndex encodedEvents.Length batching.BatchSize version' @@ -312,47 +312,54 @@ type GesGateway(conn : GesConnection, batching : GesBatchingPolicy) = [] type AccessStrategy<'event,'state> = | EventsAreState - | RollingSnapshots of eventType: string * compact: ('state -> 'event) + | RollingSnapshots of isValid: ('event -> bool) * compact: ('state -> 'event) type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) = /// Determines whether writing a Compaction event is warranted (based on the existing state and the current `Accumulated` changes) member __.IsCompactionDue = eventsLen > capacityBeforeCompaction type private Category<'event, 'state>(gateway : GesGateway, codec : UnionCodec.IUnionEncoder<'event,byte[]>, ?access : AccessStrategy<'event,'state>) = + let tryDecode (e: ResolvedEvent) = e |> UnionEncoderAdapters.encodedEventOfResolvedEvent |> codec.TryDecode let compactionPredicate = match access with | None -> None | Some AccessStrategy.EventsAreState -> Some (fun _ -> true) - | Some (AccessStrategy.RollingSnapshots (et,_)) -> Some ((=) et) + | Some (AccessStrategy.RollingSnapshots (isValid,_)) -> Some isValid + let isOrigin = + match access with + | None | Some AccessStrategy.EventsAreState -> fun _ -> true + | Some (AccessStrategy.RollingSnapshots (isValid,_)) -> isValid let loadAlgorithm load streamName initial log = - let batched = load initial (gateway.LoadBatched streamName log None) - let compacted predicate = load initial (gateway.LoadBackwardsStoppingAtCompactionEvent streamName log predicate) + let batched = load initial (gateway.LoadBatched streamName log (tryDecode,None)) + let compacted = load initial (gateway.LoadBackwardsStoppingAtCompactionEvent streamName log (tryDecode,isOrigin)) match access with | None -> batched - | Some AccessStrategy.EventsAreState -> compacted (fun _ -> true) - | Some (AccessStrategy.RollingSnapshots (et,_)) -> compacted ((=) et) + | Some AccessStrategy.EventsAreState + | Some (AccessStrategy.RollingSnapshots _) -> compacted let load (fold: 'state -> 'event seq -> 'state) initial f = async { let! token, events = f - return token, fold initial (UnionEncoderAdapters.decodeKnownEvents codec events) } + return token, fold initial events } member __.Load (fold: 'state -> 'event seq -> 'state) (initial: 'state) (streamName : string) (log : ILogger) : Async = loadAlgorithm (load fold) streamName initial log member __.LoadFromToken (fold: 'state -> 'event seq -> 'state) (state: 'state) (streamName : string) token (log : ILogger) : Async = - (load fold) state (gateway.LoadFromToken false streamName log token compactionPredicate) + (load fold) state (gateway.LoadFromToken false streamName log token (tryDecode,compactionPredicate)) member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger) ((Token.StreamPos (stream,pos) as streamToken), state : 'state) - (events : 'event list, state': 'state) : Async> = async { + (events : 'event list) : Async> = async { let events = match access with | None | Some AccessStrategy.EventsAreState -> events - | Some (AccessStrategy.RollingSnapshots (_,f)) -> + | Some (AccessStrategy.RollingSnapshots (_,compact)) -> let cc = CompactionContext(List.length events, pos.batchCapacityLimit.Value) - if cc.IsCompactionDue then events @ [f state'] else events + if cc.IsCompactionDue then events @ [fold state events |> compact] else events let encodedEvents : EventData[] = UnionEncoderAdapters.encodeEvents codec events - let! syncRes = gateway.TrySync log streamToken encodedEvents compactionPredicate + let! syncRes = gateway.TrySync log streamToken (events,encodedEvents) compactionPredicate match syncRes with - | GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (gateway.LoadFromToken true stream.name log streamToken compactionPredicate)) - | GatewaySyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state (Seq.ofList events)) } + | GatewaySyncResult.Conflict -> + return Storage.SyncResult.Conflict (load fold state (gateway.LoadFromToken true stream.name log streamToken (tryDecode,compactionPredicate))) + | GatewaySyncResult.Written token' -> + return Storage.SyncResult.Written (token', fold state (Seq.ofList events)) } module Caching = open System.Runtime.Caching @@ -396,8 +403,8 @@ module Caching = interface ICategory<'event, 'state> with member __.Load (streamName : string) (log : ILogger) : Async = interceptAsync (inner.Load streamName log) streamName - member __.TrySync (log : ILogger) ((Token.StreamPos (stream,_) as token), state) (events : 'event list, state' : 'state) : Async> = async { - let! syncRes = inner.TrySync log (token, state) (events,state') + member __.TrySync (log : ILogger) ((Token.StreamPos (stream,_) as token), state) (events : 'event list) : Async> = async { + let! syncRes = inner.TrySync log (token, state) events match syncRes with | Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict (interceptAsync resync stream.name) | Storage.SyncResult.Written (token', state') -> return Storage.SyncResult.Written (token', state') } @@ -425,8 +432,8 @@ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: ' interface ICategory<'event, 'state> with member __.Load (streamName : string) (log : ILogger) : Async = loadAlgorithm streamName initial log - member __.TrySync (log : ILogger) (token, initialState) (events : 'event list, state' : 'state) : Async> = async { - let! syncRes = category.TrySync fold log (token, initialState) (events, state') + member __.TrySync (log : ILogger) (token, initialState) (events : 'event list) : Async> = async { + let! syncRes = category.TrySync fold log (token, initialState) events match syncRes with | Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict resync | Storage.SyncResult.Written (token',state') -> return Storage.SyncResult.Written (token',state') } diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index 95976fd4a..89e269727 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -79,7 +79,7 @@ type MemoryCategory<'event, 'state>(store : VolatileStore, fold, initial) = match store.TryLoad<'event> streamName log with | None -> return Token.ofEmpty streamName initial | Some events -> return Token.ofEventArray streamName fold initial events } - member __.TrySync (log : ILogger) (Token.Unpack token, state) (events : 'event list, _state': 'state) = async { + member __.TrySync (log : ILogger) (Token.Unpack token, state) (events : 'event list) = async { let trySyncValue currentValue = if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion) else ConcurrentDictionarySyncResult.Written (Seq.append currentValue events) diff --git a/src/Equinox/Equinox.fs b/src/Equinox/Equinox.fs index 91e84dc5a..53ee60656 100644 --- a/src/Equinox/Equinox.fs +++ b/src/Equinox/Equinox.fs @@ -51,7 +51,7 @@ type IStream<'event, 'state> = /// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry abstract TrySync: log: ILogger -> token: Storage.StreamToken * originState: 'state - -> eventsAndState: 'event list * 'state + -> events: 'event list -> Async> /// Store-agnostic interface representing interactions an Application can have with a set of streams @@ -66,7 +66,7 @@ type ICategory<'event, 'state> = /// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store) abstract TrySync : log: ILogger -> token: Storage.StreamToken * originState: 'state - -> events: 'event list * state: 'state + -> events: 'event list -> Async> /// Defines a hook enabling retry and backoff policies to be specified @@ -80,10 +80,10 @@ module private Flow = /// Represents stream and folding state between the load and run/render phases type SyncState<'event, 'state> ( fold, originState : Storage.StreamToken * 'state, - trySync : ILogger -> Storage.StreamToken * 'state -> 'event list * 'state-> Async>) = + trySync : ILogger -> Storage.StreamToken * 'state -> 'event list -> Async>) = let mutable tokenAndState = originState - let tryOr log eventsAndState handleFailure = async { - let! res = trySync log tokenAndState eventsAndState + let tryOr log events handleFailure = async { + let! res = trySync log tokenAndState events match res with | Storage.SyncResult.Conflict resync -> return! handleFailure resync @@ -95,15 +95,15 @@ module private Flow = member __.State = snd __.Memento member __.CreateContext(): Context<'event, 'state> = Context<'event, 'state>(fold, __.State) - member __.TryOrResync (retryPolicy : IRetryPolicy) (attemptNumber: int) (log : ILogger) eventsAndState = + member __.TryOrResync (retryPolicy : IRetryPolicy) (attemptNumber: int) (log : ILogger) events = let resyncInPreparationForRetry resync = async { let! streamState' = retryPolicy.Execute(log, attemptNumber, resync) tokenAndState <- streamState' return false } - tryOr log eventsAndState resyncInPreparationForRetry - member __.TryOrThrow log eventsAndState attempt = + tryOr log events resyncInPreparationForRetry + member __.TryOrThrow log events attempt = let throw _ = async { return raise <| MaxResyncsExhaustedException attempt } - tryOr log eventsAndState throw |> Async.Ignore + tryOr log events throw |> Async.Ignore /// Obtain a representation of the current state and metadata from the underlying storage stream let load (fold : 'state -> 'event seq -> 'state) (log : ILogger) (stream : IStream<'event, 'state>) @@ -129,10 +129,10 @@ module private Flow = return outcome elif attempt = maxSyncAttempts then log.Debug "Max Sync Attempts exceeded" - do! sync.TryOrThrow log (events, ctx.State) attempt + do! sync.TryOrThrow log events attempt return outcome else - let! committed = sync.TryOrResync retryPolicy attempt log (events, ctx.State) + let! committed = sync.TryOrResync retryPolicy attempt log events if not committed then log.Debug "Resyncing and retrying" return! loop (attempt + 1) @@ -161,8 +161,8 @@ module Stream = interface IStream<'event, 'state> with member __.Load log = category.Load streamName log - member __.TrySync (log: ILogger) (token: Storage.StreamToken, originState: 'state) (events: 'event list, state: 'state) = - category.TrySync log (token, originState) (events,state) + member __.TrySync (log: ILogger) (token: Storage.StreamToken, originState: 'state) (events: 'event list) = + category.TrySync log (token, originState) events /// Handles case where some earlier processing has loaded or determined a the state of a stream, allowing us to avoid a read roundtrip type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>, memento : Storage.StreamToken * 'state) = @@ -172,8 +172,8 @@ module Stream = match preloadedTokenAndState with | Some value -> async { preloadedTokenAndState <- None; return value } | None -> inner.Load log - member __.TrySync (log: ILogger) (token: Storage.StreamToken, originState: 'state) (events: 'event list, state: 'state) = - inner.TrySync log (token, originState) (events,state) + member __.TrySync (log: ILogger) (token: Storage.StreamToken, originState: 'state) (events: 'event list) = + inner.TrySync log (token, originState) events let create (category : ICategory<'event, 'state>) streamName : IStream<'event, 'state> = Stream(category, streamName) :> _ let ofMemento (memento : Storage.StreamToken * 'state) (x : IStream<_,_>) : IStream<'event, 'state> = InitializedStream(x, memento) :> _ diff --git a/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs b/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs index 372ab9084..e7b136189 100644 --- a/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs +++ b/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs @@ -23,19 +23,19 @@ let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings) module Cart = - let fold, initial, compact = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.compact + let fold, initial, snapshot = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.snapshot let codec = genCodec() let createServiceWithoutOptimization log gateway = Backend.Cart.Service(log, GesStreamBuilder(gateway, codec, fold, initial).Create) let createServiceWithCompaction log gateway = - let resolveStream streamName = GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact).Create(streamName) + let resolveStream streamName = GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot).Create(streamName) Backend.Cart.Service(log, resolveStream) let createServiceWithCaching log gateway cache = let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) Backend.Cart.Service(log, GesStreamBuilder(gateway, codec, fold, initial, caching = sliding20m).Create) let createServiceWithCompactionAndCaching log gateway cache = let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - Backend.Cart.Service(log, GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact, sliding20m).Create) + Backend.Cart.Service(log, GesStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot, sliding20m).Create) module ContactPreferences = let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial