Skip to content

Commit

Permalink
Correct caching of Writes
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 20, 2018
1 parent c2886d6 commit 31fde7e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 20 deletions.
12 changes: 6 additions & 6 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,18 @@ module Test =
let c = Caching.Cache("Cli", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
let eqxCache =
if targs.Contains Cached then
let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
let resolveStream streamName =
match store with
| Store.Mem store ->
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName)
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create(streamName)
| Store.Cosmos (gateway, databaseId, connectionId) ->
let cache =
if targs.Contains Cached then
let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
if targs.Contains Indexed then
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.IndexedSearch index, ?caching = cache)
.Create(databaseId, connectionId, streamName)
Expand Down Expand Up @@ -247,7 +247,7 @@ let main argv =
let clients = Array.init (testsPerSecond * 2) (fun _ -> Guid.NewGuid () |> ClientId)

let test = targs.GetResult(Name,Favorites)
log.Information( "Running {test} with caching: {caching}, indexing: {indexed}. "+
log.Information( "Running {test} with caching: {cached}, indexing: {indexed}. "+
"Duration for {duration} with test freq {tps} hits/s; max errors: {errorCutOff}, reporting intervals: {ri}, report file: {report}",
test, targs.Contains Cached, targs.Contains Indexed, duration, testsPerSecond, errorCutoff, reportingIntervals, report)
let runSingleTest clientId =
Expand Down
19 changes: 11 additions & 8 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,15 @@ type EqxSyncResult =
| ConflictUnknown of Store.Position
| Conflict of Store.Position * events: Store.IEventData[]

// NB don't nest in a private module, or serialization will fail miserably ;)
[<CLIMutable; NoEquality; NoComparison; JsonObject(ItemRequired=Required.AllowNull)>]
type WriteResponse = { etag: string; conflicts: Store.IndexProjection[] }

module private Write =
[<CLIMutable; NoEquality; NoComparison; JsonObject(ItemRequired=Required.AllowNull)>]
type WriteResponse = { etag: string; conflicts: Store.IndexProjection[] }
let [<Literal>] sprocName = "AtomicMultiDocInsert"
let [<Literal>] sprocName = "EquinoxIndexedWrite"

let private writeEventsAsync (client: IDocumentClient) (pos: Store.Position) (events: Store.EventData seq,maybeIndexEvents): Async<float*EqxSyncResult> = async {
let sprocUri = sprintf "%O/sprocs/%s" pos.collectionUri sprocName
let sprocLink = sprintf "%O/sprocs/%s" pos.collectionUri sprocName
let opts = Client.RequestOptions(PartitionKey=PartitionKey(pos.streamName))
let! ct = Async.CancellationToken
let events = events |> Seq.mapi (fun i ed -> Store.Event.Create pos (i+1) ed |> JsonConvert.SerializeObject) |> Seq.toArray
Expand All @@ -267,7 +269,7 @@ module private Write =
| None | Some [||] -> Unchecked.defaultof<_>
| Some eds -> Store.IndexEvent.Create pos (events.Length) eds
try
let! (res : Client.StoredProcedureResponse<WriteResponse>) = client.ExecuteStoredProcedureAsync(sprocUri, opts, ct, box events, box pos.Index, box pos.etag, box index) |> Async.AwaitTaskCorrect
let! (res : Client.StoredProcedureResponse<WriteResponse>) = client.ExecuteStoredProcedureAsync(sprocLink, opts, ct, box events, box pos.Index, box pos.etag, box index) |> Async.AwaitTaskCorrect
match res.RequestCharge, (match res.Response.etag with null -> None | x -> Some x), res.Response.conflicts with
| rc,e,null -> return rc, EqxSyncResult.Written { pos with index = Some (pos.IndexRel events.Length); etag=e }
| rc,e,[||] -> return rc, EqxSyncResult.ConflictUnknown { pos with etag=e }
Expand Down Expand Up @@ -361,7 +363,7 @@ module private Read =
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propResolvedEvents "Json" slice
let index = match slice |> Array.tryHead with Some head -> head.id | None -> null
(log |> Log.prop "startIndex" pos.Index |> Log.prop "bytes" bytes |> Log.event evt)
.Information("Eqx {action:l} {count} {ms}ms i={index} rc={ru}", "Read", count, (let e = t.Elapsed in e.TotalMilliseconds), index, ru)
.Information("Eqx {action:l} {count} {direction} {ms}ms i={index} rc={ru}", "Query", count, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru)
return slice, ru }

let private readBatches (log : ILogger) (readSlice: IDocumentQuery<Store.Event> -> ILogger -> Async<Store.Event[] * float>)
Expand Down Expand Up @@ -796,22 +798,23 @@ module Initialization =
return coll.Resource.Id }

let createProc (client: IDocumentClient) (collectionUri: Uri) = async {
let f = """function multidocInsert(docs, expectedVersion, etag, index) {
let f = """function indexedWrite(docs, expectedVersion, etag, index) {
var response = getContext().getResponse();
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
if (!docs) throw new Error("docs argument is missing.");
if (index) {
function callback(err, doc, options) {
if (err) throw err;
response.setBody({ etag: doc._etag, conflicts: null });
}
if (-1 == expectedVersion) {
collection.createDocument(collectionLink, index, { disableAutomaticIdGeneration : true}, callback);
} else {
collection.replaceDocument(collection.getAltLink() + "/docs/" + index.id, index, callback);
}
response.setBody({ etag: null, conflicts: null });
} else {
// call always expects a parseable json response with `etag` and `conflicts`
// can also contain { conflicts: [{t, d}] } representing conflicting events since expectedVersion
// null/missing signifies events have been written, with no conflict
response.setBody({ etag: null, conflicts: null });
Expand Down
12 changes: 6 additions & 6 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ type Tests(testOutputHelper) =

let primeIndex = [EqxAct.IndexedNotFound; EqxAct.SliceBackward; EqxAct.BatchBackward]
// When the test gets re-run to simplify, the stream will typically already have values
let primeIndexRerun = [EqxAct.Indexed]
let primeIndexRerun = [EqxAct.IndexedCached]

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly using the index and cache to avoid redundant reads`` context skuId cartId = Async.RunSynchronously <| async {
Expand All @@ -293,9 +293,9 @@ type Tests(testOutputHelper) =
do! addAndThenRemoveItemsManyTimes context cartId skuId service1 5
let! _ = service2.Read cartId

// ... should see a single Indexed read given writes are cached
test <@ primeIndex @ [EqxAct.Append; EqxAct.Indexed] = capture.ExternalCalls
|| primeIndexRerun @ [EqxAct.Append; EqxAct.Indexed] = capture.ExternalCalls@>
// ... should see a single Cached Indexed read given writes are cached and writer emits etag
test <@ primeIndex @ [EqxAct.Append; EqxAct.IndexedCached] = capture.ExternalCalls
|| primeIndexRerun @ [EqxAct.Append; EqxAct.IndexedCached] = capture.ExternalCalls@>

// Add two more - the roundtrip should only incur a single read, which should be cached by virtue of being a second one in successono
capture.Clear()
Expand All @@ -306,8 +306,8 @@ type Tests(testOutputHelper) =
capture.Clear()
let! _ = service2.Read cartId
let! _ = service2.Read cartId
// First read is a re-read, second is cached
test <@ [EqxAct.Indexed;EqxAct.IndexedCached] = capture.ExternalCalls @>
// First is cached because writer emits etag, second remains cached
test <@ [EqxAct.IndexedCached; EqxAct.IndexedCached] = capture.ExternalCalls @>
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
Expand Down

0 comments on commit 31fde7e

Please sign in to comment.