Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sd-streams from https://github.com/stardazed/sd-streams/blob/mast… #3192

Merged
merged 12 commits into from
Oct 28, 2019
77 changes: 76 additions & 1 deletion cli/js/body.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as blob from "./blob.ts";
import * as encoding from "./text_encoding.ts";
import * as headers from "./headers.ts";
import * as domTypes from "./dom_types.ts";
import { ReadableStream } from "./streams/mod.ts";

const { Headers } = headers;

Expand All @@ -12,6 +13,13 @@ const { TextEncoder, TextDecoder } = encoding;
const Blob = blob.DenoBlob;
const DenoBlob = blob.DenoBlob;

type ReadableStreamReader = domTypes.ReadableStreamReader;

interface ReadableStreamController {
enqueue(chunk: string | ArrayBuffer): void;
close(): void;
}

export type BodySource =
| domTypes.Blob
| domTypes.BufferSource
Expand All @@ -37,6 +45,8 @@ function validateBodyType(owner: Body, bodySource: BodySource): boolean {
return true;
} else if (typeof bodySource === "string") {
return true;
} else if (bodySource instanceof ReadableStream) {
return true;
} else if (bodySource instanceof FormData) {
return true;
} else if (!bodySource) {
Expand All @@ -47,6 +57,58 @@ function validateBodyType(owner: Body, bodySource: BodySource): boolean {
);
}

function concatenate(...arrays: Uint8Array[]): ArrayBuffer {
let totalLength = 0;
for (const arr of arrays) {
totalLength += arr.length;
}
const result = new Uint8Array(totalLength);
let offset = 0;
for (const arr of arrays) {
result.set(arr, offset);
offset += arr.length;
}
return result.buffer as ArrayBuffer;
}

function bufferFromStream(stream: ReadableStreamReader): Promise<ArrayBuffer> {
return new Promise(
(resolve, reject): void => {
const parts: Uint8Array[] = [];
const encoder = new TextEncoder();
// recurse
(function pump(): void {
stream
.read()
.then(
({ done, value }): void => {
if (done) {
return resolve(concatenate(...parts));
}

if (typeof value === "string") {
parts.push(encoder.encode(value));
} else if (value instanceof ArrayBuffer) {
parts.push(new Uint8Array(value));
} else if (!value) {
// noop for undefined
} else {
reject("unhandled type on stream read");
}

return pump();
}
)
.catch(
(err): void => {
reject(err);
}
);
})();
}
);
}

function getHeaderValueParams(value: string): Map<string, string> {
const params = new Map();
// Forced to do so for some Map constructor param mismatch
Expand Down Expand Up @@ -81,8 +143,18 @@ export class Body implements domTypes.Body {
if (this._stream) {
return this._stream;
}

if (this._bodySource instanceof ReadableStream) {
// @ts-ignore
this._stream = this._bodySource;
}
if (typeof this._bodySource === "string") {
throw Error("not implemented");
this._stream = new ReadableStream({
start(controller: ReadableStreamController): void {
controller.enqueue(this._bodySource);
controller.close();
}
});
}
return this._stream;
}
Expand Down Expand Up @@ -259,6 +331,9 @@ export class Body implements domTypes.Body {
} else if (typeof this._bodySource === "string") {
const enc = new TextEncoder();
return enc.encode(this._bodySource).buffer as ArrayBuffer;
} else if (this._bodySource instanceof ReadableStream) {
// @ts-ignore
return bufferFromStream(this._bodySource.getReader());
} else if (this._bodySource instanceof FormData) {
const enc = new TextEncoder();
return enc.encode(this._bodySource.toString()).buffer as ArrayBuffer;
Expand Down
138 changes: 124 additions & 14 deletions cli/js/dom_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ export interface AddEventListenerOptions extends EventListenerOptions {
passive: boolean;
}

interface AbortSignal extends EventTarget {
export interface AbortSignal extends EventTarget {
readonly aborted: boolean;
onabort: ((this: AbortSignal, ev: ProgressEvent) => any) | null;
addEventListener<K extends keyof AbortSignalEventMap>(
Expand All @@ -273,19 +273,6 @@ interface AbortSignal extends EventTarget {
): void;
}

export interface ReadableStream {
readonly locked: boolean;
cancel(): Promise<void>;
getReader(): ReadableStreamReader;
tee(): [ReadableStream, ReadableStream];
}

export interface ReadableStreamReader {
cancel(): Promise<void>;
read(): Promise<any>;
releaseLock(): void;
}

export interface FormData extends DomIterable<string, FormDataEntryValue> {
append(name: string, value: string | Blob, fileName?: string): void;
delete(name: string): void;
Expand Down Expand Up @@ -343,6 +330,129 @@ export interface Body {
text(): Promise<string>;
}

export interface ReadableStream {
readonly locked: boolean;
cancel(reason?: any): Promise<void>;
getReader(): ReadableStreamReader;
tee(): ReadableStream[];
}

export interface WritableStream<W = any> {
readonly locked: boolean;
abort(reason?: any): Promise<void>;
getWriter(): WritableStreamDefaultWriter<W>;
}

export interface PipeOptions {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are called from cli/js/streams, and seem to be expected to exist in the dom.

I pulled most of these interfaces from https://github.com/microsoft/TypeScript/blob/master/lib/lib.dom.d.ts, I'm not sure whether this is right, or the right place to put these

preventAbort?: boolean;
preventCancel?: boolean;
preventClose?: boolean;
signal?: AbortSignal;
}

export interface UnderlyingSource<R = any> {
cancel?: ReadableStreamErrorCallback;
pull?: ReadableStreamDefaultControllerCallback<R>;
start?: ReadableStreamDefaultControllerCallback<R>;
type?: undefined;
}

export interface UnderlyingByteSource {
autoAllocateChunkSize?: number;
cancel?: ReadableStreamErrorCallback;
pull?: ReadableByteStreamControllerCallback;
start?: ReadableByteStreamControllerCallback;
type: "bytes";
}

export interface UnderlyingSink<W = any> {
abort?: WritableStreamErrorCallback;
close?: WritableStreamDefaultControllerCloseCallback;
start?: WritableStreamDefaultControllerStartCallback;
type?: undefined;
write?: WritableStreamDefaultControllerWriteCallback<W>;
}

export interface ReadableStreamReader {
cancel(reason?: any): Promise<void>;
read(): Promise<any>;
releaseLock(): void;
}

export interface ReadableStreamErrorCallback {
(reason: any): void | PromiseLike<void>;
}

export interface ReadableByteStreamControllerCallback {
(controller: ReadableByteStreamController): void | PromiseLike<void>;
}

export interface ReadableStreamDefaultControllerCallback<R> {
(controller: ReadableStreamDefaultController<R>): void | PromiseLike<void>;
}

export interface ReadableStreamDefaultController<R = any> {
readonly desiredSize: number | null;
close(): void;
enqueue(chunk: R): void;
error(error?: any): void;
}

export interface ReadableByteStreamController {
readonly byobRequest: ReadableStreamBYOBRequest | undefined;
readonly desiredSize: number | null;
close(): void;
enqueue(chunk: ArrayBufferView): void;
error(error?: any): void;
}

export interface ReadableStreamBYOBRequest {
readonly view: ArrayBufferView;
respond(bytesWritten: number): void;
respondWithNewView(view: ArrayBufferView): void;
}

export interface WritableStreamDefaultWriter<W = any> {
readonly closed: Promise<void>;
readonly desiredSize: number | null;
readonly ready: Promise<void>;
abort(reason?: any): Promise<void>;
close(): Promise<void>;
releaseLock(): void;
write(chunk: W): Promise<void>;
}

export interface WritableStreamErrorCallback {
(reason: any): void | PromiseLike<void>;
}

export interface WritableStreamDefaultControllerCloseCallback {
(): void | PromiseLike<void>;
}

export interface WritableStreamDefaultControllerStartCallback {
(controller: WritableStreamDefaultController): void | PromiseLike<void>;
}

export interface WritableStreamDefaultControllerWriteCallback<W> {
(chunk: W, controller: WritableStreamDefaultController): void | PromiseLike<
void
>;
}

export interface WritableStreamDefaultController {
error(error?: any): void;
}

export interface QueuingStrategy<T = any> {
highWaterMark?: number;
size?: QueuingStrategySizeCallback<T>;
}

export interface QueuingStrategySizeCallback<T = any> {
(chunk: T): number;
}

export interface Headers extends DomIterable<string, string> {
/** Appends a new value onto an existing header inside a `Headers` object, or
* adds the header if it does not already exist.
Expand Down
6 changes: 5 additions & 1 deletion cli/js/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,9 @@ export enum ErrorKind {
UnsupportedFetchScheme = 47,
TooManyRedirects = 48,
Diagnostic = 49,
JSError = 50
JSError = 50,

/** TODO These are DomError Types, and should be moved there when it exists */
DataCloneError = 51,
AbortError = 52
}
1 change: 0 additions & 1 deletion cli/js/globals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import * as url from "./url.ts";
import * as urlSearchParams from "./url_search_params.ts";
import * as workers from "./workers.ts";
import * as performanceUtil from "./performance.ts";

import * as request from "./request.ts";

// These imports are not exposed and therefore are fine to just import the
Expand Down
10 changes: 9 additions & 1 deletion cli/js/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import * as headers from "./headers.ts";
import * as body from "./body.ts";
import * as domTypes from "./dom_types.ts";
import * as streams from "./streams/mod.ts";

const { Headers } = headers;
const { ReadableStream } = streams;

function byteUpperCase(s: string): string {
return String(s).replace(/[a-z]/g, function byteUpperCaseReplace(c): string {
Expand Down Expand Up @@ -138,7 +140,13 @@ export class Request extends body.Body implements domTypes.Request {
headersList.push(header);
}

const body2 = this._bodySource;
let body2 = this._bodySource;

if (this._bodySource instanceof ReadableStream) {
const tees = (this._bodySource as domTypes.ReadableStream).tee();
this._stream = this._bodySource = tees[0];
body2 = tees[1];
}

const cloned = new Request(this.url, {
body: body2,
Expand Down
34 changes: 33 additions & 1 deletion cli/js/request_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { test, assertEquals } from "./test_util.ts";
import { test, assert, assertEquals } from "./test_util.ts";

test(function fromInit(): void {
const req = new Request("https://example.com", {
Expand All @@ -15,3 +15,35 @@ test(function fromInit(): void {
assertEquals(req.url, "https://example.com");
assertEquals(req.headers.get("test-header"), "value");
});

test(function fromRequest(): void {
const r = new Request("https://example.com");
// @ts-ignore
r._bodySource = "ahoyhoy";
r.headers.set("test-header", "value");

const req = new Request(r);

// @ts-ignore
assertEquals(req._bodySource, r._bodySource);
assertEquals(req.url, r.url);
assertEquals(req.headers.get("test-header"), r.headers.get("test-header"));
});

test(async function cloneRequestBodyStream(): Promise<void> {
// hack to get a stream
const stream = new Request("", { body: "a test body" }).body;
const r1 = new Request("https://example.com", {
body: stream
});

const r2 = r1.clone();

const b1 = await r1.text();
const b2 = await r2.text();

assertEquals(b1, b2);

// @ts-ignore
assert(r1._bodySource !== r2._bodySource);
});
19 changes: 19 additions & 0 deletions cli/js/streams/mod.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT

/**
* @stardazed/streams - implementation of the web streams standard
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/

export { SDReadableStream as ReadableStream } from "./readable-stream.ts";
export { WritableStream } from "./writable-stream.ts";

export { TransformStream } from "./transform-stream.ts";
export {
ByteLengthQueuingStrategy,
CountQueuingStrategy
} from "./strategies.ts";

Loading