From e55521bf82d2692ffb5ab6f5755f0eab89f3776f Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Mon, 6 Jan 2020 10:29:26 -0800 Subject: [PATCH] airframe-control: Add CircuitBreaker (#763) --- .../airframe/control/CircuitBreaker.scala | 281 ++++++++++++++++++ .../airframe/control/HealthCheckPolicy.scala | 154 ++++++++++ .../scala/wvlet/airframe/control/Ticker.scala | 2 +- .../util/ExponentialMovingAverage.scala | 64 ++++ .../airframe/control/CircuitBreakerTest.scala | 153 ++++++++++ docs/airframe-control.md | 75 +++-- 6 files changed, 709 insertions(+), 20 deletions(-) create mode 100644 airframe-control/src/main/scala/wvlet/airframe/control/CircuitBreaker.scala create mode 100644 airframe-control/src/main/scala/wvlet/airframe/control/HealthCheckPolicy.scala create mode 100644 airframe-control/src/main/scala/wvlet/airframe/control/util/ExponentialMovingAverage.scala create mode 100644 airframe-control/src/test/scala/wvlet/airframe/control/CircuitBreakerTest.scala diff --git a/airframe-control/src/main/scala/wvlet/airframe/control/CircuitBreaker.scala b/airframe-control/src/main/scala/wvlet/airframe/control/CircuitBreaker.scala new file mode 100644 index 0000000000..1d1a01c789 --- /dev/null +++ b/airframe-control/src/main/scala/wvlet/airframe/control/CircuitBreaker.scala @@ -0,0 +1,281 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package wvlet.airframe.control + +import java.util.concurrent.atomic.AtomicReference + +import wvlet.airframe.control.ResultClass.{Failed, Succeeded} +import wvlet.airframe.control.Retry.RetryableFailure + +import scala.util.{Failure, Success, Try} +import wvlet.log.LogSupport +import wvlet.airframe.control.Retry.RetryPolicy +import wvlet.airframe.control.Retry.RetryPolicyConfig +import wvlet.airframe.control.Retry.Jitter + +/** + * An exception thrown when the circuit breaker is open. + */ +case class CircuitBreakerOpenException(context: CircuitBreakerContext) extends Exception + +sealed trait CircuitBreakerState + +/** + * + */ +object CircuitBreaker extends LogSupport { + + case object OPEN extends CircuitBreakerState + case object HALF_OPEN extends CircuitBreakerState + case object CLOSED extends CircuitBreakerState + + def default: CircuitBreaker = new CircuitBreaker() + def newCircuitBreaker(name: String): CircuitBreaker = new CircuitBreaker().withName(name) + + /** + * Create a CircuitBreaker that will be open after observing numFailures out of numExecutions. + */ + def withFailureThreshold(numFailures: Int, numExecutions: Int = 10): CircuitBreaker = { + default.withHealthCheckPolicy(HealthCheckPolicy.markDeadOnFailureThreshold(numFailures, numExecutions)) + } + + /** + * Create a CircuitBreaker that will be open if the failure rate in a time window exceeds the given threshold. + * The failure rate will be decayed exponentially as time goes. 
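+ * A minimal usage sketch (the threshold values below are illustrative, not defaults):
+ * {{{
+ * // Open the circuit when the (exponentially decayed) failure rate over a ~60-second window exceeds 1%
+ * val cb = CircuitBreaker.withFailureRate(failureRate = 0.01, timeWindowMillis = 60000)
+ * }}}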
+ */ + def withFailureRate(failureRate: Double, timeWindowMillis: Int = 60000): CircuitBreaker = { + default.withHealthCheckPolicy(HealthCheckPolicy.markDeadOnRecentFailureRate(failureRate, timeWindowMillis)) + } + + /** + * Create a CircuitBreaker that will be open if the number of consecutive failrues excceeds the given threshold. + * + */ + def withConsecutiveFailures(numFailures: Int): CircuitBreaker = { + default.withHealthCheckPolicy(HealthCheckPolicy.markDeadOnConsecutiveFailures(numFailures)) + } + + private[control] def throwOpenException: CircuitBreakerContext => Unit = { ctx: CircuitBreakerContext => + throw CircuitBreakerOpenException(ctx) + } + + private[control] def reportStateChange = { ctx: CircuitBreakerContext => + info(s"CircuitBreaker(name:${ctx.name}) is changed to ${ctx.state}") + } +} + +import CircuitBreaker._ + +/** + * A safe interface for accessing CircuitBreaker states when handling events. + */ +trait CircuitBreakerContext { + def name: String + def state: CircuitBreakerState + def lastFailure: Option[Throwable] +} + +case class CircuitBreaker( + name: String = "default", + healthCheckPolicy: HealthCheckPolicy = HealthCheckPolicy.markDeadOnConsecutiveFailures(3), + resultClassifier: Any => ResultClass = ResultClass.ALWAYS_SUCCEED, + errorClassifier: Throwable => ResultClass = ResultClass.ALWAYS_RETRY, + onOpenHandler: CircuitBreakerContext => Unit = CircuitBreaker.throwOpenException, + onStateChangeListener: CircuitBreakerContext => Unit = CircuitBreaker.reportStateChange, + delayAfterMarkedDead: RetryPolicy = new Jitter(new RetryPolicyConfig(initialIntervalMillis = 30000)), // 30 seconds + private var nextProvingTimeMillis: Long = Long.MaxValue, + private var provingWaitTimeMillis: Long = 0L, + var lastFailure: Option[Throwable] = None, + private val currentState: AtomicReference[CircuitBreakerState] = new AtomicReference(CircuitBreaker.CLOSED) +) extends CircuitBreakerContext + with LogSupport { + def state: CircuitBreakerState = currentState.get() + + /** + * Set the name of this CircuitBreaker + */ + def withName(newName: String): CircuitBreaker = { + this.copy(name = newName) + } + + /** + * Set a health check policy, which will be used to determine the state of the target service. + */ + def withHealthCheckPolicy(newHealthCheckPolicy: HealthCheckPolicy): CircuitBreaker = { + this.copy(healthCheckPolicy = newHealthCheckPolicy) + } + + /** + * Set a classifier to determine whether the execution result of the code block is successful or not. + */ + def withResultClassifier(newResultClassifier: Any => ResultClass): CircuitBreaker = { + this.copy(resultClassifier = newResultClassifier) + } + + /** + * Set a classifier to determine whether the exception happend in the code block can be ignoreable (Successful) or not for + * the accessing the target service. + */ + def withErrorClassifier(newErrorClassifier: Throwable => ResultClass): CircuitBreaker = { + this.copy(errorClassifier = newErrorClassifier) + } + + /** + * Set a delay policy until moving the state from OPEN to HALF_OPEN (probing) state. + * The default is Jittered-exponential backoff delay with the initial interval of 30 seconds. 
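+ * A sketch of shortening the probing delay by mirroring how the default policy is constructed
+ * (the 10-second interval below is an arbitrary example value):
+ * {{{
+ * import wvlet.airframe.control.Retry.{Jitter, RetryPolicyConfig}
+ *
+ * CircuitBreaker.default
+ *   .withDelayAfterMarkedDead(new Jitter(new RetryPolicyConfig(initialIntervalMillis = 10000)))
+ * }}}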
+ */ + def withDelayAfterMarkedDead(retryPolicy: RetryPolicy): CircuitBreaker = { + this.copy(delayAfterMarkedDead = retryPolicy) + } + + /** + * Set an event listner that monitors CircuitBreaker state changes + */ + def onStateChange(listener: CircuitBreakerContext => Unit): CircuitBreaker = { + this.copy(onStateChangeListener = listener) + } + + /** + * Defines the action when trying to use the open circuit. The default + * behavior is to throw CircuitBreakerOpenException + */ + def onOpen(handler: CircuitBreakerContext => Unit): CircuitBreaker = { + this.copy(onOpenHandler = handler) + } + + /** + * Reset the lastFailure and close the circuit + */ + def reset: Unit = { + lastFailure = None + currentState.set(CLOSED) + nextProvingTimeMillis = Long.MaxValue + provingWaitTimeMillis = 0L + healthCheckPolicy.recovered + } + + /** + * Force setting the current state. + */ + def setState(newState: CircuitBreakerState): this.type = { + if (currentState.get() != newState) { + currentState.set(newState) + onStateChangeListener(this) + } + this + } + + def open: this.type = setState(OPEN) + def halfOpen: this.type = setState(HALF_OPEN) + def close: this.type = setState(CLOSED) + + /** + * Returns true when the circuit can execute the code ( OPEN or HALF_OPEN state) + */ + def isConnected: Boolean = { + val s = currentState.get() + s == CLOSED || s == HALF_OPEN + } + + /** + * Note: Use this method only for the standalone mode. Generally, using CircuiteBreaker.run is sufficient. + * + * If the connection is open, perform the specified action. The + * default behavior is fail-fast, i.e., throwing CircuitBreakerOpenException + */ + def verifyConnection: Unit = { + if (!isConnected) { + val currentTime = System.currentTimeMillis() + if (currentTime > nextProvingTimeMillis) { + halfOpen + } else { + onOpenHandler(this) + } + } + } + + /** + * Note: Use this method only for the standalone mode. Generally, using CircuitBreaker.run is sufficient. + * + * This method reports a successful state to the CircuitBreaker. + */ + def recordSuccess: Unit = { + healthCheckPolicy.recordSuccess + val isDead = healthCheckPolicy.isMarkedDead + currentState.get() match { + case HALF_OPEN => + // Probe request succeeds, so move to CLOSED state + healthCheckPolicy.recovered + close + case CLOSED if isDead => + open + case OPEN if !isDead => + // Service is not marked dead, so try proving at HALF_OPEN state + halfOpen + case _ => + } + } + + /** + * Note: Use this method only for the standalone mode. Generally, using CircuitBreaker.run is sufficient. + * + * This method reports a failure state to the CircuitBreaker. + */ + def recordFailure(e: Throwable): Unit = { + lastFailure = Some(e) + healthCheckPolicy.recordFailure + if (healthCheckPolicy.isMarkedDead) { + val baseWaitMillis = provingWaitTimeMillis.max(delayAfterMarkedDead.retryPolicyConfig.initialIntervalMillis).toInt + val nextWaitMillis = delayAfterMarkedDead.nextWait(baseWaitMillis) + provingWaitTimeMillis = delayAfterMarkedDead.updateBaseWait(baseWaitMillis) + nextProvingTimeMillis = System.currentTimeMillis() + nextWaitMillis + open + } + } + + /** + * Execute the body block through the CircuitBreaker. + * + * If the state is OPEN, this will throw CircuitBreakerOpenException (fail-fast). The state will move to HALF_OPEN state + * after a cetain amount of delay, determined by the delayAfterMarkedDead policy. + * + * If the state is HALF_OPEN, this method allows running the code block once, and if the result is successful, + * the state will move to CLOSED. 
If not, the state will be OPEN again. + * + * If the state is CLOSED, the code block will be executed normally. If the result is marked failure or nonRetryable exception + * is thrown, it will report to the failure to the HealthCheckPolicy. If this policy determins the target service is dead, + * the circuit will shift to OPEN state to block the future execution. + * + */ + def run[A](body: => A): Unit = { + verifyConnection + + val result = Try(body) + val resultClass = result match { + case Success(x) => resultClassifier(x) + case Failure(RetryableFailure(e)) => + ResultClass.retryableFailure(e) + case Failure(e) => + errorClassifier(e) + } + resultClass match { + case Succeeded => + recordSuccess + result.get + case Failed(retryable, cause, extraWait) => + recordFailure(cause) + throw cause + } + } +} diff --git a/airframe-control/src/main/scala/wvlet/airframe/control/HealthCheckPolicy.scala b/airframe-control/src/main/scala/wvlet/airframe/control/HealthCheckPolicy.scala new file mode 100644 index 0000000000..7c77247e09 --- /dev/null +++ b/airframe-control/src/main/scala/wvlet/airframe/control/HealthCheckPolicy.scala @@ -0,0 +1,154 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package wvlet.airframe.control + +import java.util.concurrent.atomic.AtomicLong + +import wvlet.airframe.control.util.ExponentialMovingAverage +import wvlet.log.LogSupport + +/** + * + */ +trait HealthCheckPolicy { + def isAlive: Boolean = !isMarkedDead + def isMarkedDead: Boolean + + /** + * Called when a request succeeds + */ + def recordSuccess: Unit + + /** + * Called when request is failed. + */ + def recordFailure: Unit + + /** + * Called when the target service is recovered + */ + def recovered: Unit +} + +object HealthCheckPolicy extends LogSupport { + + /** + * A policy for marking the service dead upon consecutive failures + */ + def markDeadOnConsecutiveFailures(numFailures: Int): HealthCheckPolicy = + new HealthCheckPolicy { + private val consecutiveFailures = new AtomicLong(0L) + + override def isMarkedDead: Boolean = consecutiveFailures.get() >= numFailures + + override def recordSuccess: Unit = { + consecutiveFailures.set(0) + } + + /** + * Called when request is failed. + * Returns delay + */ + override def recordFailure: Unit = { + consecutiveFailures.incrementAndGet() + } + + /** + * Called when the target service is recovered + */ + override def recovered: Unit = { + consecutiveFailures.set(0) + } + } + + def markDeadOnRecentFailureRate(failureRate: Double, timeWindowMillis: Long): HealthCheckPolicy = + new HealthCheckPolicy { + private val failureRateEMA = new ExponentialMovingAverage(timeWindowMillis) + + override def isMarkedDead: Boolean = { + //logger.warn(s"failure rate: ${failureRateEMA.last}, ${failureRate}") + failureRateEMA.last > failureRate + } + + /** + * Called when a request succeeds + */ + override def recordSuccess: Unit = { + failureRateEMA.update(System.currentTimeMillis(), 0) + } + + /** + * Called when request is failed. 
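+ * A failure is recorded as a sample of 1.0 (a success as 0.0) in the exponential moving average of the failure rate.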
+ */ + override def recordFailure: Unit = { + failureRateEMA.update(System.currentTimeMillis(), 1) + } + + /** + * Called when the target service is recovered + */ + override def recovered: Unit = { + failureRateEMA.reset() + } + } + + def markDeadOnFailureThreshold(numFailures: Int, numExecutions: Int) = { + require(numExecutions > 0, s"numExecusions ${numExecutions} should be larger than 0") + require( + numFailures <= numExecutions, + s"numFailures ${numFailures} should be less than numExections(${numExecutions})" + ) + + new HealthCheckPolicy { + private val arraySize = (numExecutions + 64 - 1) / 64 + // Circular bit vector of execution hisory 0 (success) or 1 (failure) + private val executionHistory: Array[Long] = Array.fill[Long](arraySize)(0L) + private var executionCount: Long = 0 + + private def failureCount = { + executionHistory.map { java.lang.Long.bitCount(_) }.sum + } + + override def isMarkedDead: Boolean = { + if (executionCount < numExecutions) { + false + } else { + failureCount >= numFailures + } + } + + private def setAndMove(v: Boolean): Unit = { + val i = (executionCount % numExecutions).toInt + val mask = 1L << (63 - i % 64) + if (v == true) { + executionHistory(i / 64) |= mask + } else { + executionHistory(i / 64) &= ~mask + } + executionCount += 1 + if (executionCount < 0) { + // Reset upon overflow + executionCount = numExecutions + } + } + + override def recordSuccess: Unit = setAndMove(false) + override def recordFailure: Unit = setAndMove(true) + + override def recovered: Unit = { + executionCount = 0 + } + } + } +} diff --git a/airframe-control/src/main/scala/wvlet/airframe/control/Ticker.scala b/airframe-control/src/main/scala/wvlet/airframe/control/Ticker.scala index 3e30929336..a9d4003d20 100644 --- a/airframe-control/src/main/scala/wvlet/airframe/control/Ticker.scala +++ b/airframe-control/src/main/scala/wvlet/airframe/control/Ticker.scala @@ -17,7 +17,7 @@ package wvlet.airframe.control * Ticker is for measuring the elapsed time. */ trait Ticker { - // Retrun the number of nanoseconds elapsed + // Return the number of nanoseconds elapsed def read: Long } diff --git a/airframe-control/src/main/scala/wvlet/airframe/control/util/ExponentialMovingAverage.scala b/airframe-control/src/main/scala/wvlet/airframe/control/util/ExponentialMovingAverage.scala new file mode 100644 index 0000000000..8fcff59bf4 --- /dev/null +++ b/airframe-control/src/main/scala/wvlet/airframe/control/util/ExponentialMovingAverage.scala @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package wvlet.airframe.control.util + +/** + * This code is based on com.twitter.finagle.util.Ema. + * + * This class is non-thread safe, so callers of the update method need to manage synchronization. + */ +private[control] class ExponentialMovingAverage(windowSize: Long) { + private[this] var time = Long.MinValue + + // this is volatile to allow read-only calls to `last` + // without needing synchronization. 
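+ // Writes happen only in update() and reset(); callers are expected to synchronize those externally.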
+ @volatile private[this] var ema = 0.0 + + /** + * Update the average with observed value `x`, and return the new average. + * + * Since `update` requires monotonic timestamps, it is up to the caller to + * ensure that calls to update do not race. + */ + def update(timeStamp: Long, x: Double): Double = { + if (time == Long.MinValue) { + time = timeStamp + ema = x + x + } else { + val td = timeStamp - time + assert(td >= 0, "Nonmonotonic timestamp") + time = timeStamp + val w = if (windowSize == 0.0) 0.0 else math.exp(-td.toDouble / windowSize) + val newEma = x * (1 - w) + ema * w + ema = newEma + newEma + } + } + + /** + * Return the last observation. + * + * @note This is safe to call without synchronization. + */ + def last: Double = ema + + /** + * Reset the average to 0 and erase all observations. + */ + def reset(): Unit = { + time = Long.MinValue + ema = 0.0 + } +} diff --git a/airframe-control/src/test/scala/wvlet/airframe/control/CircuitBreakerTest.scala b/airframe-control/src/test/scala/wvlet/airframe/control/CircuitBreakerTest.scala new file mode 100644 index 0000000000..b477217c1f --- /dev/null +++ b/airframe-control/src/test/scala/wvlet/airframe/control/CircuitBreakerTest.scala @@ -0,0 +1,153 @@ +package wvlet.airframe.control + +import wvlet.airspec._ +import java.util.concurrent.TimeoutException + +class CircuitBreakerTest extends AirSpec { + + def `support changing states`: Unit = { + val cb = CircuitBreaker.default + cb.state shouldBe CircuitBreaker.CLOSED + cb.isConnected shouldBe true + + cb.open + cb.state shouldBe CircuitBreaker.OPEN + cb.isConnected shouldBe false + + cb.halfOpen + cb.state shouldBe CircuitBreaker.HALF_OPEN + cb.isConnected shouldBe true + + cb.close + cb.state shouldBe CircuitBreaker.CLOSED + cb.isConnected shouldBe true + } + + def `support standalone usage`: Unit = { + val cb = CircuitBreaker.default + + cb.verifyConnection + try { + cb.recordSuccess + } catch { + case e: Throwable => + cb.recordFailure(e) + } + + // Capture the open exception + intercept[CircuitBreakerOpenException] { + cb.open + cb.verifyConnection + } + } + + def `support failure threshold`: Unit = { + val cb = CircuitBreaker.withFailureThreshold(2, 5) + val e = new TimeoutException() + cb.isConnected shouldBe true + + // 0/0 + cb.recordSuccess + cb.isConnected shouldBe true + + // 0/1 + cb.recordSuccess + cb.isConnected shouldBe true + + // 1/2 + cb.recordFailure(e) + cb.isConnected shouldBe true + + // 2/3 + cb.recordFailure(e) + cb.isConnected shouldBe true + + // 2/4 + cb.recordSuccess + cb.isConnected shouldBe false + + // 2/5 -> open the circuit breaker + cb.recordSuccess + cb.isConnected shouldBe false + + // 2/5 + cb.recordSuccess + cb.isConnected shouldBe false + + // 2/5 + cb.recordSuccess + cb.isConnected shouldBe true + + // 1/5 + cb.recordSuccess + cb.isConnected shouldBe true + } + + def `support consecutive failure health checker`: Unit = { + val cb = CircuitBreaker.withConsecutiveFailures(2) + val e = new TimeoutException() + cb.isConnected shouldBe true + + // 1/1 + cb.recordSuccess + cb.isConnected shouldBe true + + // 1/2 + cb.recordFailure(e) + cb.isConnected shouldBe true + + // 1/3 + cb.recordFailure(e) + cb.isConnected shouldBe false + + // Force probing + cb.halfOpen + + // 1/4 + cb.recordSuccess + cb.isConnected shouldBe true + } + + def `support failure rate health checker`: Unit = { + val cb = CircuitBreaker.withFailureRate(0.01, timeWindowMillis = 1000) + val e = new TimeoutException() + cb.isConnected shouldBe true + + // 1/1 + cb.recordSuccess + 
cb.isConnected shouldBe true + + // 1/2 + Thread.sleep(200) + cb.recordFailure(e) + cb.isConnected shouldBe false + + // 1/3 + Thread.sleep(200) + cb.recordFailure(e) + cb.isConnected shouldBe false + + // Force probing + cb.halfOpen + + // The state should be recovered after the successful request + cb.recordSuccess + cb.isConnected shouldBe true + } + + def `run code with circuit`: Unit = { + val cb = CircuitBreaker.withFailureThreshold(1, 2) + + cb.run {} + + intercept[TimeoutException] { + cb.run { + throw new TimeoutException() + } + } + + intercept[CircuitBreakerOpenException] { + cb.run {} + } + } +} diff --git a/docs/airframe-control.md b/docs/airframe-control.md index cb9a62c1bb..a2393d414f 100644 --- a/docs/airframe-control.md +++ b/docs/airframe-control.md @@ -3,8 +3,10 @@ id: airframe-control title: airframe-control: Retry/Rate Control --- -airframe-control is a library for writing control flow at ease. +airframe-control is a colleciton of libraries to manage control flows, that are especially useul for making remote API calls. +For example, airframe-control has exponential back-off retry, jittering, circuit breaker, parallel task execution support, etc. +- [Source Code at GitHub](https://github.com/wvlet/airframe/tree/master/airframe-control) # Usage @@ -17,7 +19,7 @@ libraryDependencies += "org.wvlet.airframe" %% "airframe-control" % "(version)" ## Control -Loan Pattern (open a resource and close): +This provides a handy Loan Pattern syntax for preperly open and close resources: ```scala import wvlet.airframe.control.Control @@ -39,6 +41,8 @@ Control.withResources( ### Exponential Backoff +Exponential backoff will multiply the waiting time for each retry attempt. The default multiplier is 1.5. For example, if the initial waiting time is 1 second, the next waiting time will be 1 x 1.5 = 1.5 second, and the next waiting time will be 1.5 * 1.5 = 2.25 seconds, and so on. + ```scala import wvlet.airframe.control.Retry import java.util.concurrent.TimeoutException @@ -47,7 +51,8 @@ import java.util.concurrent.TimeoutException val r: String = Retry .withBackOff(maxRetry = 3) - .retryOn { + // Classify the retryable or non-retryable error type. All exceptions will be retried by default. + .retryOn { case e: TimeoutException => Retry.retryableFailure(e) } .run { @@ -61,6 +66,29 @@ val r: String = } ``` +To classify error types within `retryOn` method, use `Retry.retryableFailure(Throwable)` or `Retry.nonRetryableFailure(Throwable)`. + + +### Adding Extra Wait + +```scala +import wvlet.airframe.control.Retry +import java.util.concurrent.TimeoutException + +Retry + .withJitter() + .retryOn { + case e: IllegalArgumentException => + Retry.nonRetryableFailure(e) + case e: TimeoutException => + Retry + .retryableFailure(e) + // Add extra wait millis + .withExtraWaitMillis(50) + } +``` + +### Bounded Time Backoff To decide the number of backoff retries from an expected total wait time, use `withBoundedBackoff`: ```scala @@ -68,14 +96,15 @@ import wvlet.airframe.control.Retry Retry .withBoundedBackoff( - initialIntervalMillis = 1000, + initialIntervalMillis = 1000, maxTotalWaitMillis = 30000 ) ``` - ### Jitter +Jitter is useful to add randomness between the retry intervals especially if there are multiple tasks using the same retry pattern. For example, if the base waiting time is 10 seconds, Jitter will pick a next waiting time between [0, 10] to add some random factor. Then, the base waiting time will be multiplied as in the exponential backoff. 
This randomness will avoid having multiple API calls that will be retried at the same timing, which often cause resource contention or overload of the target service. With Jittering you can avoid such unexpected correlations between retried requests. + ```scala import wvlet.airframe.control.Retry import java.util.concurrent.TimeoutException @@ -91,27 +120,35 @@ Retry } ``` -### Adding Extra Wait +## CircuitBreaker + +CircuitBreaker is used to avoid excessive calls to a remote service when the service is unavailable, and provides the capability to fail-fast the application so that we can avoid adding an extra waiting time before getting any response from the struggling service. + +CircuitBreaker is useful for: +- Adding a safety around remote API calls +- Protecting the system from too many exceptions of the same type. + + +CircuitBreaker has tree states: CLOSED, OPEN, and HALF_OPEN. + +- __CLOSED__: This is the default state where all executions are allowed. If the target service becomes unhealthy (markedDead), the states will transit to OPEN state. +- __OPEN__: The connection to the target service is broken in this state, and no execution will be allowed. In this state, all executions will throw CircuitBreakerOpenException to perform fail-fast so that we can quickly return the control to the caller. After a certain amount of time is passed specified by delayAfterMarkedDead policy, this state will shift to HALF_OPEN state. +- __HALF_OPEN__: This state will perform a _probing_ to the target service. That means, an execution to the target service is allowed once, and if the request succeeds the state will move to CLOSED state. If the request fails, it will go back to OPEN state again. The delay interval time will be computed by some retry policy. The default delay policy is an exponential backoff (30 seconds initial wait) with jittering. ```scala -import wvlet.airframe.control.Retry -import java.util.concurrent.TimeoutException +import wvlet.airframe.control.CircuitBreaker -Retry - .withJitter() - .retryOn { - case e: IllegalArgumentException => - Retry.nonRetryableFailure(e) - case e: TimeoutException => - Retry - .retryableFailure(e) - // Add extra wait millis - .withExtraWaitMillis(50) +val cb = CircuitBreaker + .withFailureThreshold(3, 10) // Open the circuit when observing 3 failures out of 10 executions + .run { + // body } ``` ## Parallel +Parallel is a library for ensuring using a fixed number of threads (= parallelism) for running tasks. + ```scala import wvlet.airframe.control.Parallel @@ -143,7 +180,7 @@ val result = source.parallel.withParallelism(4).map { i => You can monitor metrics of parallel execution via JMX using [airframe-jmx](https://github.com/wvlet/airframe/tree/master/airframe-jmx). -``` +```scala JMXAgent.defaultAgent.register[Parallel.ParallelExecutionStats](Parallel.jmxStats) ```