Complete handling for caching of index reads
bartelink committed Nov 20, 2018
1 parent 23a2fe2 commit c2886d6
Showing 5 changed files with 45 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cli/Equinox.Cli/Program.fs
@@ -176,7 +176,7 @@ module SerilogHelpers =
match evt with
| Equinox.Cosmos.Log.Index { ru = ru }
| Equinox.Cosmos.Log.IndexNotFound { ru = ru }
| Equinox.Cosmos.Log.IndexCached { ru = ru }
| Equinox.Cosmos.Log.IndexNotModified { ru = ru }
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Forward,_, { ru = ru })
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Backward,_, { ru = ru }) -> CosmosReadRu ru
| Equinox.Cosmos.Log.WriteSuccess {ru = ru }
4 changes: 2 additions & 2 deletions samples/Store/Integration/LogIntegration.fs
@@ -34,8 +34,8 @@ module EquinoxCosmosInterop =
| Log.Batch (Direction.Forward,c,m) -> "EqxLoadF", m, Some c, m.ru
| Log.Batch (Direction.Backward,c,m) -> "EqxLoadB", m, Some c, m.ru
| Log.Index m -> "EqxLoadI", m, None, m.ru
| Log.IndexNotFound m -> "EqxLoadIN", m, None, m.ru
| Log.IndexCached m -> "EqxLoadIC", m, None, m.ru
| Log.IndexNotFound m -> "EqxLoadI404", m, None, m.ru
| Log.IndexNotModified m -> "EqxLoadI302", m, None, m.ru
{ action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; batches = batches
interval = StopwatchInterval(metric.interval.StartTicks,metric.interval.EndTicks); ru = ru }

68 changes: 32 additions & 36 deletions src/Equinox.Cosmos/Cosmos.fs
@@ -13,18 +13,6 @@ open System

[<AutoOpen>]
module DocDbExtensions =
type Client.RequestOptions with
/// Simplified ETag precondition builder
member options.ETag
with get () =
match options.AccessCondition with
| null -> null
| ac -> ac.Condition

and set etag =
if String.IsNullOrEmpty etag then () else
options.AccessCondition <- Client.AccessCondition(Type = Client.AccessConditionType.IfMatch, Condition = etag)

/// Extracts the innermost exception from a nested hierarchy of Aggregate Exceptions
let (|AggregateException|) (exn : exn) =
let rec aux (e : exn) =
@@ -34,31 +22,35 @@ module DocDbExtensions =
| _ -> e

aux exn

/// DocumentDB Error HttpStatusCode extractor
let (|DocDbException|_|) (e : exn) =
match e with
| AggregateException (:? DocumentClientException as dce) -> Some dce
| _ -> None
/// Map Nullable to Option
let (|HasValue|_|) (x:Nullable<_>) = if x.HasValue then Some x.Value else None
let (|HasValue|Null|) (x:Nullable<_>) =
if x.HasValue then HasValue x.Value
else Null
/// DocumentDB Error HttpStatusCode extractor
let (|DocDbStatusCode|_|) (e : DocumentClientException) =
match e.StatusCode with
| HasValue x -> Some x
| _ -> None
| Null -> None

type ReadResult<'T> = Found of 'T | NotFound | PreconditionFailed
type ReadResult<'T> = Found of 'T | NotFound | NotModified
type DocDbCollection(client : IDocumentClient, collectionUri) =
member __.TryReadDocument(documentId : string, ?options : Client.RequestOptions): Async<float * ReadResult<'T>> = async {
let! ct = Async.CancellationToken
let options = defaultArg options null
let docLink = sprintf "%O/docs/%s" collectionUri documentId
try let! document = async { return! client.ReadDocumentAsync<'T>(docLink, options = options, cancellationToken = ct) |> Async.AwaitTaskCorrect }
return document.RequestCharge, Found document.Document
if document.StatusCode = System.Net.HttpStatusCode.NotModified then return document.RequestCharge, NotModified
// NB `.Document` will NRE if an IfNoneMatch precondition triggers a NotModified result
else return document.RequestCharge, Found document.Document
with
| DocDbException (DocDbStatusCode System.Net.HttpStatusCode.NotFound as e) -> return e.RequestCharge, NotFound
| DocDbException (DocDbStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, PreconditionFailed }
// NB while the docs suggest you may see a 412, the NotModified in the body of the try/with is actually what happens
| DocDbException (DocDbStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, NotModified }
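
Aside (not part of this commit): the NotModified branch above only comes into play when the index read carries an etag precondition. A minimal sketch of such a conditional read against the DocumentDB SDK, assuming a previously cached etag is to hand; the wrapper name and the call site that builds the RequestOptions are illustrative, not shown in this diff:

    open Microsoft.Azure.Documents

    /// Illustrative only: read a document, sending the cached etag as an If-None-Match precondition.
    /// A NotModified answer means the cached copy is still current (per the Log comments below, a single-RU charge).
    let tryReadIfChanged (client : IDocumentClient) (docLink : string) (cachedEtag : string) = async {
        let options =
            Client.RequestOptions(
                AccessCondition = Client.AccessCondition(Type = Client.AccessConditionType.IfNoneMatch, Condition = cachedEtag))
        let! res = client.ReadDocumentAsync(docLink, options) |> Async.AwaitTask
        if res.StatusCode = System.Net.HttpStatusCode.NotModified then
            // no document body comes back; the caller keeps using what it has cached
            return res.RequestCharge, None
        else
            return res.RequestCharge, Some res.Resource }
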

module Store =
[<NoComparison>]
@@ -144,6 +136,10 @@ module Store =
{ p: string // "{streamName}"
id: string // "{-1}"

/// When we read, we need to capture the value so we can retain it for caching purposes; when we write, there's no point sending it as it would not be honored
[<JsonProperty(DefaultValueHandling=DefaultValueHandling.Ignore,Required=Required.Default)>]
_etag: string

//w: int64 // 100: window size
/// last index/i value
m: int64 // {index}
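
Aside (not part of this commit): `DefaultValueHandling.Ignore` is what makes the `_etag` comment above hold in practice; Json.NET omits the member entirely when it holds its default value, so a null `_etag` never reaches the store on writes, while a value returned on reads round-trips into the record. A standalone sketch of that behaviour, using a throwaway `Probe` type rather than the real `IndexEvent`:

    open Newtonsoft.Json

    type Probe =
        {   p: string
            // omitted from the JSON when null; populated from the server-assigned value on reads
            [<JsonProperty(DefaultValueHandling=DefaultValueHandling.Ignore,Required=Required.Default)>]
            _etag: string }

    // write path: the null _etag is skipped, so nothing meaningless is sent to the store
    let written = JsonConvert.SerializeObject({ p = "stream-1"; _etag = null })   // -> {"p":"stream-1"}

    // read path: the value the server assigned round-trips into the record, ready to be retained for caching
    let read = JsonConvert.DeserializeObject<Probe> """{"p":"stream-1","_etag":"00000000-0000-0000-0000-000000000000"}"""
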
@@ -167,7 +163,7 @@
}
static member IdConstant = "-1"
static member Create (pos: Position) eventCount (eds: EventData[]) : IndexEvent =
{ p = pos.streamName; id = IndexEvent.IdConstant; m = pos.IndexRel eventCount
{ p = pos.streamName; id = IndexEvent.IdConstant; m = pos.IndexRel eventCount; _etag = null
c = [| for ed in eds -> { t = ed.eventType; d = ed.data; m = ed.metadata } |] }
and IndexProjection =
{ /// The Event Type, used to drive deserialization
@@ -217,7 +213,7 @@ module Log =
/// Individual read request for the Index, not found
| IndexNotFound of Measurement
/// Index read with Single RU Request Charge due to correct use of etag in cache
| IndexCached of Measurement
| IndexNotModified of Measurement
/// Summarizes a set of Slices read together
| Batch of Direction * slices: int * Measurement
let prop name value (log : ILogger) = log.ForContext(name, value)
@@ -320,27 +316,27 @@ module private Read =
let! t, (ru, res : ReadResult<Store.IndexEvent>) = getIndex pos |> Stopwatch.Time
let log count bytes (f : Log.Measurement -> _) = log |> Log.event (f { stream = pos.streamName; interval = t; bytes = bytes; count = count; ru = ru })
match res with
| ReadResult.PreconditionFailed ->
(log 0 0 Log.IndexCached).Information("Eqx {action:l} {ms}ms rc={ru}", "IndexCached", (let e = t.Elapsed in e.TotalMilliseconds), ru)
| ReadResult.NotModified ->
(log 0 0 Log.IndexNotModified).Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru)
| ReadResult.NotFound ->
(log 0 0 Log.IndexNotFound).Information("Eqx {action:l} {ms}ms rc={ru}", "IndexNotFound", (let e = t.Elapsed in e.TotalMilliseconds), ru)
(log 0 0 Log.IndexNotFound).Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru)
| ReadResult.Found doc ->
let log =
let (|EventLen|) (x : Store.IndexProjection) = match x.d, x.m with Log.BlobLen bytes, Log.BlobLen metaBytes -> bytes + metaBytes
let bytes, count = doc.c |> Array.sumBy (|EventLen|), doc.c.Length
log bytes count Log.Index
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propProjectionEvents "Json" doc.c
log.Information("Eqx {action:l} {ms}ms rc={ru}", "Index", (let e = t.Elapsed in e.TotalMilliseconds), ru)
let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propProjectionEvents "Json" doc.c |> Log.prop "etag" doc._etag
log.Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru)
return ru, res }
type [<RequireQualifiedAccess; NoComparison>] IndexResult = Unchanged | NotFound | Found of Store.Position * Store.IndexProjection[]
type [<RequireQualifiedAccess; NoComparison>] IndexResult = NotModified | NotFound | Found of Store.Position * Store.IndexProjection[]
/// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with IndexResult.UnChanged
let loadIndex (log : ILogger) retryPolicy client (pos : Store.Position): Async<IndexResult> = async {
let getIndex = getIndex client
let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGetIndex getIndex pos) log
match res with
| ReadResult.PreconditionFailed -> return IndexResult.Unchanged
| ReadResult.NotModified -> return IndexResult.NotModified
| ReadResult.NotFound -> return IndexResult.NotFound
| ReadResult.Found index -> return IndexResult.Found ({ pos with index = Some index.m }, index.c) }
| ReadResult.Found index -> return IndexResult.Found ({ pos with index = Some index.m; etag=if index._etag=null then None else Some index._etag }, index.c) }

let private getQuery (client : IDocumentClient) (pos:Store.Position) (direction: Direction) batchSize =
let querySpec =
@@ -505,9 +501,9 @@ module Token =
let ofPreviousStreamVersionAndCompactionEventDataIndex prevStreamVersion compactionEventDataIndex eventsLength batchSize streamVersion' : Storage.StreamToken =
ofCompactionEventNumber (Some (prevStreamVersion + 1L + int64 compactionEventDataIndex)) eventsLength batchSize streamVersion'
let private unpackEqxStreamVersion (x : Storage.StreamToken) = let x : Token = unbox x.value in x.pos.Index
let private unpackEqxETag (x : Storage.StreamToken) = let x : Token = unbox x.value in x.pos.etag
let supersedes current x =
let currentVersion, newVersion = unpackEqxStreamVersion current, unpackEqxStreamVersion x
newVersion > currentVersion
let currentETag, newETag = unpackEqxETag current, unpackEqxETag x
newVersion > currentVersion || currentETag <> newETag
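
Aside (not part of this commit): a small worked example of the widened rule, using bare values in place of the unpacked tokens; a token now supersedes the cached one either because the stream advanced or because the index document was rewritten under the same version, which only the etag reveals.

    // (version, etag) stands in for the state unpacked from Storage.StreamToken above
    let supersedes (currentVersion, currentETag) (newVersion, newETag) =
        newVersion > currentVersion || currentETag <> newETag

    supersedes (3L, Some "etag-a") (4L, Some "etag-a") |> ignore // true:  stream advanced
    supersedes (3L, Some "etag-a") (3L, Some "etag-b") |> ignore // true:  same version, but the index (hence its etag) changed
    supersedes (3L, Some "etag-a") (3L, Some "etag-a") |> ignore // false: nothing new - keep the cached entry
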

type EqxConnection(client: IDocumentClient, ?readRetryPolicy (*: (int -> Async<'T>) -> Async<'T>*), ?writeRetryPolicy) =
member __.Client = client
@@ -535,9 +533,9 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) =
let (|IEventDataArray|) events = [| for e in events -> e :> Store.IEventData |]
member private __.InterpretIndexOrFallback log isCompactionEventType pos res: Async<Storage.StreamToken * Store.IEventData[]> = async {
match res with
| Read.IndexResult.NotModified -> return invalidOp "Not handled"
| Read.IndexResult.Found (pos, projectionsAndEvents) when projectionsAndEvents |> Array.exists (fun x -> isCompactionEventType x.t) ->
return Token.ofNonCompacting pos, projectionsAndEvents |> Seq.cast<Store.IEventData> |> Array.ofSeq
| Read.IndexResult.Unchanged -> return invalidOp "Not handled"
| _ ->
let! streamToken, events = __.LoadBackwardsStoppingAtCompactionEvent log isCompactionEventType pos
return streamToken, events |> Seq.cast<Store.IEventData> |> Array.ofSeq }
@@ -573,7 +571,7 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) =
else
let! res = Read.loadIndex log None(* TODO conn.ReadRetryPolicy*) conn.Client pos
match res with
| Read.IndexResult.Unchanged ->
| Read.IndexResult.NotModified ->
return LoadFromTokenResult.Unchanged
| _ ->
let! loaded = __.InterpretIndexOrFallback log isCompactionEventType.Value pos res
@@ -803,21 +801,19 @@ module Initialization =
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
if (!docs) throw new Error("docs argument is missing.");
if(index) {
var assignedEtag = null;
if (index) {
function callback(err, doc, options) {
if (err) throw err;
assignedEtag = doc._etag
}
if (-1 == expectedVersion) {
collection.createDocument(collectionLink, index, { disableAutomaticIdGeneration : true}, callback);
} else {
collection.replaceDocument(collection.getAltLink() + "/docs/" + index.id, index, callback);
}
response.setBody({ etag: null, conflicts: null });
} else {
// can also contain { conflicts: [{t, d}] } representing conflicting events since expectedVersion
// null/missing signifies events have been written, with no conflict
response.setBody({ etag: assignedEtag, conflicts: null });
} else {
response.setBody({ etag: null, conflicts: null });
}
for (var i=0; i<docs.length; i++) {
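
Aside on the response shape (not part of this commit): the stored procedure now echoes the etag DocumentDB assigned to the index document, so the caller can retain it alongside the updated position. A rough sketch of consuming that body from F#; the `WriteResponse` type, the sproc link and the parameter order are assumptions for illustration only:

    open Microsoft.Azure.Documents

    /// Assumed mirror of the { etag, conflicts } body set by the stored procedure above
    type WriteResponse = { etag: string; conflicts: obj }

    let writeViaSproc (client : IDocumentClient) (sprocLink : string) (index : obj) (expectedVersion : int64) (docs : obj[]) = async {
        let! res =
            client.ExecuteStoredProcedureAsync<WriteResponse>(sprocLink, docs, index, expectedVersion)
            |> Async.AwaitTask
        // etag is null when only events were written; otherwise it is the index document's new _etag,
        // which the caller would fold into its cached position/token
        return res.RequestCharge, res.Response.etag }
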
@@ -60,7 +60,7 @@ module SerilogHelpers =
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Backward,_,_) -> EqxAct.BatchBackward
| Equinox.Cosmos.Log.Index _ -> EqxAct.Indexed
| Equinox.Cosmos.Log.IndexNotFound _ -> EqxAct.IndexedNotFound
| Equinox.Cosmos.Log.IndexCached _ -> EqxAct.IndexedCached
| Equinox.Cosmos.Log.IndexNotModified _ -> EqxAct.IndexedCached
let (|EqxEvent|_|) (logEvent : LogEvent) : Equinox.Cosmos.Log.Event option =
logEvent.Properties.Values |> Seq.tryPick (function
| SerilogScalar (:? Equinox.Cosmos.Log.Event as e) -> Some e
14 changes: 9 additions & 5 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
@@ -277,6 +277,8 @@ type Tests(testOutputHelper) =
}

let primeIndex = [EqxAct.IndexedNotFound; EqxAct.SliceBackward; EqxAct.BatchBackward]
// When the test gets re-run, the stream will typically already have values, so the initial index read succeeds rather than falling back
let primeIndexRerun = [EqxAct.Indexed]

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly using the index and cache to avoid redundant reads`` context skuId cartId = Async.RunSynchronously <| async {
@@ -291,19 +293,21 @@
do! addAndThenRemoveItemsManyTimes context cartId skuId service1 5
let! _ = service2.Read cartId

// ... should see a single read (with a) we are writes are cached
let! _ = service2.Read cartId
test <@ primeIndex @ [EqxAct.Append; EqxAct.IndexedCached; EqxAct.IndexedCached] = capture.ExternalCalls @>
// ... should see a single Indexed read given writes are cached
test <@ primeIndex @ [EqxAct.Append; EqxAct.Indexed] = capture.ExternalCalls
|| primeIndexRerun @ [EqxAct.Append; EqxAct.Indexed] = capture.ExternalCalls@>

// Add two more - the roundtrip should only incur a single read
// Add two more - the roundtrip should only incur a single read, which should be cached by virtue of being a second one in succession
capture.Clear()
do! addAndThenRemoveItemsManyTimes context cartId skuId service1 1
test <@ [EqxAct.IndexedCached; EqxAct.Append] = capture.ExternalCalls @>

// While we now have 12 events, we should be able to read them with a single call
capture.Clear()
let! _ = service2.Read cartId
test <@ [EqxAct.IndexedCached] = capture.ExternalCalls @>
let! _ = service2.Read cartId
// First read is a re-read, second is cached
test <@ [EqxAct.Indexed;EqxAct.IndexedCached] = capture.ExternalCalls @>
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]