From 48663ca94d9896768920b28f25546eefad689adc Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Wed, 4 Dec 2019 11:55:48 -0800 Subject: [PATCH] WIP --- .../airframe/control/CircuitBreaker.scala | 23 +++++++--- .../airframe/control/HealthCheckPolicy.scala | 42 +++++++++---------- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/airframe-control/src/main/scala/wvlet/airframe/control/CircuitBreaker.scala b/airframe-control/src/main/scala/wvlet/airframe/control/CircuitBreaker.scala index 6bb125020d..8b6b904d7e 100644 --- a/airframe-control/src/main/scala/wvlet/airframe/control/CircuitBreaker.scala +++ b/airframe-control/src/main/scala/wvlet/airframe/control/CircuitBreaker.scala @@ -25,11 +25,14 @@ import scala.util.{Failure, Success, Try} * */ object CircuitBreaker { - sealed trait CircuitBreakerState case object OPEN extends CircuitBreakerState case object HALF_OPEN extends CircuitBreakerState case object CLOSED extends CircuitBreakerState + + case class CircuitBreakerOpenException() + + private[control] def DO_NOTHING: () => Unit = {} } import CircuitBreaker._ @@ -40,8 +43,16 @@ case class CircuitBreaker( healthCheckPolicy: HealthCheckPolicy, resultClassifier: Any => ResultClass = ResultClass.ALWAYS_SUCCEED, errorClassifier: Throwable => ResultClass = ResultClass.ALWAYS_RETRY, - private var state: AtomicReference[CircuitBreakerState] = new AtomicReference[CircuitBreakerState](CLOSED) + fallback: () => Unit = DO_NOTHING, + private val state: AtomicReference[CircuitBreakerState] = + new AtomicReference[CircuitBreakerState](CircuitBreaker.CLOSED) ) { + def withName(name: String): CircuitBreaker = { + this.copy(name = name) + } + def withHealthCheckPolicy(healthCheckPolicy: HealthCheckPolicy): CircuitBreaker = { + this.copy(healthCheckPolicy = healthCheckPolicy) + } def open: this.type = { state.set(OPEN) @@ -60,8 +71,10 @@ case class CircuitBreaker( state.get() == CLOSED } - def run[A](body: => A): A = { - if (isConnected) { + def run[A](body: => A): Unit = { + if (!isConnected) { + fallback() + } else { val result = Try(body) val resultClass = result match { case Success(x) => resultClassifier(x) @@ -82,9 +95,7 @@ case class CircuitBreaker( result.get case Failed(retryable, cause, extraWait) => healthCheckPolicy.recordFailure - } } - } } diff --git a/airframe-control/src/main/scala/wvlet/airframe/control/HealthCheckPolicy.scala b/airframe-control/src/main/scala/wvlet/airframe/control/HealthCheckPolicy.scala index 14fa94cacc..27b813fcef 100644 --- a/airframe-control/src/main/scala/wvlet/airframe/control/HealthCheckPolicy.scala +++ b/airframe-control/src/main/scala/wvlet/airframe/control/HealthCheckPolicy.scala @@ -19,7 +19,6 @@ import java.util.concurrent.atomic.AtomicLong * */ trait HealthCheckPolicy { - def isAlive: Boolean = !isMarkedDead def isMarkedDead: Boolean @@ -40,33 +39,32 @@ trait HealthCheckPolicy { } object HealthCheckPolicy { - /** * A policy for marking the service dead upon consecutive failures */ - def markDeadOnConsecutiveFailures(numFailures: Int): HealthCheckPolicy = new HealthCheckPolicy { - private val consecutiveFailures = new AtomicLong(0L) + def markDeadOnConsecutiveFailures(numFailures: Int): HealthCheckPolicy = + new HealthCheckPolicy { + private val consecutiveFailures = new AtomicLong(0L) - override def isMarkedDead: Boolean = consecutiveFailures.get() >= numFailures + override def isMarkedDead: Boolean = consecutiveFailures.get() >= numFailures - override def recordSuccess: Unit = { - consecutiveFailures.set(0) - } + override def recordSuccess: Unit = { + consecutiveFailures.set(0) + } - /** - * Called when request is failed. - * Returns delay - */ - override def recordFailure: Unit = { - consecutiveFailures.incrementAndGet() - } + /** + * Called when request is failed. + * Returns delay + */ + override def recordFailure: Unit = { + consecutiveFailures.incrementAndGet() + } - /** - * Called when the target service is recovered - */ - override def recovered: Unit = { - consecutiveFailures.set(0) + /** + * Called when the target service is recovered + */ + override def recovered: Unit = { + consecutiveFailures.set(0) + } } - } - }