Skip to content

Commit

Permalink
add more logic
Browse files Browse the repository at this point in the history
  • Loading branch information
xerial committed Dec 24, 2019
1 parent 3c7d2f1 commit 59d5a24
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ object CircuitBreaker {
case object HALF_OPEN extends CircuitBreakerState
case object CLOSED extends CircuitBreakerState

case class CircuitBreakerOpenException()

private[control] def DO_NOTHING: () => Unit = {}
case class CircuitBreakerOpenException() extends Exception
}

import CircuitBreaker._
Expand All @@ -43,7 +41,6 @@ case class CircuitBreaker(
healthCheckPolicy: HealthCheckPolicy,
resultClassifier: Any => ResultClass = ResultClass.ALWAYS_SUCCEED,
errorClassifier: Throwable => ResultClass = ResultClass.ALWAYS_RETRY,
fallback: () => Unit = DO_NOTHING,
private val state: AtomicReference[CircuitBreakerState] =
new AtomicReference[CircuitBreakerState](CircuitBreaker.CLOSED)
) {
Expand Down Expand Up @@ -71,9 +68,9 @@ case class CircuitBreaker(
state.get() == CLOSED
}

def run[A](body: => A): Unit = {
def run[A](body: => A): A = {
if (!isConnected) {
fallback()
throw CircuitBreakerOpenException()
} else {
val result = Try(body)
val resultClass = result match {
Expand All @@ -85,16 +82,17 @@ case class CircuitBreaker(
}
resultClass match {
case Succeeded =>
healthCheckPolicy.recovered
state.get() match {
case HALF_OPEN | CLOSED =>
open
case OPEN | HALF_OPEN =>
healthCheckPolicy.recovered
close
case _ =>
}
healthCheckPolicy.recordSuccess
result.get
case Failed(retryable, cause, extraWait) =>
healthCheckPolicy.recordFailure
throw cause
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package wvlet.airframe.control

import java.util.concurrent.atomic.AtomicLong

import wvlet.airframe.control.util.ExponentialMovingAverage

/**
*
*/
Expand Down Expand Up @@ -67,4 +69,34 @@ object HealthCheckPolicy {
consecutiveFailures.set(0)
}
}

def markDeadOnRecentFailureRate(failureRate: Double, timeWindowMillis: Long): HealthCheckPolicy =
new HealthCheckPolicy {
private val failureRateEMA = new ExponentialMovingAverage(timeWindowMillis)

override def isMarkedDead: Boolean = {
failureRateEMA.last > failureRate
}

/**
* Called when a request succeeds
*/
override def recordSuccess: Unit = {
failureRateEMA.update(System.currentTimeMillis(), 0)
}

/**
* Called when request is failed.
*/
override def recordFailure: Unit = {
failureRateEMA.update(System.currentTimeMillis(), 1)
}

/**
* Called when the target service is recovered
*/
override def recovered: Unit = {
failureRateEMA.reset()
}
}
}

0 comments on commit 59d5a24

Please sign in to comment.