Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/streams #20

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
"globals": {
"Uint8Array": false,
"TextEncoder": false,
"TextDecoder": false
"TextDecoder": false,
"Promise": false,
"ReadableStream": false
},
"parserOptions": {
"ecmaVersion": 6,
Expand Down
1 change: 1 addition & 0 deletions karma.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ module.exports = function(config) {
// list of files / patterns to load in the browser
files: [
'node_modules/text-encoding/lib/encoding.js',
'node_modules/web-streams-polyfill/dist/polyfill.js',
'build/integration-tests.js'
],

Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"karma-sauce-launcher": "^1.0.0",
"lodash": "^4.15.0",
"text-encoding": "^0.6.0",
"url": "^0.11.0"
},
"dependencies": {}
"url": "^0.11.0",
"web-streams-polyfill": "^1.3.0"
}
}
38 changes: 0 additions & 38 deletions src/defaultChunkParser.js

This file was deleted.

56 changes: 56 additions & 0 deletions src/defaultParser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
const entryDelimiter = '\n';

export default function jsonLiteralParser(res, onData) {
const textDecoder = new TextDecoder();
const reader = res.body.getReader();
let trailer = '';

function processChunk(bytes, flush = false) {
const str = textDecoder.decode(bytes, { stream: !flush });
const jsonLiterals = str.split(entryDelimiter);

// process any trailing state left over from a previous call.
if (trailer) {
jsonLiterals[0] = `${trailer}${jsonLiterals[0]}`;
trailer = '';
}

// Is this a complete message? If not; push the trailing (incomplete) string
// into the state.
if (!flush && !hasSuffix(str, entryDelimiter)) {
trailer = jsonLiterals.pop();
}

try {
const jsonObjects = jsonLiterals
.filter(v => v.trim() !== '')
.map(v => JSON.parse(v));

if (jsonObjects.length) {
onData(null, jsonObjects);
}
}
catch (err) {
onData(err);
}
}

// call read() recursively until it's exhausted.
function pump() {
return reader.read()
.then(next => {
if (next.done) {
processChunk(new Uint8Array(), true);
return res;
}
processChunk(next.value);
return pump();
});
}

return pump();
}

function hasSuffix(s, suffix) {
return s.substr(s.length - suffix.length) === suffix;
}
37 changes: 29 additions & 8 deletions src/defaultTransportFactory.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,40 @@
import fetchRequest from './impl/fetch';
import mozXhrRequest from './impl/mozXhr';
import xhrRequest from './impl/xhr';
import { makeXhrTransport } from './impl/xhr';

let selected = null;

export default function defaultTransportFactory() {
const userAgent = navigator.userAgent.toLowerCase();

if (!selected) {
if (userAgent.indexOf("chrome") !== -1) {
if (window.Response && window.Response.prototype.hasOwnProperty("body")) {
console.log("selected fetch with ReadableStream");
selected = fetchRequest;
} else if (userAgent.indexOf('firefox') !== -1) {
selected = mozXhrRequest;
} else {
selected = xhrRequest;
const tmpXhr = new XMLHttpRequest();
const mozChunked = 'moz-chunked-arraybuffer';
tmpXhr.responseType = mozChunked;
if (tmpXhr.responseType === mozChunked) {
console.log("selected moz!");
selected = makeXhrTransport({
responseType: mozChunked,
responseParserFactory: function () {
return response => new Uint8Array(response);
}
});
} else {
console.log("selected plain xhr");
selected = makeXhrTransport({
responseType: 'text',
responseParserFactory: function () {
const encoder = new TextEncoder();
let offset = 0;
return function (response) {
const chunk = response.substr(offset);
offset = response.length;
return encoder.encode(chunk, { stream: true });
}
}
});
}
}
}
return selected;
Expand Down
39 changes: 11 additions & 28 deletions src/impl/fetch.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,20 @@
import { isObject } from '../util';

export const READABLE_BYTE_STREAM = 'readable-byte-stream';

export default function fetchRequest(options) {
const { onRawChunk, onRawComplete, method, body, credentials } = options;
const { method, body, credentials } = options;
const headers = marshallHeaders(options.headers);

function pump(reader, res) {
return reader.read()
.then(result => {
if (result.done) {
return onRawComplete({
statusCode: res.status,
transport: READABLE_BYTE_STREAM,
raw: res
});
}
onRawChunk(result.value);
return pump(reader, res);
});
}

function onError(err) {
options.onRawComplete({
statusCode: 0,
transport: READABLE_BYTE_STREAM,
raw: err
return fetch(options.url, { headers, method, body, credentials })
.then(res => {
return {
body: res.body,
headers: res.headers,
ok: res.ok,
status: res.status,
statusText: res.statusText,
url: res.url
};
});
}

fetch(options.url, { headers, method, body, credentials })
.then(res => pump(res.body.getReader(), res))
.catch(onError);
}

function marshallHeaders(v) {
Expand Down
40 changes: 0 additions & 40 deletions src/impl/mozXhr.js

This file was deleted.

88 changes: 49 additions & 39 deletions src/impl/xhr.js
Original file line number Diff line number Diff line change
@@ -1,46 +1,56 @@
export const XHR = 'xhr';
import { parseResposneHeaders } from '../util';

export default function xhrRequest(options) {
const textEncoder = new TextEncoder();
const xhr = new XMLHttpRequest();
let index = 0;
export function makeXhrTransport({ responseType, responseParserFactory }) {
return function xhrTransport(options) {
const xhr = new XMLHttpRequest();
const responseParser = responseParserFactory();

function onProgressEvent() {
const rawText = xhr.responseText.substr(index);
index = xhr.responseText.length;
options.onRawChunk(textEncoder.encode(rawText, { stream: true }));
}

function onLoadEvent() {
// Force the textEncoder to flush.
options.onRawChunk(textEncoder.encode("", { stream: false }));
options.onRawComplete({
statusCode: xhr.status,
transport: XHR,
raw: xhr
let responseStreamController;
const responseStream = new ReadableStream({
start(c) {
responseStreamController = c
},
cancel() {
xhr.abort()
}
});
}

function onError(err) {
options.onRawComplete({
statusCode: 0,
transport: XHR,
raw: err
});
}
xhr.open(options.method, options.url);
xhr.responseType = responseType;
if (options.headers) {
Object.getOwnPropertyNames(options.headers).forEach(k => {
xhr.setRequestHeader(k, options.headers[k]);
})
}
if (options.credentials === 'include') {
xhr.withCredentials = true;
}

xhr.open(options.method, options.url);
xhr.responseType = 'text';
if (options.headers) {
Object.getOwnPropertyNames(options.headers).forEach(k => {
xhr.setRequestHeader(k, options.headers[k]);
})
}
if (options.credentials === 'include') {
xhr.withCredentials = true;
return new Promise((resolve, reject) => {
xhr.onreadystatechange = function () {
if (xhr.readyState === xhr.HEADERS_RECEIVED) {
return resolve({
body: responseStream,
headers: parseResposneHeaders(xhr.getAllResponseHeaders()),
ok: xhr.status >= 200 && xhr.status < 300,
status: xhr.status,
statusText: xhr.statusText,
url: options.url,
});
}
};
xhr.onerror = function (e) {
return reject(e);
};
xhr.onprogress = function () {
const bytes = responseParser(xhr.response);
responseStreamController.enqueue(bytes);
};
xhr.onload = function () {
responseStreamController.close();
};

xhr.send(options.body);
});
}
xhr.addEventListener('progress', onProgressEvent);
xhr.addEventListener('loadend', onLoadEvent);
xhr.addEventListener('error', onError);
xhr.send(options.body);
}
Loading