Skip to content

Commit

Permalink
feat: circuit status now contains a rolling window (#34)
Browse files Browse the repository at this point in the history
* feat: circuit status now contains a rolling window

The rolling stats window is configurable in both total time sampled, and
how many snapshots, or buckets, within that time frame.

The new options that can be applied to a circuit breaker are
`rollingCountTimeout` and `rollingCountBuckets`, which default to 10000
and 10, respectively. So by default, the window is a statistical view
over the last 10 seconds, consisting of 10 one second snapshots.

The current circuit breaker status api has not been modified directly,
however the expected results are different. For example, you can still
do this:

```js
console.log(`Failure count: ${circuit.status.failures}`);
```

But that count will consist only of the number of failures within the
current snapshot.

To obtain stats for the entire window, use the `window` property.

```js
const stats = circuit.status.window;
```

This will give you an array containing the statistical sampling for the
entire window. So, given the defaults noted above, by default this will
be a ten element array, with each element containing an object with
sample data that looks something like this:

```js
{
  failures: 11,
  fallbacks: 9,
  successes: 3491,
  rejects: 2,
  fires: 3493,
  timeouts: 0,
  start: 1488999002013
}
```

* feat: add status listeners

Users can add a listener to a circuit's status object, which gets called
with the most recent status each time a new snapshot is created. Allows
users to maintain cumulative stats if they want to.

* chore: move increment function out of Status ctor

* bug: circuit should emit 'fire' even on cache hit.

* (feat) add isCircuitBreakerOpen property to stats

* chore: remove some redundant code
  • Loading branch information
lance authored Mar 30, 2017
1 parent 3657436 commit 05c0a2f
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 45 deletions.
33 changes: 20 additions & 13 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ const CACHE = new WeakMap();
* @param options.rollingCountTimeout Sets the duration of the statistical
* rolling window, in milliseconds. This is how long Opossum keeps metrics for
* the circuit breaker to use and for publishing. Default: 10000
* @param options.rollingCountBuckets sets the number of buckets the rolling
* statistical window is divided into. So, if options.rollingCountTimeout is
* 10000, and options.rollingCountBuckets is 10, then the statistical window
* will be 1000 1 second snapshots in the statistical window. Default: 10
* @fires CircuitBreaker#halfOpen
*/
class CircuitBreaker extends EventEmitter {
constructor (action, options) {
super();
this.options = options;
this.options.rollingCountTimeout = options.rollingCountTimeout || 10000;
this.options.rollingCountBuckets = options.rollingCountBuckets || 10;
this.Promise = options.Promise;
this[STATUS] = new Status(this);
this[STATE] = CLOSED;
Expand Down Expand Up @@ -179,29 +184,31 @@ class CircuitBreaker extends EventEmitter {
* @fires CircuitBreaker#timeout
*/
fire () {
const args = Array.prototype.slice.call(arguments);

/**
* Emitted when the circuit breaker action is executed
* @event CircuitBreaker#fire
*/
this.emit('fire', args);

if (CACHE.get(this) !== undefined) {
/**
* Emitted when the circuit breaker is using the cache
* @event CircuitBreaker#cacheHits
* and finds a value.
* @event CircuitBreaker#cacheHit
*/
this.emit('cacheHits');
this.emit('cacheHit');
return CACHE.get(this);
} else if (this.options.cache) {
/**
* Emitted when the circuit breaker is not using the cache but
* the cache option is enabled.
* @event CircuitBreaker#cacheHits
* Emitted when the circuit breaker does not find a value in
* the cache, but the cache option is enabled.
* @event CircuitBreaker#cacheMiss
*/
this.emit('cacheMisses');
this.emit('cacheMiss');
}

const args = Array.prototype.slice.call(arguments);
/**
* Emitted when the circuit breaker action is executed
* @event CircuitBreaker#fire
*/
this.emit('fire', args);

if (this.opened || (this.halfOpen && this[PENDING_CLOSE])) {
/**
* Emitted when the circuit breaker is open and failing fast
Expand Down
160 changes: 134 additions & 26 deletions lib/status.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
'use strict';

const CIRCUIT_BREAKER = Symbol('circuit-breaker');
const CIRCUIT_OPEN = Symbol('circuit-open');
const STATS_WINDOW = Symbol('stats-window');
const LISTENERS = Symbol('listeners');
const FIRES = Symbol('fires');
const FAILS = Symbol('fails');

/**
* @class
Expand All @@ -9,55 +14,158 @@ const CIRCUIT_BREAKER = Symbol('circuit-breaker');
*/
class Status {
constructor (circuit) {
reset(this);
this[LISTENERS] = new Set();
this[CIRCUIT_BREAKER] = circuit;
circuit.on('success', () => this.successes++);
circuit.on('failure', () => this.failures++);
circuit.on('fallback', () => this.fallbacks++);
circuit.on('timeout', () => this.timeouts++);
circuit.on('fire', () => this.fires++);
circuit.on('reject', () => this.rejects++);
circuit.on('cacheHits', () => this.cacheHits++);
circuit.on('cacheMisses', () => this.cacheMisses++);
const interval = setInterval(
() => reset(this), circuit.options.rollingCountTimeout);
this[STATS_WINDOW] = [];
this[FIRES] = 0;
this[FAILS] = 0;
this[CIRCUIT_OPEN] = false;

// Keep total numbers for fires/failures
circuit.on('fire', () => this[FIRES]++);
circuit.on('failure', () => this[FAILS]++);

// Keep track of circuit open state
circuit.on('open', () => {
this[CIRCUIT_OPEN] = true;
this[STATS_WINDOW][0].isCircuitBreakerOpen = true;
});
circuit.on('close', () => {
this[CIRCUIT_OPEN] = false;
this[STATS_WINDOW][0].isCircuitBreakerOpen = false;
});

circuit.on('success', increment(this, 'successes'));
circuit.on('failure', increment(this, 'failures'));
circuit.on('fallback', increment(this, 'fallbacks'));
circuit.on('timeout', increment(this, 'timeouts'));
circuit.on('fire', increment(this, 'fires'));
circuit.on('reject', increment(this, 'rejects'));
circuit.on('cacheHit', increment(this, 'cacheHits'));
circuit.on('cacheMiss', increment(this, 'cacheMisses'));

// Set up our statistical rolling window
const buckets = circuit.options.rollingCountBuckets;
const timeout = circuit.options.rollingCountTimeout;

// Add the first bucket to the window
this[STATS_WINDOW].unshift(stats(this));

// TODO: do we guard against divide by zero, and for
// greater accuracy, do we require that timeout be
// evenly divisible by the number of buckets?
const bucketInterval = Math.floor(timeout / buckets);
const interval = setInterval(() => {
const window = this[STATS_WINDOW];
if (window.length === buckets) {
window.pop();
}
let next = stats(this);
window.unshift(next);
for (const listener of this[LISTENERS]) {
listener.call(listener, window[1]);
}
}, bucketInterval);
if (typeof interval.unref === 'function') interval.unref();
}
}

function reset (status) {
/**
* The number of times the breaker's action has failed
* Add a status listener which will be called with the most
* recently completed snapshot each time a new one is created.
* @param {any} listener
*/
status.failures = 0;
addSnapshotListener (listener) {
this[LISTENERS].add(listener);
}

/**
* The number of times a fallback function has been executed
* Gets the full stats window as an array of objects.
*/
status.fallbacks = 0;
get window () {
return this[STATS_WINDOW].slice();
}

/**
* The number of times the action for this breaker executed successfully
* during the current statistical window.
*/
get successes () {
return this[STATS_WINDOW][0].successes;
}

/**
* The number of times the breaker's action has failed
* during the current statistical window.
*/
status.successes = 0;
get failures () {
return this[STATS_WINDOW][0].failures;
}

/**
* The number of times this breaker been rejected because it was fired, but in the open state.
* The number of times a fallback function has been executed
* during the current statistical window.
*/
status.rejects = 0;
get fallbacks () {
return this[STATS_WINDOW][0].fallbacks;
}

/**
* The number of times during the current statistical window that
* this breaker been rejected because it was in the open state.
*/
get rejects () {
return this[STATS_WINDOW][0].rejects;
}

/**
* The number of times this circuit breaker has been fired
* during the current statistical window.
*/
status.fires = 0;
get fires () {
return this[STATS_WINDOW][0].fires;
}

/**
* The number of times this circuit breaker has timed out
* during the current statistical window.
*/
status.timeouts = 0;
get timeouts () {
return this[STATS_WINDOW][0].timeouts;
}

/**
* The number of the cache hits
* The number of times this circuit breaker has retrieved
* a value from the cache instead. If the circuit does not use
* caching, then this value will always be 0.
*/
status.cacheHits = 0;
get cacheHits () {
return this[STATS_WINDOW][0].cacheHits;
}

/**
* The number of the cache misses
* The number of times this circuit breaker has looked in the
* cache and found nothing. If the circuit does not use caching then
* this value will always be 0.
*/
status.cacheMisses = 0;
get cacheMisses () {
return this[STATS_WINDOW][0].cacheMisses;
}
}

const increment =
(status, property) => () => status[STATS_WINDOW][0][property]++;

const stats = (circuit) => ({
isCircuitBreakerOpen: circuit[CIRCUIT_OPEN],
failures: 0,
fallbacks: 0,
successes: 0,
rejects: 0,
fires: 0,
timeouts: 0,
cacheHits: 0,
cacheMisses: 0,
start: Date.now()
});

module.exports = exports = Status;
57 changes: 51 additions & 6 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ test('Passes parameters to the circuit function', (t) => {
});

test('Using cache', (t) => {
t.plan(7);
t.plan(9);
const expected = 34;
const options = {
cache: true
Expand All @@ -69,16 +69,18 @@ test('Using cache', (t) => {

breaker.fire(expected)
.then((arg) => {
t.equals(breaker.status.cacheHits, 0);
t.equals(breaker.status.cacheMisses, 1);
t.equals(breaker.status.cacheHits, 0, 'does not hit the cache');
t.equals(breaker.status.cacheMisses, 1, 'emits a cacheMiss');
t.equals(breaker.status.fires, 1, 'fired once');
t.equals(arg, expected, `cache hits:misses ${breaker.status.cacheHits}:${breaker.status.cacheMisses}`);
})
.catch(t.fail)
.then(() => {
breaker.fire(expected)
.then((arg) => {
t.equals(breaker.status.cacheHits, 1);
t.equals(breaker.status.cacheMisses, 1);
t.equals(breaker.status.cacheHits, 1, 'hit the cache');
t.equals(breaker.status.cacheMisses, 1, 'did not emit miss');
t.equals(breaker.status.fires, 2, 'fired twice');
t.equals(arg, expected, `cache hits:misses ${breaker.status.cacheHits}:${breaker.status.cacheMisses}`);
breaker.clearCache();
})
Expand Down Expand Up @@ -192,6 +194,15 @@ test('Breaker resets after a configurable amount of time', (t) => {
});
});

test('Breaker status reflects open state', (t) => {
t.plan(1);
const breaker = cb(passFail, {maxFailures: 0, resetTimeout: 100});
breaker.fire(-1)
.then(t.fail)
.catch(() => t.ok(breaker.status.window[0].isCircuitBreakerOpen))
.then(t.end);
});

test('Breaker resets for circuits with a fallback function', (t) => {
t.plan(2);
const fails = -1;
Expand Down Expand Up @@ -330,7 +341,8 @@ test('CircuitBreaker status', (t) => {
});

test('CircuitBreaker rolling counts', (t) => {
const breaker = cb(passFail, { rollingCountTimeout: 100 });
const opts = { rollingCountTimeout: 1000, rollingCountBuckets: 10 };
const breaker = cb(passFail, opts);
const deepEqual = (t, expected) => (actual) => t.deepEqual(actual, expected, 'expected status values');
Fidelity.all([
breaker.fire(10).then(deepEqual(t, 10)),
Expand All @@ -341,12 +353,45 @@ test('CircuitBreaker rolling counts', (t) => {
t.deepEqual(breaker.status.successes, 3, 'breaker succeeded 3 times'))
.then(() => {
setTimeout(() => {
const window = breaker.status.window;
t.ok(window.length > 1);
t.equal(window[window.length - 1].fires, 3, 'breaker stats are rolling');
t.deepEqual(breaker.status.successes, 0, 'breaker reset stats');
t.end();
}, 100);
});
});

test('CircuitBreaker status listeners', (t) => {
// 100ms snapshot intervals should ensure that event stats
// will be scattered across > 1 snapshot
const opts = { rollingCountTimeout: 2500, rollingCountBuckets: 25 };
const breaker = cb(passFail, opts);

const results = {
successes: 0,
fires: 0
};
breaker.status.addSnapshotListener((snapshot) => {
t.ok(snapshot.successes !== undefined, 'has successes stat');
t.ok(snapshot.fires !== undefined, 'has fires stat');
t.ok(snapshot.failures !== undefined, 'has failures stat');
t.ok(snapshot.fallbacks !== undefined, 'has fallbacks stat');
t.ok(snapshot.rejects !== undefined, 'has rejects stat');
t.ok(snapshot.timeouts !== undefined, 'has timeouts stat');

results.successes += snapshot.successes;
results.fires += snapshot.fires;
});
breaker.fire(10)
.then(() => breaker.fire(10))
.then(() => breaker.fire(10))
.then(() => breaker.fire(10))
.then(() => breaker.fire(10))
.then(() => t.equal(results.fires, 5) && t.equal(results.successes, 5))
.then(t.end);
});

test('CircuitBreaker fallback event', (t) => {
t.plan(1);
const breaker = cb(passFail, {maxFailures: 0});
Expand Down

0 comments on commit 05c0a2f

Please sign in to comment.