Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
xerial committed Dec 12, 2019
1 parent 2109baa commit 48663ca
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import scala.util.{Failure, Success, Try}
*
*/
object CircuitBreaker {

sealed trait CircuitBreakerState
case object OPEN extends CircuitBreakerState
case object HALF_OPEN extends CircuitBreakerState
case object CLOSED extends CircuitBreakerState

case class CircuitBreakerOpenException()

private[control] def DO_NOTHING: () => Unit = {}
}

import CircuitBreaker._
Expand All @@ -40,8 +43,16 @@ case class CircuitBreaker(
healthCheckPolicy: HealthCheckPolicy,
resultClassifier: Any => ResultClass = ResultClass.ALWAYS_SUCCEED,
errorClassifier: Throwable => ResultClass = ResultClass.ALWAYS_RETRY,
private var state: AtomicReference[CircuitBreakerState] = new AtomicReference[CircuitBreakerState](CLOSED)
fallback: () => Unit = DO_NOTHING,
private val state: AtomicReference[CircuitBreakerState] =
new AtomicReference[CircuitBreakerState](CircuitBreaker.CLOSED)
) {
def withName(name: String): CircuitBreaker = {
this.copy(name = name)
}
def withHealthCheckPolicy(healthCheckPolicy: HealthCheckPolicy): CircuitBreaker = {
this.copy(healthCheckPolicy = healthCheckPolicy)
}

def open: this.type = {
state.set(OPEN)
Expand All @@ -60,8 +71,10 @@ case class CircuitBreaker(
state.get() == CLOSED
}

def run[A](body: => A): A = {
if (isConnected) {
def run[A](body: => A): Unit = {
if (!isConnected) {
fallback()
} else {
val result = Try(body)
val resultClass = result match {
case Success(x) => resultClassifier(x)
Expand All @@ -82,9 +95,7 @@ case class CircuitBreaker(
result.get
case Failed(retryable, cause, extraWait) =>
healthCheckPolicy.recordFailure

}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import java.util.concurrent.atomic.AtomicLong
*
*/
trait HealthCheckPolicy {

def isAlive: Boolean = !isMarkedDead
def isMarkedDead: Boolean

Expand All @@ -40,33 +39,32 @@ trait HealthCheckPolicy {
}

object HealthCheckPolicy {

/**
* A policy for marking the service dead upon consecutive failures
*/
def markDeadOnConsecutiveFailures(numFailures: Int): HealthCheckPolicy = new HealthCheckPolicy {
private val consecutiveFailures = new AtomicLong(0L)
def markDeadOnConsecutiveFailures(numFailures: Int): HealthCheckPolicy =
new HealthCheckPolicy {
private val consecutiveFailures = new AtomicLong(0L)

override def isMarkedDead: Boolean = consecutiveFailures.get() >= numFailures
override def isMarkedDead: Boolean = consecutiveFailures.get() >= numFailures

override def recordSuccess: Unit = {
consecutiveFailures.set(0)
}
override def recordSuccess: Unit = {
consecutiveFailures.set(0)
}

/**
* Called when request is failed.
* Returns delay
*/
override def recordFailure: Unit = {
consecutiveFailures.incrementAndGet()
}
/**
* Called when request is failed.
* Returns delay
*/
override def recordFailure: Unit = {
consecutiveFailures.incrementAndGet()
}

/**
* Called when the target service is recovered
*/
override def recovered: Unit = {
consecutiveFailures.set(0)
/**
* Called when the target service is recovered
*/
override def recovered: Unit = {
consecutiveFailures.set(0)
}
}
}

}

0 comments on commit 48663ca

Please sign in to comment.