From 838e3322975746475c7d909053b358b606b568bf Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Mon, 26 Nov 2018 21:34:38 +0000 Subject: [PATCH] Remove hardwired backoffs (#51) --- src/Equinox/Equinox.fs | 39 +++++++++------------ src/Equinox/Equinox.fsproj | 1 - src/Equinox/Infrastructure.fs | 64 ----------------------------------- 3 files changed, 16 insertions(+), 88 deletions(-) delete mode 100644 src/Equinox/Infrastructure.fs diff --git a/src/Equinox/Equinox.fs b/src/Equinox/Equinox.fs index 8d7ae5119..91e84dc5a 100644 --- a/src/Equinox/Equinox.fs +++ b/src/Equinox/Equinox.fs @@ -69,8 +69,11 @@ type ICategory<'event, 'state> = -> events: 'event list * state: 'state -> Async> +/// Defines a hook enabling retry and backoff policies to be specified +type IRetryPolicy = abstract member Execute: log: ILogger * attemptNumber: int * call: Async<'T> -> Async<'T> + // Exception yielded by Handler.Decide after `count` attempts have yielded conflicts at the point of syncing with the Store -exception FlowAttemptsExceededException of count: int +exception MaxResyncsExhaustedException of count: int /// Internal implementation of the Store agnostic load + run/render. See Handler for App-facing APIs. module private Flow = @@ -92,25 +95,14 @@ module private Flow = member __.State = snd __.Memento member __.CreateContext(): Context<'event, 'state> = Context<'event, 'state>(fold, __.State) - member __.TryOrResync attempt (log : ILogger) eventsAndState = + member __.TryOrResync (retryPolicy : IRetryPolicy) (attemptNumber: int) (log : ILogger) eventsAndState = let resyncInPreparationForRetry resync = async { - // According to https://github.com/EventStore/EventStore/issues/1652, backoffs should not be necessary for EventStore - // as the fact we use a Master connection to read Resync data should make it unnecessary - // However, empirically, the backoffs are needed in app code and hence need to live here for now - // TODO: make each store inject backoffs iff necessary - // See https://github.com/jet/equinox/issues/38 - if attempt <> 1 then - match Backoff.defaultExponentialBoundedRandomized (attempt-2) with - | None -> () - | Some ms -> - log.Information("Resync backoff for {Ms}", ms) - do! Async.Sleep ms - let! streamState' = resync + let! streamState' = retryPolicy.Execute(log, attemptNumber, resync) tokenAndState <- streamState' return false } tryOr log eventsAndState resyncInPreparationForRetry member __.TryOrThrow log eventsAndState attempt = - let throw _ = async { return raise <| FlowAttemptsExceededException attempt } + let throw _ = async { return raise <| MaxResyncsExhaustedException attempt } tryOr log eventsAndState throw |> Async.Ignore /// Obtain a representation of the current state and metadata from the underlying storage stream @@ -124,7 +116,7 @@ module private Flow = /// 2a. if no changes required, exit with known state /// 2b. if saved without conflict, exit with updated state /// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state - let run (sync : SyncState<'event, 'state>) (maxSyncAttempts : int) (log : ILogger) (decide : Context<'event, 'state> -> Async<'result * 'event list>) + let run (sync : SyncState<'event, 'state>) (retryPolicy: IRetryPolicy, maxSyncAttempts : int) (log : ILogger) (decide : Context<'event, 'state> -> Async<'result * 'event list>) : Async<'result> = if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1") /// Run a decision cycle - decide what events should be appended given the presented state @@ -140,7 +132,7 @@ module private Flow = do! sync.TryOrThrow log (events, ctx.State) attempt return outcome else - let! committed = sync.TryOrResync attempt log (events, ctx.State) + let! committed = sync.TryOrResync retryPolicy attempt log (events, ctx.State) if not committed then log.Debug "Resyncing and retrying" return! loop (attempt + 1) @@ -151,10 +143,10 @@ module private Flow = /// Internal implementation providing a handler not associated with a specific log or stream /// Not 'just' making it public; the plan is to have Stream.Handler represent the public interface until further significant patterns present - type HandlerImpl<'event, 'state>(fold, maxAttempts) = + type HandlerImpl<'event, 'state>(fold, maxAttempts, retryPolicy) = let execAsync stream log f = async { let! syncState = load fold log stream in return! f syncState } let exec stream log f = execAsync stream log <| fun syncState -> async { return f syncState } - let runFlow stream log decideAsync = execAsync stream log <| fun syncState -> async { return! run syncState maxAttempts log decideAsync } + let runFlow stream log decideAsync = execAsync stream log <| fun syncState -> async { return! run syncState (retryPolicy,maxAttempts) log decideAsync } member __.Decide(stream : IStream<'event, 'state>, log : ILogger, flow: Context<'event, 'state> -> 'result) : Async<'result> = runFlow stream log <| fun ctx -> async { let result = flow ctx in return result, ctx.Accumulated } @@ -187,15 +179,16 @@ module Stream = let ofMemento (memento : Storage.StreamToken * 'state) (x : IStream<_,_>) : IStream<'event, 'state> = InitializedStream(x, memento) :> _ /// Core Application-facing API. Wraps the handling of decision or query flow in a manner that is store agnostic -type Handler<'event, 'state>(fold, log, stream : IStream<'event, 'state>, maxAttempts : int) = - let inner = Flow.HandlerImpl<'event, 'state>(fold, maxAttempts) +type Handler<'event, 'state>(fold, log, stream : IStream<'event, 'state>, maxAttempts : int, ?retryPolicy) = + let retryPolicy = defaultArg retryPolicy ({ new IRetryPolicy with member __.Execute(_log, _attemptNumber, f) = async { return! f } }) + let inner = Flow.HandlerImpl<'event, 'state>(fold, maxAttempts, retryPolicy) /// 0. Invoke the supplied `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome. - /// Tries up to `maxAttempts` times in the case of a conflict, throwing FlowAttemptsExceededException` to signal failure. + /// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure. member __.Decide(flow : Context<'event, 'state> -> 'result) : Async<'result> = inner.Decide(stream,log,flow) /// 0. Invoke the supplied _Async_ `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome - /// Tries up to `maxAttempts` times in the case of a conflict, throwing FlowAttemptsExceededException` to signal failure. + /// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure. member __.DecideAsync(flowAsync : Context<'event, 'state> -> Async<'result>) : Async<'result> = inner.DecideAsync(stream,log,flowAsync) /// Low Level helper to allow one to obtain the complete state of a stream (including the position) in order to pass it within the application diff --git a/src/Equinox/Equinox.fsproj b/src/Equinox/Equinox.fsproj index be044951b..8c4e7987c 100644 --- a/src/Equinox/Equinox.fsproj +++ b/src/Equinox/Equinox.fsproj @@ -10,7 +10,6 @@ - diff --git a/src/Equinox/Infrastructure.fs b/src/Equinox/Infrastructure.fs deleted file mode 100644 index 8d35cfd89..000000000 --- a/src/Equinox/Infrastructure.fs +++ /dev/null @@ -1,64 +0,0 @@ -namespace Equinox - -/// A backoff strategy. -/// Accepts the attempt number and returns an interval in milliseconds to wait. -/// If None then backoff should stop. -type Backoff = int -> int option - -module private Backoff = - - let private checkOverflow x = - if x = System.Int32.MinValue then 2000000000 - else x - - /// Modifies the interval. - let map (f:int -> int) (b:Backoff) : Backoff = - fun i -> - match b i with - | Some x -> f x |> checkOverflow |> Some - | None -> None - - /// Bounds the interval. - let bound mx = map (min mx) - - /// Creates a back-off strategy which increases the interval exponentially. - let exp (initialIntervalMs:int) (multiplier:float) : Backoff = - fun i -> (float initialIntervalMs) * (pown multiplier i) |> int |> checkOverflow |> Some - - /// Randomizes the output produced by a back-off strategy: - /// randomizedInterval = retryInterval * (random in range [1 - randomizationFactor, 1 + randomizationFactor]) - let rand (randomizationFactor:float) = - let rand = new System.Random() - let maxRand,minRand = (1.0 + randomizationFactor), (1.0 - randomizationFactor) - map (fun x -> (float x) * (rand.NextDouble() * (maxRand - minRand) + minRand) |> int) - - // ------------------------------------------------------------------------------------------------------------------------ - // defaults - - /// 500ms - let [] DefaultInitialIntervalMs = 500 - - /// 60000ms - let [] DefaultMaxIntervalMs = 60000 - - /// 0.5 - let [] DefaultRandomizationFactor = 0.5 - - /// 1.5 - let [] DefaultMultiplier = 1.5 - - /// The default exponential and randomized back-off strategy with a provided initial interval. - /// DefaultMaxIntervalMs = 60,000 - /// DefaultRandomizationFactor = 0.5 - /// DefaultMultiplier = 1.5 - let defaultExponentialBoundedRandomizedOf initialInternal = - exp initialInternal DefaultMultiplier - |> rand DefaultRandomizationFactor - |> bound DefaultMaxIntervalMs - - /// The default exponential and randomized back-off strategy. - /// DefaultInitialIntervalMs = 500 - /// DefaultMaxIntervalMs = 60,000 - /// DefaultRandomizationFactor = 0.5 - /// DefaultMultiplier = 1.5 - let defaultExponentialBoundedRandomized = defaultExponentialBoundedRandomizedOf DefaultInitialIntervalMs \ No newline at end of file