-
Notifications
You must be signed in to change notification settings - Fork 30
/
generic-reader.ts
98 lines (78 loc) · 3.56 KB
/
generic-reader.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import assert from '../../stub/assert';
import { ReadableStream, ReadableStreamCancel, ReadableStreamReader } from '../readable-stream';
import { newPromise, setPromiseIsHandledToTrue } from '../helpers/webidl';
export function ReadableStreamReaderGenericInitialize<R>(reader: ReadableStreamReader<R>, stream: ReadableStream<R>) {
reader._ownerReadableStream = stream;
stream._reader = reader;
if (stream._state === 'readable') {
defaultReaderClosedPromiseInitialize(reader);
} else if (stream._state === 'closed') {
defaultReaderClosedPromiseInitializeAsResolved(reader);
} else {
assert(stream._state === 'errored');
defaultReaderClosedPromiseInitializeAsRejected(reader, stream._storedError);
}
}
// A client of ReadableStreamDefaultReader and ReadableStreamBYOBReader may use these functions directly to bypass state
// check.
export function ReadableStreamReaderGenericCancel(reader: ReadableStreamReader<any>, reason: any): Promise<undefined> {
const stream = reader._ownerReadableStream;
assert(stream !== undefined);
return ReadableStreamCancel(stream, reason);
}
export function ReadableStreamReaderGenericRelease(reader: ReadableStreamReader<any>) {
assert(reader._ownerReadableStream !== undefined);
assert(reader._ownerReadableStream._reader === reader);
if (reader._ownerReadableStream._state === 'readable') {
defaultReaderClosedPromiseReject(
reader,
new TypeError(`Reader was released and can no longer be used to monitor the stream's closedness`));
} else {
defaultReaderClosedPromiseResetToRejected(
reader,
new TypeError(`Reader was released and can no longer be used to monitor the stream's closedness`));
}
reader._ownerReadableStream._reader = undefined;
reader._ownerReadableStream = undefined!;
}
// Helper functions for the readers.
export function readerLockException(name: string): TypeError {
return new TypeError('Cannot ' + name + ' a stream using a released reader');
}
// Helper functions for the ReadableStreamDefaultReader.
export function defaultReaderClosedPromiseInitialize(reader: ReadableStreamReader<any>) {
reader._closedPromise = newPromise((resolve, reject) => {
reader._closedPromise_resolve = resolve;
reader._closedPromise_reject = reject;
});
}
export function defaultReaderClosedPromiseInitializeAsRejected(reader: ReadableStreamReader<any>, reason: any) {
defaultReaderClosedPromiseInitialize(reader);
defaultReaderClosedPromiseReject(reader, reason);
}
export function defaultReaderClosedPromiseInitializeAsResolved(reader: ReadableStreamReader<any>) {
defaultReaderClosedPromiseInitialize(reader);
defaultReaderClosedPromiseResolve(reader);
}
export function defaultReaderClosedPromiseReject(reader: ReadableStreamReader<any>, reason: any) {
if (reader._closedPromise_reject === undefined) {
return;
}
setPromiseIsHandledToTrue(reader._closedPromise);
reader._closedPromise_reject(reason);
reader._closedPromise_resolve = undefined;
reader._closedPromise_reject = undefined;
}
export function defaultReaderClosedPromiseResetToRejected(reader: ReadableStreamReader<any>, reason: any) {
assert(reader._closedPromise_resolve === undefined);
assert(reader._closedPromise_reject === undefined);
defaultReaderClosedPromiseInitializeAsRejected(reader, reason);
}
export function defaultReaderClosedPromiseResolve(reader: ReadableStreamReader<any>) {
if (reader._closedPromise_resolve === undefined) {
return;
}
reader._closedPromise_resolve(undefined);
reader._closedPromise_resolve = undefined;
reader._closedPromise_reject = undefined;
}