From ee4768448263e8dcc0a0039f7d076084cf0e5f68 Mon Sep 17 00:00:00 2001 From: Connor Peet Date: Thu, 21 Nov 2019 22:50:03 -0800 Subject: [PATCH] feat: implement wrap, fallback, more documentation --- readme.md | 185 ++++++++++++++++-- ...ulkhead.test.ts => BulkheadPolicy.test.ts} | 0 src/{Bulkhead.ts => BulkheadPolicy.ts} | 7 +- src/CircuitBreakerPolicy.ts | 8 +- src/FallbackPolicy.test.ts | 25 +++ src/FallbackPolicy.ts | 36 ++++ src/Policy.test.ts | 77 ++++++++ src/Policy.ts | 132 +++++++++++-- src/RetryPolicy.test.ts | 21 +- src/RetryPolicy.ts | 47 ++++- src/TimeoutPolicy.test.ts | 12 +- src/TimeoutPolicy.ts | 15 +- src/common/execute.ts | 2 +- src/index.ts | 5 +- 14 files changed, 510 insertions(+), 62 deletions(-) rename src/{Bulkhead.test.ts => BulkheadPolicy.test.ts} (100%) rename src/{Bulkhead.ts => BulkheadPolicy.ts} (89%) create mode 100644 src/FallbackPolicy.test.ts create mode 100644 src/FallbackPolicy.ts create mode 100644 src/Policy.test.ts diff --git a/readme.md b/readme.md index bf658b6..825f415 100644 --- a/readme.md +++ b/readme.md @@ -10,9 +10,16 @@ npm install --save cockatiel ## Contents -Cockatiel is a work-in progress; this serves as a table of contents and progress checklist. +This table lists the API which Cockatiel provides. I recommend reading the [Polly wiki](https://github.com/App-vNext/Polly/wiki) for more information for details and mechanics around the patterns we provide. - [x] Base [Policy](#policy) + - [Policy.handleAll()](#policyhandleAll) + - [Policy.handleType(ctor[, filter])](#policyhandletypector-filter) + - [Policy.handleWhen(filter)](#policyhandlewhen) + - [Policy.handleResultType(ctor[, filter])](#policyhandleresulttypector-filter) + - [Policy.handleResultWhen(filter)](#policyhandleresultwhenfilter) + - [Policy.wrap(policies)](#policywrappolicies) + - [Policy.noop](#policynoop) - [x] [Backoffs](#backoffs) - [ConstantBackoff](#constantConstantBackoffbackoff) - [ExponentialBackoff](#ExponentialBackoff) @@ -26,12 +33,17 @@ Cockatiel is a work-in progress; this serves as a table of contents and progress - [cancellationToken.isCancellationRequested](#cancellationtokeniscancellationrequested) - [cancellationToken.onCancellationRequested(listener)](#cancellationtokenoncancellationrequestedlistener) - [cancellationToken.cancelled()](#cancellationtokencancelledcancellationToken) +- [x] [Events](#events) + - [Event.toPromise(event[, cancellationToken])](#eventtopromiseevent-cancellationtoken) + - [Event.once(event, listener)](#eventonceeventlistener) - [x] [Policy.retry](#policyretry) - [retry.execute(fn)](#retryexecutefn) - [retry.attempts(count)](#retryattemptscount) - [retry.delay(amount)](#retrydelayamount) - [retry.delegate(fn)](#retrydelegatefn) - [retry.backoff(policy)](#retrybackoffpolicy) + - [retry.onRetry(listener)](#retryonretrylistener) + - [retry.onGiveUp(listener)](#retryongiveuplistener) - [x] [Policy.circuitBreaker](#policycircuitbreakeropenafter-breaker) - [ConsecutiveBreaker](#ConsecutiveBreaker) - [SamplingBreaker](#SamplingBreaker) @@ -48,8 +60,9 @@ Cockatiel is a work-in progress; this serves as a table of contents and progress - [bulkhead.onReject(callback)](#bulkheadonrejectcallback) - [bulkhead.executionSlots](#bulkheadexecutionslots) - [bulkhead.queueSlots](#bulkheadqueueslots) -- [ ] Fallback -- [ ] PolicyWrap +- [x] [Policy.fallback](#policyfallbackvalueorfactory) + - [fallback.execute(fn)](#fallbackexecutefn) + - [fallback.onFallback(callback)](#fallbackonfallbackcallback) ## `Policy` @@ -101,8 +114,8 @@ Policy // ... ``` -### `Policy.handleResultWhen(filter])` -### `policy.orWhenResult(filter])` +### `Policy.handleResultWhen(filter)` +### `policy.orWhenResult(filter)` Tells the policy to treat certain return values of the function as errors--retrying if they appear, for instance. Results will be retried the filter function returns true. @@ -113,6 +126,47 @@ Policy // ... ``` +### `Policy.wrap(...policies)` + +Wraps the given set of policies into a single policy. For instance, this: + +```js +const result = await retry.execute(() => + breaker.execute(() => + timeout.execute(({ cancellation }) => getData(cancellation)))) +``` + +Is the equivalent to: + +```js +const result = await Policy + .wrap(retry, breaker, timeout) + .execute(({ cancellation }) => getData(cancellation))); +``` + +The `context` argument passed to the executed function is the merged object of all previous policies. So for instance, in the above example you'll get the cancellation token from the [TimeoutPolicy](#policytimeout) as well as the attempt number from the [RetryPolicy](#policyretry): + +```ts +Policy.wrap(retry, breaker, timeout).execute(context => { + console.log(context); + // => { attempts: 1, cancellation: } +}); +``` + +### `Policy.noop` + +A no-op policy, which may be useful for tests and stubs. + +```ts +import { Policy } from 'cockatiel'; + +const policy = isProduction ? Policy.handleAll().retry().attempts(3) : Policy.noop; + +export async function handleRequest() { + return policy.execute(() => getInfoFromDatabase()); +} +``` + ## Backoffs Backoff algorithms are immutable. They adhere to the interface: @@ -322,7 +376,7 @@ if (token.isCancellationRequested) { ### `cancellationToken.onCancellationRequested(listener)` -Returns whether cancellation has yet been requested. +An [event emitter](#events) that fires when a cancellation is requested. Fires immediately if cancellation has already been requested. Returns a disposable instance. ```ts const listener = token.onCancellationRequested(() => console.log('cancellation requested')) @@ -339,6 +393,53 @@ Returns a promise that resolves once cancellation is requested. You can optional await token.cancelled(); ``` +## Events + +Cockatiel uses a simple bespoke style for events, similar to those that we use in VS Code. These events provide better type-safety (you can never subscribe to the wrong event name) and better functionality around triggering listeners, which we use to implement cancellation tokens. + +An event can be subscribed to simply by passing a callback. Take [`onFallback`](#fallbackonfallbackcallback) for instance: + +```js +const listener = policy.onFallback(error => { + console.log(error); +}); +``` + +The event returns an `IDisposable` instance. To unsubscribe the listener, call `.dispose()` on the returned instance. It's always safe to call an IDisposable's `.dispose()` multiple times. + +```js +listener.dispose() +``` + +We provide a couple extra utilities around events as well. + +### `Event.toPromise(event[, cancellationToken])` + +Returns a promise that resolves once the event fires. Optionally, you can pass in a [CancellationToken](#cancellationtoken) to control when you stop listening, which will reject the promise with a `TaskCancelledError` if it's not already resolved. + +```js +import { Event } from 'cockatiel'; + +async function waitForFallback(policy) { + await Event.toPromise(policy.onFallback); + console.log('a fallback happened!'); +} +``` + +### `Event.once(callback)` + +Waits for the event to fire once, and then automatically unregisters the listener. This method itself returns an `IDisposable`, which you could use to unregister the listener if needed. + +```js +import { Event } from 'cockatiel'; + +async function waitForFallback(policy) { + Event.once(policy.onFallback, () => { + console.log('a fallback happened!'); + }); +} +``` + ## `Policy.retry()` If you know how to use Polly, you already almost know how to use Cockatiel. The `Policy` object is the base builder, and you can get a RetryBuilder off of that by calling `.retry()`. @@ -428,6 +529,33 @@ Policy // ... ``` +### `retry.onRetry(listener)` + +An [event emitter](#events) that fires when we retry a call, before any backoff. It's invoked with an object that includes: + + - the `delay` we're going to wait before retrying, and; + - either a thrown error like `{ error: someError, delay: number }`, or an errorful result in an object like `{ value: someValue, delay: number }` when using [result filtering](#policyhandleresulttypector-filter). + +Useful for telemetry. Returns a dispable instance. + +```js +const listener = retry.onRetry(reason => console.log('retrying a function call:', reason)); + +// ... +listener.dispose(); +``` + +### `retry.onGiveUp(listener)` + +An [event emitter](#events) that fires when we're no longer retrying a call and are giving up. It's invoked with either a thrown error in an object like `{ error: someError }`, or an errorful result in an object like `{ value: someValue }` when using [result filtering](#policyhandleresulttypector-filter). Useful for telemetry. Returns a dispable instance. + +```js +const listener = retry.onGiveUp(reason => console.log('retrying a function call:', reason)); + +// ... +listener.dispose(); +``` + ## `Policy.circuitBreaker(openAfter, breaker)` Circuit breakers stop execution for a period of time after a failure threshold has been reached. This is very useful to allow faulting systems to recover without overloading them. See the [Polly docs](https://github.com/App-vNext/Polly/wiki/Circuit-Breaker#how-the-polly-circuitbreaker-works) for more detailed information around circuit breakers. @@ -523,7 +651,7 @@ if (breaker.state === CircuitState.Open) { ### `breaker.onBreak(callback)` -A method on the circuit breaker that gives you a callback fired when the circuit opens as a result of failures. Returns a disposable instance. +An [event emitter](#events) that fires when the circuit opens as a result of failures. Returns a disposable instance. ```js const listener = breaker.onBreak(() => console.log('circuit is open')); @@ -534,7 +662,7 @@ listener.dispose(); ### `breaker.onReset(callback)` -A method on the circuit breaker that gives you a callback fired when the circuit closes after being broken. Returns a disposable instance. +An [event emitter](#events) that fires when the circuit closes after being broken. Returns a disposable instance. ```js const listener = breaker.onReset(() => console.log('circuit is closed')); @@ -579,7 +707,7 @@ export async function handleRequest() { ### `timeout.execute(fn)` -Executes the given function as configured in the policy. A [ CancellationToken](#cancellationtoken) will be pass to the function, which it should use for aborting operations as needed. +Executes the given function as configured in the policy. A [CancellationToken](#cancellationtoken) will be pass to the function, which it should use for aborting operations as needed. ```ts await timeout.execute(cancellationToken => getInfoFromDatabase(cancellationToken)) @@ -587,7 +715,7 @@ await timeout.execute(cancellationToken => getInfoFromDatabase(cancellationToken ### `timeout.onTimeout(listener)` -A method on the timeout that gives you a callback fired when a timeout is reached. Useful for telemetry. Returns a disposable instance. +An [event emitter](#events) that fires when a timeout is reached. Useful for telemetry. Returns a disposable instance. ```ts const listener = timeout.onTimeout(() => console.log('timeout was reached')); @@ -639,7 +767,7 @@ const data = await bulkhead.execute(() => getInfoFromDatabase()); ### `bulkhead.onReject(callback)` -A method on the bulkhead that gives you a callback fired when a call is rejected. Useful for telemetry. Returns a disposable instance. +An [event emitter](#events) that fires when a call is rejected. Useful for telemetry. Returns a disposable instance. ```js const listener = bulkhead.onReject(() => console.log('bulkhead call was rejected')); @@ -655,3 +783,38 @@ Returns the number of execution slots left in the bulkhead. If either this or `b ### `bulkhead.queueSlots` Returns the number of queue slots left in the bulkhead. If either this or `bulkhead.executionSlots` is greater than zero, the `execute()` will not throw a `BulkheadRejectedError`. + +## `Policy.fallback(valueOrFactory)` + +Creates a policy that returns the `valueOrFactory` if an executed function fails. As the name suggests, `valueOrFactory` either be a value, or a function we'll call when a failure happens to create a value. + +```js +import { Policy } from 'cockatiel'; + +const fallback = Policy + .handleType(DatabaseError) + .fallback(() => getStaleData()); + +export function handleRequest() { + return fallback.execute(() => getInfoFromDatabase()); +} +``` + +### `fallback.execute(fn)` + +Executes the given function. Any _handled_ error or errorful value will be eaten, and instead the fallback value will be returned. + +```js +const result = await fallback.execute(() => getInfoFromDatabase()); +``` + +### `fallback.onFallback(callback)` + +An [event emitter](#events) that fires when a fallback occurs. It's invoked with either a thrown error in an object like `{ error: someError }`, or an errorful result in an object like `{ value: someValue }` when using [result filtering](#policyhandleresulttypector-filter). Useful for telemetry. Returns a disposable instance. + +```js +const listener = bulkhead.onReject(() => console.log('bulkhead call was rejected')); + +// later: +listener.dispose(); +``` diff --git a/src/Bulkhead.test.ts b/src/BulkheadPolicy.test.ts similarity index 100% rename from src/Bulkhead.test.ts rename to src/BulkheadPolicy.test.ts diff --git a/src/Bulkhead.ts b/src/BulkheadPolicy.ts similarity index 89% rename from src/Bulkhead.ts rename to src/BulkheadPolicy.ts index c627f55..2e04e90 100644 --- a/src/Bulkhead.ts +++ b/src/BulkheadPolicy.ts @@ -1,9 +1,10 @@ import { defer } from './common/defer'; import { EventEmitter } from './common/Event'; import { BulkheadRejectedError } from './errors/BulkheadRejectedError'; +import { IPolicy } from './Policy'; interface IQueueItem { - fn(): Promise | T; + fn(context: void): Promise | T; resolve(value: T): void; reject(error: Error): void; } @@ -11,7 +12,7 @@ interface IQueueItem { /** * Bulkhead limits concurrent requests made. */ -export class Bulkhead { +export class BulkheadPolicy implements IPolicy { private active = 0; private queue: Array> = []; private onRejectEmitter = new EventEmitter(); @@ -43,7 +44,7 @@ export class Bulkhead { * @param fn -- Function to execute * @throws a {@link BulkheadRejectedException} if the bulkhead limits are exceeeded */ - public async execute(fn: () => PromiseLike | T): Promise { + public async execute(fn: (context: void) => PromiseLike | T): Promise { if (this.active < this.capacity) { this.active++; try { diff --git a/src/CircuitBreakerPolicy.ts b/src/CircuitBreakerPolicy.ts index 93e80e5..5ca3201 100644 --- a/src/CircuitBreakerPolicy.ts +++ b/src/CircuitBreakerPolicy.ts @@ -3,7 +3,7 @@ import { EventEmitter } from './common/Event'; import { execute, returnOrThrow } from './common/execute'; import { BrokenCircuitError } from './errors/Errors'; import { IsolatedCircuitError } from './errors/IsolatedCircuitError'; -import { FailureReason, IBasePolicyOptions } from './Policy'; +import { FailureReason, IBasePolicyOptions, IPolicy } from './Policy'; export enum CircuitState { /** @@ -40,7 +40,7 @@ type InnerState = | { value: CircuitState.Open; openedAt: number } | { value: CircuitState.HalfOpen; test: Promise }; -export class CircuitBreakerPolicy { +export class CircuitBreakerPolicy implements IPolicy { private readonly breakEmitter = new EventEmitter | { isolated: true }>(); private readonly resetEmitter = new EventEmitter(); private innerLastFailure?: FailureReason; @@ -110,7 +110,7 @@ export class CircuitBreakerPolicy { * open via {@link CircuitBreakerPolicy.isolate} * @returns a Promise that resolves or rejects with the function results. */ - public async execute(fn: () => Promise | T): Promise { + public async execute(fn: (context: void) => PromiseLike | T): Promise { const state = this.innerState; switch (state.value) { case CircuitState.Closed: @@ -146,7 +146,7 @@ export class CircuitBreakerPolicy { } } - private async halfOpen(fn: () => Promise | T): Promise { + private async halfOpen(fn: (context: void) => PromiseLike | T): Promise { try { const result = await execute(this.options, fn); if ('success' in result) { diff --git a/src/FallbackPolicy.test.ts b/src/FallbackPolicy.test.ts new file mode 100644 index 0000000..1cf0c16 --- /dev/null +++ b/src/FallbackPolicy.test.ts @@ -0,0 +1,25 @@ +import { expect } from 'chai'; +import { stub } from 'sinon'; +import { Policy } from './Policy'; + +describe('FallbackPolicy', () => { + it('does not fall back when not necessary', async () => { + const result = await Policy.handleAll() + .fallback('error') + .execute(() => 'ok'); + expect(result).to.equal('ok'); + }); + + it('returns a fallback and emits an error if necessary', async () => { + const policy = await Policy.handleAll().fallback('error'); + const onFallback = stub(); + policy.onFallback(onFallback); + + const error = new Error('oh no!'); + const result = await policy.execute(() => { + throw error; + }); + expect(result).to.equal('error'); + expect(onFallback).calledWith({ error }); + }); +}); diff --git a/src/FallbackPolicy.ts b/src/FallbackPolicy.ts new file mode 100644 index 0000000..a179f2c --- /dev/null +++ b/src/FallbackPolicy.ts @@ -0,0 +1,36 @@ +import { EventEmitter } from './common/Event'; +import { execute } from './common/execute'; +import { FailureReason, IBasePolicyOptions, IPolicy } from './Policy'; + +export class FallbackPolicy + implements IPolicy { + private readonly fallbackEmitter = new EventEmitter>(); + + /** + * Event that fires when a fallback happens. + */ + // tslint:disable-next-line: member-ordering + public readonly onFallback = this.fallbackEmitter.addListener; + + constructor( + private readonly options: IBasePolicyOptions, + private readonly value: () => AltReturn, + ) {} + + /** + * Executes the given function. + * @param fn -- Function to execute. + * @returns The function result or fallback value. + */ + public async execute( + fn: (context: void) => PromiseLike | T, + ): Promise { + const result = await execute(this.options, fn); + if ('success' in result) { + return result.success; + } + + this.fallbackEmitter.emit(result); + return this.value(); + } +} diff --git a/src/Policy.test.ts b/src/Policy.test.ts new file mode 100644 index 0000000..7807a4d --- /dev/null +++ b/src/Policy.test.ts @@ -0,0 +1,77 @@ +import { expect } from 'chai'; +import { stub } from 'sinon'; +import { ConsecutiveBreaker } from './breaker/Breaker'; +import { BrokenCircuitError } from './errors/Errors'; +import { Policy } from './Policy'; + +class MyError1 extends Error {} +class MyError2 extends Error {} +class MyError3 extends Error {} + +describe('Policy', () => { + it('wraps', async () => { + const policy = Policy.wrap( + Policy.handleType(MyError1) + .retry() + .attempts(3), + Policy.handleAll().circuitBreaker(100, new ConsecutiveBreaker(2)), + ); + + // should retry and break the circuit + await expect(policy.execute(stub().throws(new MyError1()))).to.be.rejectedWith( + BrokenCircuitError, + ); + }); + + it('applies error filters', async () => { + const fn = stub() + .onCall(0) + .throws(new MyError1()) + .onCall(1) + .throws(new MyError2()) + .onCall(2) + .throws(new MyError3('foo')) + .onCall(3) + .throws(new Error('potato')) + .onCall(4) + .throws(new MyError3('bar')); + + await expect( + Policy.handleType(MyError1) + .orType(MyError2) + .orType(MyError3, e => e.message === 'foo') + .orWhen(e => e.message === 'potato') + .retry() + .attempts(10) + .execute(fn), + ).to.be.rejectedWith(MyError3, 'bar'); + + expect(fn).to.have.callCount(5); + }); + + it('applies result filters', async () => { + const fn = stub() + .onCall(0) + .returns(new MyError1()) + .onCall(1) + .returns(new MyError2()) + .onCall(2) + .returns(new MyError3('foo')) + .onCall(3) + .returns('potato') + .onCall(4) + .returns('ok!'); + + expect( + await Policy.handleResultType(MyError1) + .orResultType(MyError2) + .orResultType(MyError3, e => e.message === 'foo') + .orWhenResult(e => e === 'potato') + .retry() + .attempts(10) + .execute(fn), + ).to.equal('ok!'); + + expect(fn).to.have.callCount(5); + }); +}); diff --git a/src/Policy.ts b/src/Policy.ts index 1dabff4..c5c8edd 100644 --- a/src/Policy.ts +++ b/src/Policy.ts @@ -1,6 +1,7 @@ import { IBreaker } from './breaker/Breaker'; -import { Bulkhead } from './Bulkhead'; +import { BulkheadPolicy } from './BulkheadPolicy'; import { CircuitBreakerPolicy } from './CircuitBreakerPolicy'; +import { FallbackPolicy } from './FallbackPolicy'; import { RetryPolicy } from './RetryPolicy'; import { TimeoutPolicy, TimeoutStrategy } from './TimeoutPolicy'; @@ -12,27 +13,100 @@ const typeFilter = (cls: Constructor, predicate?: (error: T) => boolean) = const always = () => true; const never = () => false; -export interface IBasePolicyOptions { +export interface IBasePolicyOptions { errorFilter: (error: Error) => boolean; - resultFilter: (result: ReturnType) => boolean; + resultFilter: (result: ReturnConstraint) => boolean; } /** * The reason for a call failure. Either an error, or the a value that was * marked as a failure (when using result filtering). */ -export type FailureReason = { error: Error } | { value: R }; +export type FailureReason = { error: Error } | { value: ReturnType }; + +/** + * IPolicy is the type of all policies that Cockatiel provides. It describes + * an execute() function which takes a generic argument. + */ +export interface IPolicy { + execute( + fn: (context: ContextType) => PromiseLike | T, + ): Promise; +} /** * Factory that builds a base set of filters that can be used in circuit * breakers, retries, etc. */ -export class Policy { +export class Policy { + /** + * A no-op policy, useful for unit tests and stubs. + */ + public static readonly noop: IPolicy = { execute: async fn => fn(undefined) }; + + /** + * Wraps the given set of policies into a single policy. For instance, this: + * + * ```js + * retry.execute(() => + * breaker.execute(() => + * timeout.execute(({ cancellationToken }) => getData(cancellationToken)))) + * ``` + * + * Is the equivalent to: + * + * ```js + * Policy + * .wrap(retry, breaker, timeout) + * .execute(({ cancellationToken }) => getData(cancellationToken))); + * ``` + * + * The `context` argument passed to the executed function is the merged object + * of all previous policies. + * + * @todo I think there may be a TS bug here preventing correct type-safe + * usage without casts: https://github.com/microsoft/TypeScript/issues/35288 + */ + // forgive me, for I have sinned + public static wrap(p1: IPolicy): IPolicy; + public static wrap( + p1: IPolicy, + p2: IPolicy, + ): IPolicy; + public static wrap( + p1: IPolicy, + p2: IPolicy, + p3: IPolicy, + ): IPolicy; + public static wrap( + p1: IPolicy, + p2: IPolicy, + p3: IPolicy, + p4: IPolicy, + ): IPolicy; + public static wrap( + p1: IPolicy, + p2: IPolicy, + p3: IPolicy, + p4: IPolicy, + p5: IPolicy, + ): IPolicy; + public static wrap(...p: Array>): IPolicy; + public static wrap(...p: Array>): IPolicy { + return { + execute(fn: (context: T) => PromiseLike | R): Promise { + const run = (context: any, i: number): R | PromiseLike => + i === p.length ? fn(context) : p[i].execute(next => run({ ...context, ...next }, i + 1)); + return Promise.resolve(run({}, 0)); + }, + }; + } + /** * Creates a bulkhead--a policy that limits the number of concurrent calls. */ public static bulkhead(limit: number, queue: number = 0) { - return new Bulkhead(limit, queue); + return new BulkheadPolicy(limit, queue); } /** @@ -83,7 +157,7 @@ export class Policy { return new TimeoutPolicy(duration, strategy); } - protected constructor(private readonly options: Readonly>) {} + protected constructor(private readonly options: Readonly>) {} /** * Allows the policy to additionally handles errors of the given type. @@ -149,15 +223,15 @@ export class Policy { * .execute(() => getJsonFrom('https://example.com')); * ``` */ - public orWhenResult(predicate: (r: ReturnType) => boolean) { + public orWhenResult(predicate: (r: ReturnConstraint) => boolean) { /** * Bounty on this. Properly, you should also be able to discriminate the - * return types. So if you add a handler like `(result: ReturnType) => - * result is T` where T extends ReturnType, then the policy should then - * say that the 'wrapped' function returns `ReturnType - T`. However, I + * return types. So if you add a handler like `(result: ReturnConstraint) => + * result is T` where T extends ReturnConstraint, then the policy should then + * say that the 'wrapped' function returns `ReturnConstraint - T`. However, I * can't seem to figure out how to get this to work... */ - return new Policy({ + return new Policy({ ...this.options, resultFilter: r => this.options.resultFilter(r) || predicate(r), }); @@ -179,12 +253,12 @@ export class Policy { * .execute(() => getJsonFrom('https://example.com')); * ``` */ - public orResultType( + public orResultType( cls: Constructor, - predicate?: (error: ReturnType) => boolean, + predicate?: (error: T) => boolean, ) { const filter = typeFilter(cls, predicate); - return new Policy({ + return new Policy({ ...this.options, resultFilter: r => this.options.resultFilter(r) || filter(r), }); @@ -230,4 +304,32 @@ export class Policy { halfOpenAfter, }); } + + /** + * Falls back to the given value in the event of an error. + * + * ```ts + * import { Policy } from 'cockatiel'; + * + * const fallback = Policy + * .handleType(DatabaseError) + * .fallback(() => getStaleData()); + * + * export function handleRequest() { + * return fallback.execute(() => getInfoFromDatabase()); + * } + * ``` + * + * @param toValue -- Value to fall back to, or a function that creates the + * value to return (any may return a promise) + */ + public fallback(valueOrFactory: (() => Promise | R) | R) { + return new FallbackPolicy( + this.options, + // not technically type-safe, since if they actually want to _return_ + // a function, that gets lost here. We'll just advice in the docs to + // use a higher-order function if necessary. + (typeof valueOrFactory === 'function' ? valueOrFactory : () => valueOrFactory) as () => R, + ); + } } diff --git a/src/RetryPolicy.test.ts b/src/RetryPolicy.test.ts index 7974616..6329f1a 100644 --- a/src/RetryPolicy.test.ts +++ b/src/RetryPolicy.test.ts @@ -1,5 +1,5 @@ import { expect, use } from 'chai'; -import { SinonStub, stub } from 'sinon'; +import { SinonFakeTimers, SinonStub, stub, useFakeTimers } from 'sinon'; import { Policy } from './Policy'; import { RetryPolicy } from './RetryPolicy'; @@ -32,39 +32,46 @@ describe('RetryPolicy', () => { describe('setting backoffs', () => { let p: RetryPolicy; let s: SinonStub; + let clock: SinonFakeTimers; + let delays: number[]; beforeEach(() => { + clock = useFakeTimers(); p = Policy.handleAll().retry(); + delays = []; + p.onRetry(({ delay }) => { + delays.push(delay); + clock.tick(delay); + }); s = stub().throws(new MyErrorA()); }); + afterEach(() => clock.restore()); + it('sets the retry delay', async () => { - const start = Date.now(); await expect( p .delay(50) .attempts(1) .execute(s), ).to.eventually.be.rejectedWith(MyErrorA); - expect(Date.now() - start).to.be.gte(50); + expect(delays).to.deep.equal([50]); expect(s).to.have.been.calledTwice; }); it('sets the retry sequence', async () => { - const start = Date.now(); await expect(p.delay([10, 20, 20]).execute(s)).to.eventually.be.rejectedWith(MyErrorA); - expect(Date.now() - start).to.be.gte(50); + expect(delays).to.deep.equal([10, 20, 20]); expect(s).to.have.callCount(4); }); it('sets the retry attempts', async () => { - const start = Date.now(); await expect( p .delay([10, 20, 20]) .attempts(1) .execute(s), ).to.eventually.be.rejectedWith(MyErrorA); - expect(Date.now() - start).to.be.gte(10); + expect(delays).to.deep.equal([10]); expect(s).to.have.been.calledTwice; }); }); diff --git a/src/RetryPolicy.ts b/src/RetryPolicy.ts index 97843e5..af00d86 100644 --- a/src/RetryPolicy.ts +++ b/src/RetryPolicy.ts @@ -1,10 +1,11 @@ -import { IBackoff } from './backoff/Backoff'; +import { ExponentialBackoff, IBackoff, IExponentialBackoffOptions } from './backoff/Backoff'; import { CompositeBackoff, CompositeBias } from './backoff/CompositeBackoff'; import { ConstantBackoff } from './backoff/ConstantBackoff'; import { DelegateBackoff, DelegateBackoffFn } from './backoff/DelegateBackoff'; import { IterableBackoff } from './backoff/IterableBackoff'; +import { EventEmitter } from './common/Event'; import { execute } from './common/execute'; -import { FailureReason, IBasePolicyOptions } from './Policy'; +import { FailureReason, IBasePolicyOptions, IPolicy } from './Policy'; const delay = (duration: number) => new Promise(resolve => setTimeout(resolve, duration)); @@ -33,8 +34,24 @@ export interface IRetryPolicyConfig extends IBasePolicyOptions { backoff?: IBackoff>; } -export class RetryPolicy { - constructor(private readonly options: IRetryPolicyConfig) {} +export class RetryPolicy implements IPolicy { + private onRetryEmitter = new EventEmitter & { delay: number }>(); + private onGiveUpEmitter = new EventEmitter>(); + + /** + * Emitter that fires when we retry a call, before any backoff. + * + */ + // tslint:disable-next-line: member-ordering + public readonly onRetry = this.onRetryEmitter.addListener; + + /** + * Emitter that fires when we retry a call. + */ + // tslint:disable-next-line: member-ordering + public readonly onGiveUp = this.onGiveUpEmitter.addListener; + + constructor(private options: IRetryPolicyConfig) {} /** * Sets the number of retry attempts for the function. @@ -62,6 +79,13 @@ export class RetryPolicy { return this.composeBackoff('b', new DelegateBackoff(backoff)); } + /** + * Uses an exponential backoff for retries. + */ + public exponential(options: IExponentialBackoffOptions>) { + return this.composeBackoff('b', new ExponentialBackoff(options)); + } + /** * Sets the baackoff to use for retries. */ @@ -74,7 +98,9 @@ export class RetryPolicy { * @param fn -- Function to run * @returns a Promise that resolves or rejects with the function results. */ - public async execute(fn: (context: IRetryContext) => Promise | T): Promise { + public async execute( + fn: (context: IRetryContext) => PromiseLike | T, + ): Promise { let backoff: IBackoff> | undefined = this.options.backoff || new ConstantBackoff(0, 1); for (let retries = 0; ; retries++) { @@ -84,11 +110,17 @@ export class RetryPolicy { } if (backoff) { - await delay(backoff.duration()); + const delayDuration = backoff.duration(); + const delayPromise = delay(delayDuration); + // A little sneaky reordering here lets us use Sinon's fake timers + // when we get an emission in our tests. + this.onRetryEmitter.emit({ ...result, delay: delayDuration }); + await delayPromise; backoff = backoff.next({ attempt: retries + 1, result }); continue; } + this.onGiveUpEmitter.emit(result); if ('error' in result) { throw result.error; } @@ -102,6 +134,7 @@ export class RetryPolicy { backoff = new CompositeBackoff(bias, this.options.backoff, backoff); } - return new RetryPolicy({ ...this.options, backoff }); + this.options = { ...this.options, backoff }; + return this; } } diff --git a/src/TimeoutPolicy.test.ts b/src/TimeoutPolicy.test.ts index 5540cbe..04bf6a7 100644 --- a/src/TimeoutPolicy.test.ts +++ b/src/TimeoutPolicy.test.ts @@ -16,10 +16,10 @@ describe('TimeoutPolicy', () => { it('properly cooperatively cancels', async () => { const policy = Policy.timeout(2, TimeoutStrategy.Cooperative); expect( - await policy.execute(async ct => { - expect(ct.isCancellationRequested).to.be.false; + await policy.execute(async ({ cancellation }) => { + expect(cancellation.isCancellationRequested).to.be.false; await delay(3); - expect(ct.isCancellationRequested).to.be.true; + expect(cancellation.isCancellationRequested).to.be.true; return 42; }), ).to.equal(42); @@ -29,10 +29,10 @@ describe('TimeoutPolicy', () => { const policy = Policy.timeout(2, TimeoutStrategy.Aggressive); const verified = defer(); await expect( - policy.execute(async ct => { - expect(ct.isCancellationRequested).to.be.false; + policy.execute(async ({ cancellation }) => { + expect(cancellation.isCancellationRequested).to.be.false; await delay(3); - expect(ct.isCancellationRequested).to.be.true; + expect(cancellation.isCancellationRequested).to.be.true; verified.resolve(undefined); return 42; }), diff --git a/src/TimeoutPolicy.ts b/src/TimeoutPolicy.ts index 7c70e8c..03b97b3 100644 --- a/src/TimeoutPolicy.ts +++ b/src/TimeoutPolicy.ts @@ -1,6 +1,7 @@ import { CancellationToken, CancellationTokenSource } from './CancellationToken'; import { EventEmitter } from './common/Event'; import { TaskCancelledError } from './errors/TaskCancelledError'; +import { IPolicy } from './Policy'; export enum TimeoutStrategy { /** @@ -15,7 +16,11 @@ export enum TimeoutStrategy { Aggressive = 'aggressive', } -export class TimeoutPolicy { +export interface ICancellationContext { + cancellation: CancellationToken; +} + +export class TimeoutPolicy implements IPolicy { private readonly timeoutEmitter = new EventEmitter(); /** @@ -31,19 +36,17 @@ export class TimeoutPolicy { * @param fn -- Function to execute. Takes in a nested cancellation token. * @throws a {@link TaskCancelledError} if a timeout occurs */ - public async execute( - fn: (cancellationToken: CancellationToken) => PromiseLike | T, - ): Promise { + public async execute(fn: (context: ICancellationContext) => PromiseLike | T): Promise { const cts = new CancellationTokenSource(); const timer = setTimeout(() => cts.cancel(), this.duration); try { if (this.strategy === TimeoutStrategy.Cooperative) { - return await fn(cts.token); + return await fn({ cancellation: cts.token }); } return Promise.race([ - fn(cts.token), + fn({ cancellation: cts.token }), cts.token.cancellation(cts.token).then(() => { throw new TaskCancelledError(`Operation timed out after ${this.duration}ms`); }), diff --git a/src/common/execute.ts b/src/common/execute.ts index 953767d..edb1db8 100644 --- a/src/common/execute.ts +++ b/src/common/execute.ts @@ -16,7 +16,7 @@ export const returnOrThrow = (failure: FailureOrSuccess) => { export const execute = async ( options: Readonly>, - fn: (...args: T) => Promise | R, + fn: (...args: T) => PromiseLike | R, ...args: T ): Promise> => { try { diff --git a/src/index.ts b/src/index.ts index bfab419..725bfe7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,10 +1,11 @@ +export { Event, EventEmitter } from './common/Event'; export * from './backoff/Backoff'; export * from './breaker/Breaker'; -export * from './Bulkhead'; +export * from './BulkheadPolicy'; export * from './CancellationToken'; export * from './CircuitBreakerPolicy'; -export { Event, EventEmitter } from './common/Event'; export * from './errors/Errors'; +export * from './FallbackPolicy'; export * from './Policy'; export * from './RetryPolicy'; export * from './TimeoutPolicy';