Skip to content

Commit

Permalink
Add laziness and tests to getAll[Backwards]
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 11, 2018
1 parent 1ccb67e commit 7de1033
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 17 deletions.
63 changes: 57 additions & 6 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,52 @@ module private Tip =
log |> logQuery direction maxItems stream.name t (!responseCount,raws) pos.index ru
return pos, decoded }

let walkLazy<'event> (log : ILogger) client retryPolicy maxItems maxRequests direction (stream: CollectionStream) startPos
(tryDecode : IIndexedEvent -> 'event option, isOrigin: 'event -> bool)
: AsyncSeq<'event[]> = asyncSeq {
let responseCount = ref 0
use query = mkQuery client maxItems stream direction startPos
let pullSlice = handleResponse direction stream startPos
let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query)
let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream.name
let mutable ru = 0.
let allSlices = ResizeArray()
let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
try
let readlog = log |> Log.prop "direction" direction
let mutable ok = true
while ok do
incr responseCount

match maxRequests with
| Some mpbr when !responseCount >= mpbr -> readlog.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded"
| _ -> ()

let batchLog = readlog |> Log.prop "batchIndex" !responseCount
let! (slice,_pos,rus) = retryingLoggingReadSlice query batchLog
ru <- ru + rus
allSlices.AddRange(slice)

let acc = ResizeArray()
for x in slice do
match tryDecode x with
| Some e when isOrigin e ->
let used, residual = slice |> calculateUsedVersusDroppedPayload x.Index
log.Information("EqxCosmos Stop stream={stream} at={index} {case} used={used} residual={residual}",
stream.name, x.Index, x.EventType, used, residual)
ok <- false
acc.Add e
| Some e -> acc.Add e
| None -> ()
yield acc.ToArray()
ok <- ok && query.HasMoreResults
finally
let endTicks = System.Diagnostics.Stopwatch.GetTimestamp()
let t = StopwatchInterval(startTicks, endTicks)

query.Dispose()
log |> logQuery direction maxItems stream.name t (!responseCount,allSlices.ToArray()) -1L ru }

type [<NoComparison>] Token = { stream: CollectionStream; pos: Position }
module Token =
let create stream pos : Storage.StreamToken = { value = box { stream = stream; pos = pos } }
Expand Down Expand Up @@ -676,6 +722,8 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) =
member __.Read log stream direction startPos (tryDecode,isOrigin) : Async<Storage.StreamToken * 'event[]> = async {
let! pos, events = Query.walk log conn.Client conn.QueryRetryPolicy batching.MaxItems batching.MaxRequests direction stream startPos (tryDecode,isOrigin)
return Token.create stream pos, events }
member __.ReadLazy (batching: EqxBatchingPolicy) log stream direction startPos (tryDecode,isOrigin) : AsyncSeq<'event[]> =
Query.walkLazy log conn.Client conn.QueryRetryPolicy batching.MaxItems batching.MaxRequests direction stream startPos (tryDecode,isOrigin)
member __.LoadFromUnfoldsOrRollingSnapshots log (stream,maybePos) (tryDecode,isOrigin): Async<Storage.StreamToken * 'event[]> = async {
let! res = Tip.tryLoad log conn.TipRetryPolicy conn.Client stream maybePos
match res with
Expand Down Expand Up @@ -990,6 +1038,11 @@ type EqxContext

member __.CreateStream(streamName) = collections.CollectionForStream streamName

member internal __.GetLazy((stream, startPos), ?batchSize, ?direction) : AsyncSeq<IIndexedEvent[]> =
let direction = defaultArg direction Direction.Forward
let batching = EqxBatchingPolicy(defaultArg batchSize 10)
gateway.ReadLazy batching logger stream direction startPos (Some,fun _ -> false)

member internal __.GetInternal((stream, startPos), ?maxCount, ?direction) = async {
let direction = defaultArg direction Direction.Forward
if maxCount = Some 0 then
Expand All @@ -1011,10 +1064,8 @@ type EqxContext

/// Reads in batches of `batchSize` from the specified `Position`, allowing the reader to efficiently walk away from a running query
/// ... NB as long as they Dispose!
member __.Walk(stream, batchSize, ?position, ?direction) : AsyncSeq<IIndexedEvent[]> = asyncSeq {
let! _pos,data = __.GetInternal((stream, position), batchSize, ?direction=direction)
// TODO add laziness
return AsyncSeq.ofSeq data }
member __.Walk(stream, batchSize, ?position, ?direction) : AsyncSeq<IIndexedEvent[]> =
__.GetLazy((stream, position), batchSize, ?direction=direction)

/// Reads all Events from a `Position` in a given `direction`
member __.Read(stream, ?position, ?maxCount, ?direction) : Async<Position*IIndexedEvent[]> =
Expand Down Expand Up @@ -1092,8 +1143,8 @@ module Events =
/// reading in batches of the specified size.
/// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest
/// sequence number in the stream.
let getAllBackwards (ctx: EqxContext) (streamName: string) (MaxPosition index: int64) (maxCount: int): AsyncSeq<IIndexedEvent[]> =
ctx.Walk(ctx.CreateStream streamName, maxCount, ?position=index, direction=Direction.Backward)
let getAllBackwards (ctx: EqxContext) (streamName: string) (MaxPosition index: int64) (batchSize: int): AsyncSeq<IIndexedEvent[]> =
ctx.Walk(ctx.CreateStream streamName, batchSize, ?position=index, direction=Direction.Backward)

/// Returns an async array of events in the stream backwards starting from the specified sequence number,
/// number of events to read is specified by batchSize
Expand Down
40 changes: 29 additions & 11 deletions tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,20 @@ type Tests(testOutputHelper) =
[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let getAll (TestStream streamName) = Async.RunSynchronously <| async {
let! conn = connectToSpecifiedCosmosOrSimulator log
let ctx = mkContextWithItemLimit conn (Some 2)
let ctx = mkContextWithItemLimit conn (Some 1)

let! expected = add6EventsIn2Batches ctx streamName
capture.Clear()

let! res = Events.get ctx streamName 1L 4 // Events.getAll >> AsyncSeq.concatSeq |> AsyncSeq.toArrayAsync
let expected = expected |> Array.tail |> Array.take 4
let! res = Events.getAll ctx streamName 0L 1 |> AsyncSeq.concatSeq |> AsyncSeq.takeWhileInclusive (fun _ -> false) |> AsyncSeq.toArrayAsync
let expected = expected |> Array.take 1

verifyCorrectEvents 1L expected res

// TODO [implement and] prove laziness
test <@ List.replicate 2 EqxAct.ResponseForward @ [EqxAct.QueryForward] = capture.ExternalCalls @>
verifyRequestChargesMax 10 // observed 8.99 // was 3
verifyCorrectEvents 0L expected res
test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @>
let queryRoundTripsAndItemCounts = function EqxEvent (Log.Query (Direction.Forward, responses, { count = c })) -> Some (responses,c) | _ -> None
// validate that, despite only requesting max 1 item, we only needed one trip (which contained only one item)
[1,1] =! capture.ChooseCalls queryRoundTripsAndItemCounts
verifyRequestChargesMax 4 // 3.07 // was 3 // 2.94
}

(* Backward *)
Expand All @@ -263,8 +265,24 @@ type Tests(testOutputHelper) =
verifyRequestChargesMax 10 // observed 8.98 // was 3
}

// TODO AsyncSeq version

// TODO 2 batches backward test

// TODO mine other integration tests
[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let getAllBackwards (TestStream streamName) = Async.RunSynchronously <| async {
let! conn = connectToSpecifiedCosmosOrSimulator log
let ctx = mkContextWithItemLimit conn (Some 2)

let! expected = add6EventsIn2Batches ctx streamName
capture.Clear()

let! res = Events.getAllBackwards ctx streamName 10L 2 |> AsyncSeq.concatSeq |> AsyncSeq.takeWhileInclusive (fun x -> x.Index <> 2L) |> AsyncSeq.toArrayAsync
let expected = expected |> Array.skip 2

verifyCorrectEventsBackward 5L expected res
// only 2 batches of 2 items triggered
test <@ List.replicate 2 EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @>
// validate that we didnt trigger loading of the last item
let queryRoundTripsAndItemCounts = function EqxEvent (Log.Query (Direction.Backward, responses, { count = c })) -> Some (responses,c) | _ -> None
[2,4] =! capture.ChooseCalls queryRoundTripsAndItemCounts
verifyRequestChargesMax 7 // observed 6.03 // was 3 // 2.95
}

0 comments on commit 7de1033

Please sign in to comment.