diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index 531d0e7ff..94885344c 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -145,6 +145,11 @@ module Test = let c = Caching.Cache("Cli", sizeMb = 50) CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some else None + let eqxCache = + if targs.Contains Cached then + let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50) + Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some + else None let resolveStream streamName = match store with | Store.Mem store -> @@ -152,11 +157,6 @@ module Test = | Store.Es gateway -> GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create(streamName) | Store.Cosmos (gateway, databaseId, connectionId) -> - let cache = - if targs.Contains Cached then - let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50) - Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some - else None if targs.Contains Indexed then EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.IndexedSearch index, ?caching = cache) .Create(databaseId, connectionId, streamName) @@ -247,7 +247,7 @@ let main argv = let clients = Array.init (testsPerSecond * 2) (fun _ -> Guid.NewGuid () |> ClientId) let test = targs.GetResult(Name,Favorites) - log.Information( "Running {test} with caching: {caching}, indexing: {indexed}. "+ + log.Information( "Running {test} with caching: {cached}, indexing: {indexed}. "+ "Duration for {duration} with test freq {tps} hits/s; max errors: {errorCutOff}, reporting intervals: {ri}, report file: {report}", test, targs.Contains Cached, targs.Contains Indexed, duration, testsPerSecond, errorCutoff, reportingIntervals, report) let runSingleTest clientId = diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 4a38f1ec5..1d67f753a 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -251,13 +251,15 @@ type EqxSyncResult = | ConflictUnknown of Store.Position | Conflict of Store.Position * events: Store.IEventData[] +// NB don't nest in a private module, or serialization will fail miserably ;) +[] +type WriteResponse = { etag: string; conflicts: Store.IndexProjection[] } + module private Write = - [] - type WriteResponse = { etag: string; conflicts: Store.IndexProjection[] } - let [] sprocName = "AtomicMultiDocInsert" + let [] sprocName = "EquinoxIndexedWrite" let private writeEventsAsync (client: IDocumentClient) (pos: Store.Position) (events: Store.EventData seq,maybeIndexEvents): Async = async { - let sprocUri = sprintf "%O/sprocs/%s" pos.collectionUri sprocName + let sprocLink = sprintf "%O/sprocs/%s" pos.collectionUri sprocName let opts = Client.RequestOptions(PartitionKey=PartitionKey(pos.streamName)) let! ct = Async.CancellationToken let events = events |> Seq.mapi (fun i ed -> Store.Event.Create pos (i+1) ed |> JsonConvert.SerializeObject) |> Seq.toArray @@ -267,7 +269,7 @@ module private Write = | None | Some [||] -> Unchecked.defaultof<_> | Some eds -> Store.IndexEvent.Create pos (events.Length) eds try - let! (res : Client.StoredProcedureResponse) = client.ExecuteStoredProcedureAsync(sprocUri, opts, ct, box events, box pos.Index, box pos.etag, box index) |> Async.AwaitTaskCorrect + let! (res : Client.StoredProcedureResponse) = client.ExecuteStoredProcedureAsync(sprocLink, opts, ct, box events, box pos.Index, box pos.etag, box index) |> Async.AwaitTaskCorrect match res.RequestCharge, (match res.Response.etag with null -> None | x -> Some x), res.Response.conflicts with | rc,e,null -> return rc, EqxSyncResult.Written { pos with index = Some (pos.IndexRel events.Length); etag=e } | rc,e,[||] -> return rc, EqxSyncResult.ConflictUnknown { pos with etag=e } @@ -361,7 +363,7 @@ module private Read = let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propResolvedEvents "Json" slice let index = match slice |> Array.tryHead with Some head -> head.id | None -> null (log |> Log.prop "startIndex" pos.Index |> Log.prop "bytes" bytes |> Log.event evt) - .Information("Eqx {action:l} {count} {ms}ms i={index} rc={ru}", "Read", count, (let e = t.Elapsed in e.TotalMilliseconds), index, ru) + .Information("Eqx {action:l} {count} {direction} {ms}ms i={index} rc={ru}", "Query", count, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru) return slice, ru } let private readBatches (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async) @@ -796,7 +798,7 @@ module Initialization = return coll.Resource.Id } let createProc (client: IDocumentClient) (collectionUri: Uri) = async { - let f = """function multidocInsert(docs, expectedVersion, etag, index) { + let f = """function indexedWrite(docs, expectedVersion, etag, index) { var response = getContext().getResponse(); var collection = getContext().getCollection(); var collectionLink = collection.getSelfLink(); @@ -804,14 +806,15 @@ module Initialization = if (index) { function callback(err, doc, options) { if (err) throw err; + response.setBody({ etag: doc._etag, conflicts: null }); } if (-1 == expectedVersion) { collection.createDocument(collectionLink, index, { disableAutomaticIdGeneration : true}, callback); } else { collection.replaceDocument(collection.getAltLink() + "/docs/" + index.id, index, callback); } - response.setBody({ etag: null, conflicts: null }); } else { + // call always expects a parseable json response with `etag` and `conflicts` // can also contain { conflicts: [{t, d}] } representing conflicting events since expectedVersion // null/missing signifies events have been written, with no conflict response.setBody({ etag: null, conflicts: null }); diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index 88943ec0f..a7ef73ff2 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -278,7 +278,7 @@ type Tests(testOutputHelper) = let primeIndex = [EqxAct.IndexedNotFound; EqxAct.SliceBackward; EqxAct.BatchBackward] // When the test gets re-run to simplify, the stream will typically already have values - let primeIndexRerun = [EqxAct.Indexed] + let primeIndexRerun = [EqxAct.IndexedCached] [] let ``Can roundtrip against Cosmos, correctly using the index and cache to avoid redundant reads`` context skuId cartId = Async.RunSynchronously <| async { @@ -293,9 +293,9 @@ type Tests(testOutputHelper) = do! addAndThenRemoveItemsManyTimes context cartId skuId service1 5 let! _ = service2.Read cartId - // ... should see a single Indexed read given writes are cached - test <@ primeIndex @ [EqxAct.Append; EqxAct.Indexed] = capture.ExternalCalls - || primeIndexRerun @ [EqxAct.Append; EqxAct.Indexed] = capture.ExternalCalls@> + // ... should see a single Cached Indexed read given writes are cached and writer emits etag + test <@ primeIndex @ [EqxAct.Append; EqxAct.IndexedCached] = capture.ExternalCalls + || primeIndexRerun @ [EqxAct.Append; EqxAct.IndexedCached] = capture.ExternalCalls@> // Add two more - the roundtrip should only incur a single read, which should be cached by virtue of being a second one in successono capture.Clear() @@ -306,8 +306,8 @@ type Tests(testOutputHelper) = capture.Clear() let! _ = service2.Read cartId let! _ = service2.Read cartId - // First read is a re-read, second is cached - test <@ [EqxAct.Indexed;EqxAct.IndexedCached] = capture.ExternalCalls @> + // First is cached because writer emits etag, second remains cached + test <@ [EqxAct.IndexedCached; EqxAct.IndexedCached] = capture.ExternalCalls @> } []