diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs
index 470e50265..531d0e7ff 100644
--- a/cli/Equinox.Cli/Program.fs
+++ b/cli/Equinox.Cli/Program.fs
@@ -176,7 +176,7 @@ module SerilogHelpers =
         match evt with
         | Equinox.Cosmos.Log.Index { ru = ru }
         | Equinox.Cosmos.Log.IndexNotFound { ru = ru }
-        | Equinox.Cosmos.Log.IndexCached { ru = ru }
+        | Equinox.Cosmos.Log.IndexNotModified { ru = ru }
         | Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Forward,_, { ru = ru })
         | Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Backward,_, { ru = ru }) -> CosmosReadRu ru
         | Equinox.Cosmos.Log.WriteSuccess {ru = ru }
diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs
index 83a5f937c..d438a740b 100644
--- a/samples/Store/Integration/LogIntegration.fs
+++ b/samples/Store/Integration/LogIntegration.fs
@@ -34,8 +34,8 @@ module EquinoxCosmosInterop =
             | Log.Batch (Direction.Forward,c,m) -> "EqxLoadF", m, Some c, m.ru
             | Log.Batch (Direction.Backward,c,m) -> "EqxLoadB", m, Some c, m.ru
             | Log.Index m -> "EqxLoadI", m, None, m.ru
-            | Log.IndexNotFound m -> "EqxLoadIN", m, None, m.ru
-            | Log.IndexCached m -> "EqxLoadIC", m, None, m.ru
+            | Log.IndexNotFound m -> "EqxLoadI404", m, None, m.ru
+            | Log.IndexNotModified m -> "EqxLoadI302", m, None, m.ru
         { action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; batches = batches
           interval = StopwatchInterval(metric.interval.StartTicks,metric.interval.EndTicks); ru = ru }
diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index bdee6d557..4a38f1ec5 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -13,18 +13,6 @@ open System

 [<AutoOpen>]
 module DocDbExtensions =
-    type Client.RequestOptions with
-        /// Simplified ETag precondition builder
-        member options.ETag
-            with get () =
-                match options.AccessCondition with
-                | null -> null
-                | ac -> ac.Condition
-
-            and set etag =
-                if String.IsNullOrEmpty etag then () else
-                options.AccessCondition <- Client.AccessCondition(Type = Client.AccessConditionType.IfMatch, Condition = etag)
-
     /// Extracts the innermost exception from a nested hierarchy of Aggregate Exceptions
     let (|AggregateException|) (exn : exn) =
         let rec aux (e : exn) =
@@ -34,31 +22,35 @@ module DocDbExtensions =
            | _ -> e
         aux exn

-    /// DocumentDB Error HttpStatusCode extractor
    let (|DocDbException|_|) (e : exn) =
        match e with
        | AggregateException (:? DocumentClientException as dce) -> Some dce
        | _ -> None

    /// Map Nullable to Option
-    let (|HasValue|_|) (x:Nullable<_>) = if x.HasValue then Some x.Value else None
+    let (|HasValue|Null|) (x:Nullable<_>) =
+        if x.HasValue then HasValue x.Value
+        else Null
    /// DocumentDB Error HttpStatusCode extractor
    let (|DocDbStatusCode|_|) (e : DocumentClientException) =
        match e.StatusCode with
        | HasValue x -> Some x
-        | _ -> None
+        | Null -> None

-    type ReadResult<'T> = Found of 'T | NotFound | PreconditionFailed
+    type ReadResult<'T> = Found of 'T | NotFound | NotModified
    type DocDbCollection(client : IDocumentClient, collectionUri) =
        member __.TryReadDocument(documentId : string, ?options : Client.RequestOptions): Async<float * ReadResult<'T>> = async {
            let! ct = Async.CancellationToken
            let options = defaultArg options null
            let docLink = sprintf "%O/docs/%s" collectionUri documentId
            try let! document = async { return! client.ReadDocumentAsync<'T>(docLink, options = options, cancellationToken = ct) |> Async.AwaitTaskCorrect }
-                return document.RequestCharge, Found document.Document
+                if document.StatusCode = System.Net.HttpStatusCode.NotModified then return document.RequestCharge, NotModified
+                // NB `.Document` will NRE if an IfNoneMatch precondition triggers a NotModified result
+                else return document.RequestCharge, Found document.Document
            with
            | DocDbException (DocDbStatusCode System.Net.HttpStatusCode.NotFound as e) -> return e.RequestCharge, NotFound
-            | DocDbException (DocDbStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, PreconditionFailed }
+            // NB while the docs suggest you may see a 412, the NotModified in the body of the try/with is actually what happens
+            | DocDbException (DocDbStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, NotModified }

 module Store =
@@ -144,6 +136,10 @@ module Store =
         { p: string // "{streamName}"
           id: string // "{-1}"
+          /// When we read, we need to capture the value so we can retain it for caching purposes; when we write, there's no point sending it as it would not be honored
+          []
+          _etag: string
+          //w: int64 // 100: window size
          /// last index/i value
          m: int64 // {index}
@@ -167,7 +163,7 @@
        }
        static member IdConstant = "-1"
        static member Create (pos: Position) eventCount (eds: EventData[]) : IndexEvent =
-            { p = pos.streamName; id = IndexEvent.IdConstant; m = pos.IndexRel eventCount
+            { p = pos.streamName; id = IndexEvent.IdConstant; m = pos.IndexRel eventCount; _etag = null
              c = [| for ed in eds -> { t = ed.eventType; d = ed.data; m = ed.metadata } |] }
    and IndexProjection =
        { /// The Event Type, used to drive deserialization
@@ -217,7 +213,7 @@ module Log =
        /// Individual read request for the Index, not found
        | IndexNotFound of Measurement
        /// Index read with Single RU Request Charge due to correct use of etag in cache
-        | IndexCached of Measurement
+        | IndexNotModified of Measurement
        /// Summarizes a set of Slices read together
        | Batch of Direction * slices: int * Measurement
    let prop name value (log : ILogger) = log.ForContext(name, value)
@@ -320,27 +316,27 @@ module private Read =
        let! t, (ru, res : ReadResult) = getIndex pos |> Stopwatch.Time
        let log count bytes (f : Log.Measurement -> _) = log |> Log.event (f { stream = pos.streamName; interval = t; bytes = bytes; count = count; ru = ru })
        match res with
-        | ReadResult.PreconditionFailed ->
-            (log 0 0 Log.IndexCached).Information("Eqx {action:l} {ms}ms rc={ru}", "IndexCached", (let e = t.Elapsed in e.TotalMilliseconds), ru)
+        | ReadResult.NotModified ->
+            (log 0 0 Log.IndexNotModified).Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru)
        | ReadResult.NotFound ->
-            (log 0 0 Log.IndexNotFound).Information("Eqx {action:l} {ms}ms rc={ru}", "IndexNotFound", (let e = t.Elapsed in e.TotalMilliseconds), ru)
+            (log 0 0 Log.IndexNotFound).Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru)
        | ReadResult.Found doc ->
            let log =
                let (|EventLen|) (x : Store.IndexProjection) = match x.d, x.m with Log.BlobLen bytes, Log.BlobLen metaBytes -> bytes + metaBytes
                let bytes, count = doc.c |> Array.sumBy (|EventLen|), doc.c.Length
                log bytes count Log.Index
-            let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propProjectionEvents "Json" doc.c
-            log.Information("Eqx {action:l} {ms}ms rc={ru}", "Index", (let e = t.Elapsed in e.TotalMilliseconds), ru)
+            let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propProjectionEvents "Json" doc.c |> Log.prop "etag" doc._etag
+            log.Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru)
        return ru, res }

-    type [] IndexResult = Unchanged | NotFound | Found of Store.Position * Store.IndexProjection[]
+    type [] IndexResult = NotModified | NotFound | Found of Store.Position * Store.IndexProjection[]
    /// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with IndexResult.UnChanged
    let loadIndex (log : ILogger) retryPolicy client (pos : Store.Position): Async<IndexResult> = async {
        let getIndex = getIndex client
        let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGetIndex getIndex pos) log
        match res with
-        | ReadResult.PreconditionFailed -> return IndexResult.Unchanged
+        | ReadResult.NotModified -> return IndexResult.NotModified
        | ReadResult.NotFound -> return IndexResult.NotFound
-        | ReadResult.Found index -> return IndexResult.Found ({ pos with index = Some index.m }, index.c) }
+        | ReadResult.Found index -> return IndexResult.Found ({ pos with index = Some index.m; etag=if index._etag=null then None else Some index._etag }, index.c) }
    let private getQuery (client : IDocumentClient) (pos:Store.Position) (direction: Direction) batchSize =
        let querySpec =
@@ -505,9 +501,11 @@ module Token =
    let ofPreviousStreamVersionAndCompactionEventDataIndex prevStreamVersion compactionEventDataIndex eventsLength batchSize streamVersion' : Storage.StreamToken =
        ofCompactionEventNumber (Some (prevStreamVersion + 1L + int64 compactionEventDataIndex)) eventsLength batchSize streamVersion'
    let private unpackEqxStreamVersion (x : Storage.StreamToken) = let x : Token = unbox x.value in x.pos.Index
+    let private unpackEqxETag (x : Storage.StreamToken) = let x : Token = unbox x.value in x.pos.etag
    let supersedes current x =
        let currentVersion, newVersion = unpackEqxStreamVersion current, unpackEqxStreamVersion x
-        newVersion > currentVersion
+        let currentETag, newETag = unpackEqxETag current, unpackEqxETag x
+        newVersion > currentVersion || currentETag <> newETag

 type EqxConnection(client: IDocumentClient, ?readRetryPolicy (*: (int -> Async<'T>) -> Async<'T>*), ?writeRetryPolicy) =
    member __.Client = client
@@ -535,9 +533,9 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) =
    let (|IEventDataArray|) events = [| for e in events -> e :> Store.IEventData |]
    member private __.InterpretIndexOrFallback log isCompactionEventType pos res: Async = async {
        match res with
+        | Read.IndexResult.NotModified -> return invalidOp "Not handled"
        | Read.IndexResult.Found (pos, projectionsAndEvents) when projectionsAndEvents |> Array.exists (fun x -> isCompactionEventType x.t) ->
            return Token.ofNonCompacting pos, projectionsAndEvents |> Seq.cast |> Array.ofSeq
-        | Read.IndexResult.Unchanged -> return invalidOp "Not handled"
        | _ ->
            let! streamToken, events = __.LoadBackwardsStoppingAtCompactionEvent log isCompactionEventType pos
            return streamToken, events |> Seq.cast |> Array.ofSeq }
@@ -573,7 +571,7 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) =
        else
            let! res = Read.loadIndex log None(* TODO conn.ReadRetryPolicy*) conn.Client pos
            match res with
-            | Read.IndexResult.Unchanged ->
+            | Read.IndexResult.NotModified ->
                return LoadFromTokenResult.Unchanged
            | _ ->
                let! loaded = __.InterpretIndexOrFallback log isCompactionEventType.Value pos res
@@ -803,21 +801,19 @@ module Initialization =
        var collection = getContext().getCollection();
        var collectionLink = collection.getSelfLink();
        if (!docs) throw new Error("docs argument is missing.");
-        if(index) {
-            var assignedEtag = null;
+        if (index) {
            function callback(err, doc, options) {
                if (err) throw err;
-                assignedEtag = doc._etag
            }
            if (-1 == expectedVersion) {
                collection.createDocument(collectionLink, index, { disableAutomaticIdGeneration : true}, callback);
            } else {
                collection.replaceDocument(collection.getAltLink() + "/docs/" + index.id, index, callback);
            }
+            response.setBody({ etag: null, conflicts: null });
+        } else {
            // can also contain { conflicts: [{t, d}] } representing conflicting events since expectedVersion
            // null/missing signifies events have been written, with no conflict
-            response.setBody({ etag: assignedEtag, conflicts: null });
-        } else {
            response.setBody({ etag: null, conflicts: null });
        }
        for (var i=0; i
 -> EqxAct.BatchBackward
        | Equinox.Cosmos.Log.Index _ -> EqxAct.Indexed
        | Equinox.Cosmos.Log.IndexNotFound _ -> EqxAct.IndexedNotFound
-        | Equinox.Cosmos.Log.IndexCached _ -> EqxAct.IndexedCached
+        | Equinox.Cosmos.Log.IndexNotModified _ -> EqxAct.IndexedCached
    let (|EqxEvent|_|) (logEvent : LogEvent) : Equinox.Cosmos.Log.Event option =
        logEvent.Properties.Values |> Seq.tryPick (function
            | SerilogScalar (:? Equinox.Cosmos.Log.Event as e) -> Some e
diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
index 7d804f08d..88943ec0f 100644
--- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
+++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
@@ -277,6 +277,8 @@ type Tests(testOutputHelper) =
    }

    let primeIndex = [EqxAct.IndexedNotFound; EqxAct.SliceBackward; EqxAct.BatchBackward]
+    // When the test gets re-run, the stream will typically already have values (so the priming read finds the index)
+    let primeIndexRerun = [EqxAct.Indexed]

    []
    let ``Can roundtrip against Cosmos, correctly using the index and cache to avoid redundant reads`` context skuId cartId = Async.RunSynchronously <| async {
@@ -291,11 +293,11 @@ type Tests(testOutputHelper) =

        do! addAndThenRemoveItemsManyTimes context cartId skuId service1 5
        let! _ = service2.Read cartId
-        // ... should see a single read (with a) we are writes are cached
-        let! _ = service2.Read cartId
-        test <@ primeIndex @ [EqxAct.Append; EqxAct.IndexedCached; EqxAct.IndexedCached] = capture.ExternalCalls @>
+        // ... should see a single Indexed read given writes are cached
+        test <@ primeIndex @ [EqxAct.Append; EqxAct.Indexed] = capture.ExternalCalls
+                || primeIndexRerun @ [EqxAct.Append; EqxAct.Indexed] = capture.ExternalCalls @>

-        // Add two more - the roundtrip should only incur a single read
+        // Add two more - the roundtrip should only incur a single read, which should be cached by virtue of being a second one in succession
        capture.Clear()
        do! addAndThenRemoveItemsManyTimes context cartId skuId service1 1
        test <@ [EqxAct.IndexedCached; EqxAct.Append] = capture.ExternalCalls @>
@@ -303,7 +305,9 @@ type Tests(testOutputHelper) =
        // While we now have 12 events, we should be able to read them with a single call
        capture.Clear()
        let! _ = service2.Read cartId
-        test <@ [EqxAct.IndexedCached] = capture.ExternalCalls @>
+        let! _ = service2.Read cartId
+        // First read is a re-read, second is cached
+        test <@ [EqxAct.Indexed; EqxAct.IndexedCached] = capture.ExternalCalls @>
    }

 []
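// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch above): roughly how a caller of
// the patched DocDbCollection.TryReadDocument can issue the etag-conditional
// read that this changeset enables, now that the RequestOptions.ETag helper is
// gone. Client.RequestOptions, Client.AccessCondition and
// AccessConditionType.IfNoneMatch are real DocumentDB SDK types; the function
// and argument names below are hypothetical and for illustration only.
open Microsoft.Azure.Documents

let tryReadIndexDocument (coll : DocDbCollection) (documentId : string) (cachedEtag : string option)
    : Async<float * ReadResult<Store.IndexEvent>> = async {
    // Only send the IfNoneMatch precondition when we actually hold a cached etag
    let options =
        match cachedEtag with
        | Some etag -> Client.RequestOptions(AccessCondition = Client.AccessCondition(Type = Client.AccessConditionType.IfNoneMatch, Condition = etag))
        | None -> Client.RequestOptions()
    // NotModified (304) means the cached copy is still current and the read costs ~1 RU;
    // NotFound (404) means no index document exists yet; Found carries the document,
    // whose _etag should be captured for the next conditional read.
    return! coll.TryReadDocument(documentId, options) }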