Skip to content

Commit

Permalink
Remove hardwired backoffs
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 26, 2018
1 parent 49cadb1 commit c6f20cc
Showing 1 changed file with 16 additions and 23 deletions.
39 changes: 16 additions & 23 deletions src/Equinox/Equinox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ type ICategory<'event, 'state> =
-> events: 'event list * state: 'state
-> Async<Storage.SyncResult<'state>>

/// Defines a hook enabling retry and backoff policies to be specified
type IRetryPolicy = abstract member Execute: log: ILogger * attemptNumber: int * call: Async<'T> -> Async<'T>

// Exception yielded by Handler.Decide after `count` attempts have yielded conflicts at the point of syncing with the Store
exception FlowAttemptsExceededException of count: int
exception MaxResyncsExhaustedException of count: int

/// Internal implementation of the Store agnostic load + run/render. See Handler for App-facing APIs.
module private Flow =
Expand All @@ -92,25 +95,14 @@ module private Flow =
member __.State = snd __.Memento
member __.CreateContext(): Context<'event, 'state> =
Context<'event, 'state>(fold, __.State)
member __.TryOrResync attempt (log : ILogger) eventsAndState =
member __.TryOrResync (retryPolicy : IRetryPolicy) (attemptNumber: int) (log : ILogger) eventsAndState =
let resyncInPreparationForRetry resync = async {
// According to https://github.com/EventStore/EventStore/issues/1652, backoffs should not be necessary for EventStore
// as the fact we use a Master connection to read Resync data should make it unnecessary
// However, empirically, the backoffs are needed in app code and hence need to live here for now
// TODO: make each store inject backoffs iff necessary
// See https://github.com/jet/equinox/issues/38
if attempt <> 1 then
match Backoff.defaultExponentialBoundedRandomized (attempt-2) with
| None -> ()
| Some ms ->
log.Information("Resync backoff for {Ms}", ms)
do! Async.Sleep ms
let! streamState' = resync
let! streamState' = retryPolicy.Execute(log, attemptNumber, resync)
tokenAndState <- streamState'
return false }
tryOr log eventsAndState resyncInPreparationForRetry
member __.TryOrThrow log eventsAndState attempt =
let throw _ = async { return raise <| FlowAttemptsExceededException attempt }
let throw _ = async { return raise <| MaxResyncsExhaustedException attempt }
tryOr log eventsAndState throw |> Async.Ignore

/// Obtain a representation of the current state and metadata from the underlying storage stream
Expand All @@ -124,7 +116,7 @@ module private Flow =
/// 2a. if no changes required, exit with known state
/// 2b. if saved without conflict, exit with updated state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
let run (sync : SyncState<'event, 'state>) (maxSyncAttempts : int) (log : ILogger) (decide : Context<'event, 'state> -> Async<'result * 'event list>)
let run (sync : SyncState<'event, 'state>) (retryPolicy: IRetryPolicy, maxSyncAttempts : int) (log : ILogger) (decide : Context<'event, 'state> -> Async<'result * 'event list>)
: Async<'result> =
if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")
/// Run a decision cycle - decide what events should be appended given the presented state
Expand All @@ -140,7 +132,7 @@ module private Flow =
do! sync.TryOrThrow log (events, ctx.State) attempt
return outcome
else
let! committed = sync.TryOrResync attempt log (events, ctx.State)
let! committed = sync.TryOrResync retryPolicy attempt log (events, ctx.State)
if not committed then
log.Debug "Resyncing and retrying"
return! loop (attempt + 1)
Expand All @@ -151,10 +143,10 @@ module private Flow =

/// Internal implementation providing a handler not associated with a specific log or stream
/// Not 'just' making it public; the plan is to have Stream.Handler represent the public interface until further significant patterns present
type HandlerImpl<'event, 'state>(fold, maxAttempts) =
type HandlerImpl<'event, 'state>(fold, maxAttempts, retryPolicy) =
let execAsync stream log f = async { let! syncState = load fold log stream in return! f syncState }
let exec stream log f = execAsync stream log <| fun syncState -> async { return f syncState }
let runFlow stream log decideAsync = execAsync stream log <| fun syncState -> async { return! run syncState maxAttempts log decideAsync }
let runFlow stream log decideAsync = execAsync stream log <| fun syncState -> async { return! run syncState (retryPolicy,maxAttempts) log decideAsync }

member __.Decide(stream : IStream<'event, 'state>, log : ILogger, flow: Context<'event, 'state> -> 'result) : Async<'result> =
runFlow stream log <| fun ctx -> async { let result = flow ctx in return result, ctx.Accumulated }
Expand Down Expand Up @@ -187,15 +179,16 @@ module Stream =
let ofMemento (memento : Storage.StreamToken * 'state) (x : IStream<_,_>) : IStream<'event, 'state> = InitializedStream(x, memento) :> _

/// Core Application-facing API. Wraps the handling of decision or query flow in a manner that is store agnostic
type Handler<'event, 'state>(fold, log, stream : IStream<'event, 'state>, maxAttempts : int) =
let inner = Flow.HandlerImpl<'event, 'state>(fold, maxAttempts)
type Handler<'event, 'state>(fold, log, stream : IStream<'event, 'state>, maxAttempts : int, ?retryPolicy) =
let retryPolicy = defaultArg retryPolicy ({ new IRetryPolicy with member __.Execute(_log, _attemptNumber, f) = async { return! f } })
let inner = Flow.HandlerImpl<'event, 'state>(fold, maxAttempts, retryPolicy)

/// 0. Invoke the supplied `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome.
/// Tries up to `maxAttempts` times in the case of a conflict, throwing FlowAttemptsExceededException` to signal failure.
/// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure.
member __.Decide(flow : Context<'event, 'state> -> 'result) : Async<'result> =
inner.Decide(stream,log,flow)
/// 0. Invoke the supplied _Async_ `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome
/// Tries up to `maxAttempts` times in the case of a conflict, throwing FlowAttemptsExceededException` to signal failure.
/// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure.
member __.DecideAsync(flowAsync : Context<'event, 'state> -> Async<'result>) : Async<'result> =
inner.DecideAsync(stream,log,flowAsync)
/// Low Level helper to allow one to obtain the complete state of a stream (including the position) in order to pass it within the application
Expand Down

0 comments on commit c6f20cc

Please sign in to comment.