From 472801616be5d3e7de786eba02bd1e5f8371a636 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 7 Nov 2018 16:30:01 +0000 Subject: [PATCH] Push compaction down into Stores, resolves #23 --- build.ps1 | 7 +- samples/Store/Integration/CartIntegration.fs | 6 +- .../ContactPreferencesIntegration.fs | 12 +-- .../Store/Integration/FavoritesIntegration.fs | 2 +- src/Equinox.Cosmos/Cosmos.fs | 93 ++++++++++++++----- .../CosmosIntegration.fs | 18 ++-- 6 files changed, 90 insertions(+), 48 deletions(-) diff --git a/build.ps1 b/build.ps1 index 5e9dbde31..3b27d0148 100644 --- a/build.ps1 +++ b/build.ps1 @@ -8,7 +8,6 @@ param( [Alias("cc")][string] $cosmosCollection=$env:EQUINOX_COSMOS_COLLECTION, [Alias("scp")][switch][bool] $skipProvisionCosmos=$skipCosmos -or -not $cosmosServer -or -not $cosmosDatabase -or -not $cosmosCollection, [Alias("scd")][switch][bool] $skipDeprovisionCosmos=$skipProvisionCosmos, - [string] $additionalMsBuildArgs [string] $additionalMsBuildArgs="-t:Build" ) @@ -21,8 +20,8 @@ $env:EQUINOX_INTEGRATION_SKIP_EVENTSTORE=[string]$skipEs if ($skipEs) { warn "Skipping EventStore tests" } function cliCosmos($arghs) { - Write-Host "dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection $arghs" - dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs + Write-Host "dotnet run cli/Equinox.Cli cosmos -s -d $cosmosDatabase -c $cosmosCollection $arghs" + dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs } if ($skipCosmos) { @@ -31,7 +30,7 @@ if ($skipCosmos) { warn "Skipping Provisioning Cosmos" } else { warn "Provisioning cosmos..." - dotnet run cli/Equinox.Cli cosmos $cosmosServer -d $cosmosDatabase -c $cosmosCollection provision -ru 10000 + cliCosmos @("provision", "-ru", "1000") $deprovisionCosmos=$true } $env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index 612623f04..899ba039c 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -22,9 +22,9 @@ let resolveGesStreamWithRollingSnapshots gateway = let resolveGesStreamWithoutCustomAccessStrategy gateway = GesResolver(gateway, codec, fold, initial).Resolve -let resolveEqxStreamWithCompactionEventType gateway compactionEventType (StreamArgs args) = - EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.EventType compactionEventType).Create(args) -let resolveEqxStreamWithoutCompactionSemantics gateway _compactionEventType (StreamArgs args) = +let resolveEqxStreamWithCompactionEventType gateway (StreamArgs args) = + EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args) +let resolveEqxStreamWithoutCompactionSemantics gateway (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial).Create(args) let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count = diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index 4f14c3833..e2714eae4 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -21,12 +21,10 @@ let resolveStreamGesWithOptimizedStorageSemantics gateway = let resolveStreamGesWithoutAccessStrategy gateway = GesResolver(gateway defaultBatchSize, codec, fold, initial).Resolve -let resolveStreamEqxWithCompactionSemantics gateway = - fun predicate (StreamArgs args) -> - EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.Predicate predicate).Create(args) -let resolveStreamEqxWithoutCompactionSemantics gateway = - fun _ignoreWindowSize _ignoreCompactionPredicate (StreamArgs args) -> - EqxStreamBuilder(gateway, codec, fold, initial).Create(args) +let resolveStreamEqxWithCompactionSemantics gateway (StreamArgs args) = + EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args) +let resolveStreamEqxWithoutCompactionSemantics gateway (StreamArgs args) = + EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args) type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper @@ -65,7 +63,7 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async { - let! service = arrangeWithoutCompaction connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics + let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics do! act service args } diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index ceeda64f7..41fbe87c9 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -21,7 +21,7 @@ let createServiceGes gateway log = Backend.Favorites.Service(log, resolveStream) let createServiceEqx gateway log = - let resolveStream cet (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.EventType cet).Create(args) + let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args) Backend.Favorites.Service(log, resolveStream) type Tests(testOutputHelper) = diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index a886ce779..49763b3d2 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -77,6 +77,41 @@ module Store = if array = null || Array.length array = 0 then serializer.Serialize(writer, null) else writer.WriteRawValue(System.Text.Encoding.UTF8.GetString(array)) + [] + type IndexEvent = + { p: string // "{streamName}" + id: string // "{-1}" + + w: int64 // 100: window size + /// last index/i value + m: int64 // {index} + + (* "x": [ + { "i":0, + "c":"ISO 8601" + "e":[ + [{"t":"added","d":"..."},{"t":"compacted/1","d":"..."}], + [{"t":"removed","d":"..."}], + ] + } + ] *) + x: JObject[][] } + + (* Pseudocode: + function sync(p, expectedVersion, windowSize, events) { + if (i == 0) then { + coll.insert(p,0,{ p:p, id:-1, w:windowSize, m:flatLen(events)}) + } else { + const i = doc.find(p=p && id=-1) + if(i.m <> expectedVersion) then emit from expectedVersion else + i.x.append(events) + for (var (i, c, e: [ {e1}, ...]) in events) { + coll.insert({p:p, id:i, i:i, c:c, e:e1) + } + // trim i.x to w total items in i.[e] + coll.update(p,id,i) + } + } *) [] type Direction = Forward | Backward with override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward" @@ -289,7 +324,6 @@ module UnionEncoderAdapters = type []Token = { pos: Store.Position; compactionEventNumber: int64 option } -[] module Token = let private create compactionEventNumber batchCapacityLimit pos : Storage.StreamToken = { value = box { pos = pos; compactionEventNumber = compactionEventNumber }; batchCapacityLimit = batchCapacityLimit } @@ -385,26 +419,49 @@ type private Collection(gateway : EqxGateway, databaseId, collectionId) = member __.Gateway = gateway member __.CollectionUri = Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId) -type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUnionEncoder<'event, byte[]>, ?compactionStrategy) = +[] +type AccessStrategy<'event,'state> = + | EventsAreState + | RollingSnapshots of eventType: string * compact: ('state -> 'event) + +type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) = + /// Determines whether writing a Compaction event is warranted (based on the existing state and the current `Accumulated` changes) + member __.IsCompactionDue = eventsLen > capacityBeforeCompaction + +type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUnionEncoder<'event, byte[]>, ?access : AccessStrategy<'event,'state>) = let (|Pos|) streamName : Store.Position = { collectionUri = coll.CollectionUri; streamName = streamName; index = None } + let compactionPredicate = + match access with + | None -> None + | Some AccessStrategy.EventsAreState -> Some (fun _ -> true) + | Some (AccessStrategy.RollingSnapshots (et,_)) -> Some ((=) et) let loadAlgorithm load (Pos pos) initial log = let batched = load initial (coll.Gateway.LoadBatched log None pos) let compacted predicate = load initial (coll.Gateway.LoadBackwardsStoppingAtCompactionEvent log predicate pos) - match compactionStrategy with - | Some predicate -> compacted predicate + match access with | None -> batched + | Some AccessStrategy.EventsAreState -> compacted (fun _ -> true) + | Some (AccessStrategy.RollingSnapshots (et,_)) -> compacted ((=) et) let load (fold: 'state -> 'event seq -> 'state) initial loadF = async { let! token, events = loadF return token, fold initial (UnionEncoderAdapters.decodeKnownEvents codec events) } member __.Load (fold: 'state -> 'event seq -> 'state) (initial: 'state) streamName (log : ILogger) : Async = loadAlgorithm (load fold) streamName initial log member __.LoadFromToken (fold: 'state -> 'event seq -> 'state) (state: 'state) token (log : ILogger) : Async = - (load fold) state (coll.Gateway.LoadFromToken log token compactionStrategy false) - member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger) (token, state) (events : 'event list) : Async> = async { + (load fold) state (coll.Gateway.LoadFromToken log token compactionPredicate false) + member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger) + (token : Storage.StreamToken, state : 'state) + (events : 'event list, state' : 'state) : Async> = async { + let events = + match access with + | None | Some AccessStrategy.EventsAreState -> events + | Some (AccessStrategy.RollingSnapshots (_,f)) -> + let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) + if cc.IsCompactionDue then events @ [f state'] else events let encodedEvents : Store.EventData[] = UnionEncoderAdapters.encodeEvents codec (Seq.ofList events) - let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionStrategy + let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionPredicate match syncRes with - | GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionStrategy true)) + | GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionPredicate true)) | GatewaySyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state (Seq.ofList events)) } module Caching = @@ -449,8 +506,8 @@ module Caching = interface ICategory<'event, 'state> with member __.Load (streamName : string) (log : ILogger) : Async = interceptAsync (inner.Load streamName log) streamName - member __.TrySync streamName (log : ILogger) (token, state) (events : 'event list) : Async> = async { - let! syncRes = inner.TrySync streamName log (token, state) events + member __.TrySync streamName (log : ILogger) (token, state) (events : 'event list, state' : 'state) : Async> = async { + let! syncRes = inner.TrySync streamName log (token, state) (events,state') match syncRes with | Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict (interceptAsync resync streamName) | Storage.SyncResult.Written (token', state') -> return Storage.SyncResult.Written (token', state') } @@ -478,17 +535,12 @@ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: ' interface ICategory<'event, 'state> with member __.Load (streamName : string) (log : ILogger) : Async = loadAlgorithm streamName initial log - member __.TrySync _streamName(* TODO remove from main interface *) (log : ILogger) (token, state) (events : 'event list) : Async> = async { - let! syncRes = category.TrySync fold log (token, state) events + member __.TrySync _streamName(* TODO remove from main interface *) (log : ILogger) (token, state) (events : 'event list, state': 'state) : Async> = async { + let! syncRes = category.TrySync fold log (token, state) (events,state') match syncRes with | Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict resync | Storage.SyncResult.Written (token',state') -> return Storage.SyncResult.Written (token',state') } -[] -type CompactionStrategy = - | EventType of string - | Predicate of (string -> bool) - [] type CachingStrategy = | SlidingWindow of Caching.Cache * window: TimeSpan @@ -497,12 +549,7 @@ type CachingStrategy = type EqxStreamBuilder<'event, 'state>(gateway : EqxGateway, codec, fold, initial, ?compaction, ?caching) = member __.Create (databaseId, collectionId, streamName) : Equinox.IStream<'event, 'state> = - let compactionPredicateOption = - match compaction with - | None -> None - | Some (CompactionStrategy.Predicate predicate) -> Some predicate - | Some (CompactionStrategy.EventType eventType) -> Some (fun x -> x = eventType) - let category = Category<'event, 'state>(Collection(gateway, databaseId, collectionId), codec, ?compactionStrategy = compactionPredicateOption) + let category = Category<'event, 'state>(Collection(gateway, databaseId, collectionId), codec, ?access = compaction) let readCacheOption = match caching with diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index d0e34da74..e0ed98f5a 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -11,27 +11,27 @@ let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings) module Cart = - let fold, initial = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial + let fold, initial, compact = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.compact let codec = genCodec() let createServiceWithoutOptimization connection batchSize log = let gateway = createEqxGateway connection batchSize - let resolveStream _ignoreCompactionEventTypeOption (StreamArgs args) = + let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial).Create(args) Backend.Cart.Service(log, resolveStream) let createServiceWithCompaction connection batchSize log = let gateway = createEqxGateway connection batchSize - let resolveStream compactionEventType (StreamArgs args) = - EqxStreamBuilder(gateway, codec, fold, initial, compaction=CompactionStrategy.EventType compactionEventType).Create(args) + let resolveStream (StreamArgs args) = + EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact).Create(args) Backend.Cart.Service(log, resolveStream) let createServiceWithCaching connection batchSize log cache = let gateway = createEqxGateway connection batchSize let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let resolveStream _ignorecompactionEventType (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, caching = sliding20m).Create(args) + let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, caching = sliding20m).Create(args) Backend.Cart.Service(log, resolveStream) let createServiceWithCompactionAndCaching connection batchSize log cache = let gateway = createEqxGateway connection batchSize let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let resolveStream cet (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, CompactionStrategy.EventType cet, sliding20m).Create(args) + let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact, sliding20m).Create(args) Backend.Cart.Service(log, resolveStream) module ContactPreferences = @@ -39,12 +39,10 @@ module ContactPreferences = let codec = genCodec() let createServiceWithoutOptimization createGateway defaultBatchSize log _ignoreWindowSize _ignoreCompactionPredicate = let gateway = createGateway defaultBatchSize - let resolveStream _windowSize _compactionPredicate (StreamArgs args) = - EqxStreamBuilder(gateway, codec, fold, initial).Create(args) + let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial).Create(args) Backend.ContactPreferences.Service(log, resolveStream) let createService createGateway log = - let resolveStream batchSize compactionPredicate (StreamArgs args) = - EqxStreamBuilder(createGateway batchSize, codec, fold, initial, CompactionStrategy.Predicate compactionPredicate).Create(args) + let resolveStream (StreamArgs args) = EqxStreamBuilder(createGateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(args) Backend.ContactPreferences.Service(log, resolveStream) #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)