Skip to content

Commit

Permalink
feat: implement wrap, fallback, more documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
connor4312 committed Nov 22, 2019
1 parent 443d055 commit ee47684
Show file tree
Hide file tree
Showing 14 changed files with 510 additions and 62 deletions.
185 changes: 174 additions & 11 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,16 @@ npm install --save cockatiel

## Contents

Cockatiel is a work-in progress; this serves as a table of contents and progress checklist.
This table lists the API which Cockatiel provides. I recommend reading the [Polly wiki](https://github.com/App-vNext/Polly/wiki) for more information for details and mechanics around the patterns we provide.

- [x] Base [Policy](#policy)
- [Policy.handleAll()](#policyhandleAll)
- [Policy.handleType(ctor[, filter])](#policyhandletypector-filter)
- [Policy.handleWhen(filter)](#policyhandlewhen)
- [Policy.handleResultType(ctor[, filter])](#policyhandleresulttypector-filter)
- [Policy.handleResultWhen(filter)](#policyhandleresultwhenfilter)
- [Policy.wrap(policies)](#policywrappolicies)
- [Policy.noop](#policynoop)
- [x] [Backoffs](#backoffs)
- [ConstantBackoff](#constantConstantBackoffbackoff)
- [ExponentialBackoff](#ExponentialBackoff)
Expand All @@ -26,12 +33,17 @@ Cockatiel is a work-in progress; this serves as a table of contents and progress
- [cancellationToken.isCancellationRequested](#cancellationtokeniscancellationrequested)
- [cancellationToken.onCancellationRequested(listener)](#cancellationtokenoncancellationrequestedlistener)
- [cancellationToken.cancelled()](#cancellationtokencancelledcancellationToken)
- [x] [Events](#events)
- [Event.toPromise(event[, cancellationToken])](#eventtopromiseevent-cancellationtoken)
- [Event.once(event, listener)](#eventonceeventlistener)
- [x] [Policy.retry](#policyretry)
- [retry.execute(fn)](#retryexecutefn)
- [retry.attempts(count)](#retryattemptscount)
- [retry.delay(amount)](#retrydelayamount)
- [retry.delegate(fn)](#retrydelegatefn)
- [retry.backoff(policy)](#retrybackoffpolicy)
- [retry.onRetry(listener)](#retryonretrylistener)
- [retry.onGiveUp(listener)](#retryongiveuplistener)
- [x] [Policy.circuitBreaker](#policycircuitbreakeropenafter-breaker)
- [ConsecutiveBreaker](#ConsecutiveBreaker)
- [SamplingBreaker](#SamplingBreaker)
Expand All @@ -48,8 +60,9 @@ Cockatiel is a work-in progress; this serves as a table of contents and progress
- [bulkhead.onReject(callback)](#bulkheadonrejectcallback)
- [bulkhead.executionSlots](#bulkheadexecutionslots)
- [bulkhead.queueSlots](#bulkheadqueueslots)
- [ ] Fallback
- [ ] PolicyWrap
- [x] [Policy.fallback](#policyfallbackvalueorfactory)
- [fallback.execute(fn)](#fallbackexecutefn)
- [fallback.onFallback(callback)](#fallbackonfallbackcallback)

## `Policy`

Expand Down Expand Up @@ -101,8 +114,8 @@ Policy
// ...
```

### `Policy.handleResultWhen(filter])`
### `policy.orWhenResult(filter])`
### `Policy.handleResultWhen(filter)`
### `policy.orWhenResult(filter)`

Tells the policy to treat certain return values of the function as errors--retrying if they appear, for instance. Results will be retried the filter function returns true.

Expand All @@ -113,6 +126,47 @@ Policy
// ...
```

### `Policy.wrap(...policies)`

Wraps the given set of policies into a single policy. For instance, this:

```js
const result = await retry.execute(() =>
breaker.execute(() =>
timeout.execute(({ cancellation }) => getData(cancellation))))
```

Is the equivalent to:

```js
const result = await Policy
.wrap(retry, breaker, timeout)
.execute(({ cancellation }) => getData(cancellation)));
```

The `context` argument passed to the executed function is the merged object of all previous policies. So for instance, in the above example you'll get the cancellation token from the [TimeoutPolicy](#policytimeout) as well as the attempt number from the [RetryPolicy](#policyretry):

```ts
Policy.wrap(retry, breaker, timeout).execute(context => {
console.log(context);
// => { attempts: 1, cancellation: }
});
```

### `Policy.noop`

A no-op policy, which may be useful for tests and stubs.

```ts
import { Policy } from 'cockatiel';

const policy = isProduction ? Policy.handleAll().retry().attempts(3) : Policy.noop;

export async function handleRequest() {
return policy.execute(() => getInfoFromDatabase());
}
```

## Backoffs

Backoff algorithms are immutable. They adhere to the interface:
Expand Down Expand Up @@ -322,7 +376,7 @@ if (token.isCancellationRequested) {
### `cancellationToken.onCancellationRequested(listener)`
Returns whether cancellation has yet been requested.
An [event emitter](#events) that fires when a cancellation is requested. Fires immediately if cancellation has already been requested. Returns a disposable instance.
```ts
const listener = token.onCancellationRequested(() => console.log('cancellation requested'))
Expand All @@ -339,6 +393,53 @@ Returns a promise that resolves once cancellation is requested. You can optional
await token.cancelled();
```
## Events
Cockatiel uses a simple bespoke style for events, similar to those that we use in VS Code. These events provide better type-safety (you can never subscribe to the wrong event name) and better functionality around triggering listeners, which we use to implement cancellation tokens.
An event can be subscribed to simply by passing a callback. Take [`onFallback`](#fallbackonfallbackcallback) for instance:
```js
const listener = policy.onFallback(error => {
console.log(error);
});
```
The event returns an `IDisposable` instance. To unsubscribe the listener, call `.dispose()` on the returned instance. It's always safe to call an IDisposable's `.dispose()` multiple times.
```js
listener.dispose()
```
We provide a couple extra utilities around events as well.
### `Event.toPromise(event[, cancellationToken])`
Returns a promise that resolves once the event fires. Optionally, you can pass in a [CancellationToken](#cancellationtoken) to control when you stop listening, which will reject the promise with a `TaskCancelledError` if it's not already resolved.
```js
import { Event } from 'cockatiel';

async function waitForFallback(policy) {
await Event.toPromise(policy.onFallback);
console.log('a fallback happened!');
}
```
### `Event.once(callback)`
Waits for the event to fire once, and then automatically unregisters the listener. This method itself returns an `IDisposable`, which you could use to unregister the listener if needed.
```js
import { Event } from 'cockatiel';

async function waitForFallback(policy) {
Event.once(policy.onFallback, () => {
console.log('a fallback happened!');
});
}
```
## `Policy.retry()`
If you know how to use Polly, you already almost know how to use Cockatiel. The `Policy` object is the base builder, and you can get a RetryBuilder off of that by calling `.retry()`.
Expand Down Expand Up @@ -428,6 +529,33 @@ Policy
// ...
```
### `retry.onRetry(listener)`
An [event emitter](#events) that fires when we retry a call, before any backoff. It's invoked with an object that includes:
- the `delay` we're going to wait before retrying, and;
- either a thrown error like `{ error: someError, delay: number }`, or an errorful result in an object like `{ value: someValue, delay: number }` when using [result filtering](#policyhandleresulttypector-filter).
Useful for telemetry. Returns a dispable instance.
```js
const listener = retry.onRetry(reason => console.log('retrying a function call:', reason));

// ...
listener.dispose();
```
### `retry.onGiveUp(listener)`
An [event emitter](#events) that fires when we're no longer retrying a call and are giving up. It's invoked with either a thrown error in an object like `{ error: someError }`, or an errorful result in an object like `{ value: someValue }` when using [result filtering](#policyhandleresulttypector-filter). Useful for telemetry. Returns a dispable instance.
```js
const listener = retry.onGiveUp(reason => console.log('retrying a function call:', reason));

// ...
listener.dispose();
```
## `Policy.circuitBreaker(openAfter, breaker)`
Circuit breakers stop execution for a period of time after a failure threshold has been reached. This is very useful to allow faulting systems to recover without overloading them. See the [Polly docs](https://github.com/App-vNext/Polly/wiki/Circuit-Breaker#how-the-polly-circuitbreaker-works) for more detailed information around circuit breakers.
Expand Down Expand Up @@ -523,7 +651,7 @@ if (breaker.state === CircuitState.Open) {
### `breaker.onBreak(callback)`
A method on the circuit breaker that gives you a callback fired when the circuit opens as a result of failures. Returns a disposable instance.
An [event emitter](#events) that fires when the circuit opens as a result of failures. Returns a disposable instance.
```js
const listener = breaker.onBreak(() => console.log('circuit is open'));
Expand All @@ -534,7 +662,7 @@ listener.dispose();
### `breaker.onReset(callback)`
A method on the circuit breaker that gives you a callback fired when the circuit closes after being broken. Returns a disposable instance.
An [event emitter](#events) that fires when the circuit closes after being broken. Returns a disposable instance.
```js
const listener = breaker.onReset(() => console.log('circuit is closed'));
Expand Down Expand Up @@ -579,15 +707,15 @@ export async function handleRequest() {
### `timeout.execute(fn)`
Executes the given function as configured in the policy. A [ CancellationToken](#cancellationtoken) will be pass to the function, which it should use for aborting operations as needed.
Executes the given function as configured in the policy. A [CancellationToken](#cancellationtoken) will be pass to the function, which it should use for aborting operations as needed.
```ts
await timeout.execute(cancellationToken => getInfoFromDatabase(cancellationToken))
```
### `timeout.onTimeout(listener)`
A method on the timeout that gives you a callback fired when a timeout is reached. Useful for telemetry. Returns a disposable instance.
An [event emitter](#events) that fires when a timeout is reached. Useful for telemetry. Returns a disposable instance.
```ts
const listener = timeout.onTimeout(() => console.log('timeout was reached'));
Expand Down Expand Up @@ -639,7 +767,7 @@ const data = await bulkhead.execute(() => getInfoFromDatabase());
### `bulkhead.onReject(callback)`
A method on the bulkhead that gives you a callback fired when a call is rejected. Useful for telemetry. Returns a disposable instance.
An [event emitter](#events) that fires when a call is rejected. Useful for telemetry. Returns a disposable instance.
```js
const listener = bulkhead.onReject(() => console.log('bulkhead call was rejected'));
Expand All @@ -655,3 +783,38 @@ Returns the number of execution slots left in the bulkhead. If either this or `b
### `bulkhead.queueSlots`
Returns the number of queue slots left in the bulkhead. If either this or `bulkhead.executionSlots` is greater than zero, the `execute()` will not throw a `BulkheadRejectedError`.
## `Policy.fallback(valueOrFactory)`
Creates a policy that returns the `valueOrFactory` if an executed function fails. As the name suggests, `valueOrFactory` either be a value, or a function we'll call when a failure happens to create a value.
```js
import { Policy } from 'cockatiel';

const fallback = Policy
.handleType(DatabaseError)
.fallback(() => getStaleData());

export function handleRequest() {
return fallback.execute(() => getInfoFromDatabase());
}
```
### `fallback.execute(fn)`
Executes the given function. Any _handled_ error or errorful value will be eaten, and instead the fallback value will be returned.
```js
const result = await fallback.execute(() => getInfoFromDatabase());
```
### `fallback.onFallback(callback)`
An [event emitter](#events) that fires when a fallback occurs. It's invoked with either a thrown error in an object like `{ error: someError }`, or an errorful result in an object like `{ value: someValue }` when using [result filtering](#policyhandleresulttypector-filter). Useful for telemetry. Returns a disposable instance.
```js
const listener = bulkhead.onReject(() => console.log('bulkhead call was rejected'));

// later:
listener.dispose();
```
File renamed without changes.
7 changes: 4 additions & 3 deletions src/Bulkhead.ts → src/BulkheadPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { defer } from './common/defer';
import { EventEmitter } from './common/Event';
import { BulkheadRejectedError } from './errors/BulkheadRejectedError';
import { IPolicy } from './Policy';

interface IQueueItem<T> {
fn(): Promise<T> | T;
fn(context: void): Promise<T> | T;
resolve(value: T): void;
reject(error: Error): void;
}

/**
* Bulkhead limits concurrent requests made.
*/
export class Bulkhead {
export class BulkheadPolicy implements IPolicy<void> {
private active = 0;
private queue: Array<IQueueItem<unknown>> = [];
private onRejectEmitter = new EventEmitter<void>();
Expand Down Expand Up @@ -43,7 +44,7 @@ export class Bulkhead {
* @param fn -- Function to execute
* @throws a {@link BulkheadRejectedException} if the bulkhead limits are exceeeded
*/
public async execute<T>(fn: () => PromiseLike<T> | T): Promise<T> {
public async execute<T>(fn: (context: void) => PromiseLike<T> | T): Promise<T> {
if (this.active < this.capacity) {
this.active++;
try {
Expand Down
8 changes: 4 additions & 4 deletions src/CircuitBreakerPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { EventEmitter } from './common/Event';
import { execute, returnOrThrow } from './common/execute';
import { BrokenCircuitError } from './errors/Errors';
import { IsolatedCircuitError } from './errors/IsolatedCircuitError';
import { FailureReason, IBasePolicyOptions } from './Policy';
import { FailureReason, IBasePolicyOptions, IPolicy } from './Policy';

export enum CircuitState {
/**
Expand Down Expand Up @@ -40,7 +40,7 @@ type InnerState =
| { value: CircuitState.Open; openedAt: number }
| { value: CircuitState.HalfOpen; test: Promise<any> };

export class CircuitBreakerPolicy<R> {
export class CircuitBreakerPolicy<R> implements IPolicy<void, R> {
private readonly breakEmitter = new EventEmitter<FailureReason<R> | { isolated: true }>();
private readonly resetEmitter = new EventEmitter<void>();
private innerLastFailure?: FailureReason<R>;
Expand Down Expand Up @@ -110,7 +110,7 @@ export class CircuitBreakerPolicy<R> {
* open via {@link CircuitBreakerPolicy.isolate}
* @returns a Promise that resolves or rejects with the function results.
*/
public async execute<T extends R>(fn: () => Promise<T> | T): Promise<T> {
public async execute<T extends R>(fn: (context: void) => PromiseLike<T> | T): Promise<T> {
const state = this.innerState;
switch (state.value) {
case CircuitState.Closed:
Expand Down Expand Up @@ -146,7 +146,7 @@ export class CircuitBreakerPolicy<R> {
}
}

private async halfOpen<T extends R>(fn: () => Promise<T> | T): Promise<T> {
private async halfOpen<T extends R>(fn: (context: void) => PromiseLike<T> | T): Promise<T> {
try {
const result = await execute(this.options, fn);
if ('success' in result) {
Expand Down
25 changes: 25 additions & 0 deletions src/FallbackPolicy.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { expect } from 'chai';
import { stub } from 'sinon';
import { Policy } from './Policy';

describe('FallbackPolicy', () => {
it('does not fall back when not necessary', async () => {
const result = await Policy.handleAll()
.fallback('error')
.execute(() => 'ok');
expect(result).to.equal('ok');
});

it('returns a fallback and emits an error if necessary', async () => {
const policy = await Policy.handleAll().fallback('error');
const onFallback = stub();
policy.onFallback(onFallback);

const error = new Error('oh no!');
const result = await policy.execute(() => {
throw error;
});
expect(result).to.equal('error');
expect(onFallback).calledWith({ error });
});
});
Loading

0 comments on commit ee47684

Please sign in to comment.