Skip to content

Commit

Permalink
feat(rstream): add reset option for timeout()
Browse files Browse the repository at this point in the history
- add tests
- update docs
  • Loading branch information
postspectacular committed Jul 20, 2018
1 parent 7b10e0c commit cd751fb
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 49 deletions.
65 changes: 45 additions & 20 deletions packages/rstream/src/subs/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,57 @@ import { State } from "../api"
import { Subscription } from "../subscription";

/**
* A subscription that emits an error object after a given time.
* A subscription that emits an arbitrary error object after a given
* time. If no `error` is given, uses a new `Error` instance by default.
* If `resetTimeout` is false (default), the error is emitted regardless
* of any received values in the meantime. However, if `true`, the
* timeout resets with each received value and then only triggers once
* the time interval since the last value has exceeded.
*
* @param timeoutMs Timeout value in milliseconds.
* @param error An optional error object. Will use a new instance of `Error` by default
* @param id An optional stream id.
* @param timeoutMs timeout period in milliseconds
* @param error error object
* @param resetTimeout timeout reset flag
* @param id subscription id
*/
export function timeout<T>(timeoutMs: number, error?: any, id?: string): Subscription<T, T> {
return new Timeout(timeoutMs, error, id);
export function timeout<T>(timeoutMs: number, error?: any, resetTimeout = false, id?: string): Subscription<T, T> {
return new Timeout(timeoutMs, error, resetTimeout, id);
}

class Timeout<T> extends Subscription<T, T> {
private readonly timeoutId: any;
protected timeoutMs: number;
protected timeoutId: any;
protected errorObj: any;
protected resetTimeout: boolean;

constructor(timeoutMs: number, error?: any, id?: string) {
super(undefined, undefined, undefined, id || `timeout-${Subscription.NEXT_ID++}`);
constructor(timeoutMs: number, error?: any, resetTimeout = false, id?: string) {
super(null, null, null, id || `timeout-${Subscription.NEXT_ID++}`);
this.timeoutMs = timeoutMs;
this.errorObj = error;
this.resetTimeout = resetTimeout;
this.reset();
}

this.timeoutId = setTimeout(() => {
if (this.state < State.DONE) {
this.error(error || new Error(`Timeout stream "${this.id}" after ${timeoutMs} ms`))
}
}, timeoutMs);
}
next(x: T) {
if (this.resetTimeout) {
clearTimeout(this.timeoutId);
this.reset();
}
super.next(x);
}

cleanup(): void {
clearTimeout(this.timeoutId);
super.cleanup();
}
}
reset() {
this.timeoutId = setTimeout(() => {
if (this.state < State.DONE) {
this.error(
this.errorObj ||
new Error(`Timeout stream "${this.id}" after ${this.timeoutMs} ms`)
);
}
}, this.timeoutMs);
}

cleanup(): void {
clearTimeout(this.timeoutId);
super.cleanup();
}
}
74 changes: 45 additions & 29 deletions packages/rstream/test/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,49 @@ import * as assert from "assert";
import { timeout } from "../src/subs/timeout";

describe("Timeout", () => {
it("times out", function(done) {
this.timeout(20);

timeout(10).subscribe({
error: () => done()
})
});

it("times out with error object", function (done) {
this.timeout(20);

const error = 'error object';

timeout(10, error).subscribe({
error: (err) => { assert.equal(err, error); done() }
})
});

it("cancels timeout in cleanup()", function (done) {
this.timeout(40);

timeout(10)
.subscribe({
error: () => assert.fail('timed out'),
})
.unsubscribe();

setTimeout(() => done(), 20)
});
it("times out", function (done) {
this.timeout(20);

timeout(10).subscribe({
error: () => done()
})
});

it("times out with error object", function (done) {
this.timeout(20);

const error = 'error object';

timeout(10, error).subscribe({
error: (err) => { assert.equal(err, error); done() }
})
});

it("cancels timeout in cleanup()", function (done) {
this.timeout(40);

timeout(10)
.subscribe({
error: () => assert.fail('timed out'),
})
.unsubscribe();

setTimeout(() => done(), 20)
});

it("resets timeout when value received", function (done) {
this.timeout(40);

const res = [];
const t = timeout(10, null, true);
t.subscribe({
next: (x) => { res.push(x); },
error: () => { assert.deepEqual(res, [1, 2]); }
});

setTimeout(() => t.next(1), 7);
setTimeout(() => t.next(2), 15);
setTimeout(() => t.next(3), 29);
setTimeout(() => done(), 35);
});
});

0 comments on commit cd751fb

Please sign in to comment.