Commit
temp: Implement fetchSource as async iterator
kitten committed Aug 14, 2022
1 parent 308f27a commit 6aab7d1
Showing 4 changed files with 121 additions and 145 deletions.
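Context for the change: the callback-based make source is replaced with wonka's fromAsyncIterable, so the whole fetch-and-parse loop can be written as a single async generator (fetchOperation below) and wrapped into a Source at the end. As a rough sketch of the idea — assuming the fromAsyncIterable entry point from the bundled RC tarball behaves like a normal source factory, and using a hypothetical counter generator in place of fetchOperation:

import { pipe, subscribe, fromAsyncIterable } from 'wonka';

// Hypothetical stand-in for fetchOperation: any async generator works.
async function* counter() {
  for (let i = 0; i < 3; i++) {
    yield i;
  }
}

// fromAsyncIterable is expected to pull values from the generator on demand
// and to complete the source once the generator finishes.
const { unsubscribe } = pipe(
  fromAsyncIterable(counter()),
  subscribe(value => {
    console.log(value); // logs 0, 1, 2
  })
);

unsubscribe is shown only to make the teardown path explicit; in urql the subscription is normally managed by the client.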
5 changes: 4 additions & 1 deletion package.json
@@ -61,7 +61,7 @@
     "react": "^17.0.2",
     "react-dom": "^17.0.2",
     "react-is": "^17.0.2",
-    "wonka": "^6.0.0"
+    "wonka": "./wonka-v6.0.1-rc-iterables.tgz"
   },
   "devDependencies": {
     "@actions/artifact": "^0.5.1",
@@ -115,5 +115,8 @@
     "tar": "^6.1.0",
     "terser": "^5.14.1",
     "typescript": "^4.7.3"
+  },
+  "dependencies": {
+    "wonka": "./wonka-v6.0.1-rc-iterables.tgz"
   }
 }
254 changes: 114 additions & 140 deletions packages/core/src/internal/fetchSource.ts
@@ -1,14 +1,12 @@
-import { Source, make } from 'wonka';
+import { Source, fromAsyncIterable } from 'wonka';
 import { Operation, OperationResult } from '../types';
 import { makeResult, makeErrorResult, mergeResultPatch } from '../utils';
 
-const asyncIterator =
-  typeof Symbol !== 'undefined' ? Symbol.asyncIterator : null;
 const decoder = typeof TextDecoder !== 'undefined' ? new TextDecoder() : null;
 const jsonHeaderRe = /content-type:[^\r\n]*application\/json/i;
 const boundaryHeaderRe = /boundary="?([^=";]+)"?/i;
 
-type ChunkData = { done: false; value: Buffer | Uint8Array } | { done: true };
+type ChunkData = Buffer | Uint8Array;
 
 // NOTE: We're avoiding referencing the `Buffer` global here to prevent
 // auto-polyfilling in Webpack
@@ -17,168 +15,144 @@ const toString = (input: Buffer | ArrayBuffer): string =>
     ? (input as Buffer).toString()
     : decoder!.decode(input as ArrayBuffer);
 
-export const makeFetchSource = (
+async function* fetchOperation(
   operation: Operation,
   url: string,
   fetchOptions: RequestInit
-): Source<OperationResult> => {
+) {
   const maxStatus = fetchOptions.redirect === 'manual' ? 400 : 300;
   const fetcher = operation.context.fetch;
 
-  return make<OperationResult>(({ next, complete }) => {
-    const abortController =
-      typeof AbortController !== 'undefined' ? new AbortController() : null;
-    if (abortController) {
+  let abortController: AbortController | void;
+  let response: Response;
+  let hasResults = false;
+  let statusNotOk = false;
+
+  try {
+    if (typeof AbortController !== 'undefined') {
+      abortController = new AbortController();
       fetchOptions.signal = abortController.signal;
     }
 
-    let hasResults = false;
-    // DERIVATIVE: Copyright (c) 2021 Marais Rossouw <[email protected]>
-    // See: https://github.com/maraisr/meros/blob/219fe95/src/browser.ts
-    const executeIncrementalFetch = (
-      onResult: (result: OperationResult) => void,
-      operation: Operation,
-      response: Response
-    ): Promise<void> => {
-      // NOTE: Guarding against fetch polyfills here
-      const contentType =
-        (response.headers && response.headers.get('Content-Type')) || '';
-      if (/text\//i.test(contentType)) {
-        return response.text().then(text => {
-          onResult(makeErrorResult(operation, new Error(text), response));
-        });
-      } else if (!/multipart\/mixed/i.test(contentType)) {
-        return response.text().then(payload => {
-          onResult(makeResult(operation, JSON.parse(payload), response));
-        });
-      }
-
-      let boundary = '---';
-      const boundaryHeader = contentType.match(boundaryHeaderRe);
-      if (boundaryHeader) boundary = '--' + boundaryHeader[1];
+    response = await (fetcher || fetch)(url, fetchOptions);
+    statusNotOk = response.status < 200 || response.status >= maxStatus;
+
+    const contentType =
+      (response.headers && response.headers.get('Content-Type')) || '';
+    if (/text\//i.test(contentType)) {
+      const text = await response.text();
+      return yield makeErrorResult(operation, new Error(text), response);
+    } else if (!/multipart\/mixed/i.test(contentType)) {
+      const text = await response.text();
+      return yield makeResult(operation, JSON.parse(text), response);
+    }
 
-      let read: () => Promise<ChunkData>;
-      let cancel = () => {
-        /*noop*/
-      };
-      if (asyncIterator && response[asyncIterator]) {
-        const iterator = response[asyncIterator]();
-        read = iterator.next.bind(iterator);
-      } else if ('body' in response && response.body) {
-        const reader = response.body.getReader();
-        cancel = reader.cancel.bind(reader);
-        read = reader.read.bind(reader);
-      } else {
-        throw new TypeError('Streaming requests unsupported');
-      }
+    let boundary = '---';
+    const boundaryHeader = contentType.match(boundaryHeaderRe);
+    if (boundaryHeader) boundary = '--' + boundaryHeader[1];
+
+    let iterator: AsyncIterableIterator<ChunkData>;
+    if (response[Symbol.asyncIterator]) {
+      iterator = response[Symbol.asyncIterator]();
+    } else if (response.body) {
+      const reader = response.body.getReader();
+      iterator = {
+        next() {
+          return reader.read() as Promise<IteratorResult<ChunkData>>;
+        },
+        async return() {
+          await reader.cancel();
+          return { done: true } as IteratorReturnResult<any>;
+        },
+        [Symbol.asyncIterator]() {
+          return iterator;
+        },
+      };
+    } else {
+      throw new TypeError('Streaming requests unsupported');
+    }
 
+    try {
       let buffer = '';
       let isPreamble = true;
       let nextResult: OperationResult | null = null;
       let prevResult: OperationResult | null = null;
-
-      function next(data: ChunkData): Promise<void> | void {
-        if (!data.done) {
-          const chunk = toString(data.value);
-          let boundaryIndex = chunk.indexOf(boundary);
-          if (boundaryIndex > -1) {
-            boundaryIndex += buffer.length;
-          } else {
-            boundaryIndex = buffer.indexOf(boundary);
-          }
+      for await (const data of iterator) {
+        hasResults = true;
+
+        const chunk = toString(data);
+        let boundaryIndex = chunk.indexOf(boundary);
+        if (boundaryIndex > -1) {
+          boundaryIndex += buffer.length;
+        } else {
+          boundaryIndex = buffer.indexOf(boundary);
+        }
 
-          buffer += chunk;
-          while (boundaryIndex > -1) {
-            const current = buffer.slice(0, boundaryIndex);
-            const next = buffer.slice(boundaryIndex + boundary.length);
-
-            if (isPreamble) {
-              isPreamble = false;
-            } else {
-              const headersEnd = current.indexOf('\r\n\r\n') + 4;
-              const headers = current.slice(0, headersEnd);
-              const body = current.slice(
-                headersEnd,
-                current.lastIndexOf('\r\n')
-              );
-
-              let payload: any;
-              if (jsonHeaderRe.test(headers)) {
-                try {
-                  payload = JSON.parse(body);
-                  nextResult = prevResult = prevResult
-                    ? mergeResultPatch(prevResult, payload, response)
-                    : makeResult(operation, payload, response);
-                } catch (_error) {}
-              }
-
-              if (next.slice(0, 2) === '--' || (payload && !payload.hasNext)) {
-                if (!prevResult)
-                  return onResult(makeResult(operation, {}, response));
-                break;
-              }
-            }
-
-            buffer = next;
-            boundaryIndex = buffer.indexOf(boundary);
-          }
-        } else {
-          hasResults = true;
-        }
+        buffer += chunk;
+        while (boundaryIndex > -1) {
+          const current = buffer.slice(0, boundaryIndex);
+          const next = buffer.slice(boundaryIndex + boundary.length);
+
+          if (isPreamble) {
+            isPreamble = false;
+          } else {
+            const headersEnd = current.indexOf('\r\n\r\n') + 4;
+            const headers = current.slice(0, headersEnd);
+            const body = current.slice(headersEnd, current.lastIndexOf('\r\n'));
+
+            let payload: any;
+            if (jsonHeaderRe.test(headers)) {
+              try {
+                payload = JSON.parse(body);
+                nextResult = prevResult = prevResult
+                  ? mergeResultPatch(prevResult, payload, response)
+                  : makeResult(operation, payload, response);
+              } catch (_error) {}
+            }
+
+            if (next.slice(0, 2) === '--' || (payload && !payload.hasNext)) {
+              if (!prevResult) yield makeResult(operation, {}, response);
+              break;
+            }
+          }
+
+          buffer = next;
+          boundaryIndex = buffer.indexOf(boundary);
+        }
 
         if (nextResult) {
-          onResult(nextResult);
+          yield nextResult;
           nextResult = null;
         }
-
-        if (!data.done && (!prevResult || prevResult.hasNext)) {
-          return read().then(next);
-        }
       }
-
-      return read().then(next).finally(cancel);
-    };
-
-    let ended = false;
-    let statusNotOk = false;
-    let response: Response;
-
-    Promise.resolve()
-      .then(() => {
-        if (ended) return;
-        return (fetcher || fetch)(url, fetchOptions);
-      })
-      .then((_response: Response | void) => {
-        if (!_response) return;
-        response = _response;
-        statusNotOk = response.status < 200 || response.status >= maxStatus;
-        return executeIncrementalFetch(next, operation, response);
-      })
-      .then(complete)
-      .catch((error: Error) => {
-        if (hasResults) {
-          throw error;
-        }
-
-        const result = makeErrorResult(
-          operation,
-          statusNotOk
-            ? response.statusText
-              ? new Error(response.statusText)
-              : error
-            : error,
-          response
-        );
-
-        next(result);
-        complete();
-      });
-
-    return () => {
-      ended = true;
-      if (abortController) {
-        abortController.abort();
-      }
-    };
-  });
-};
+    } finally {
+      iterator.return?.();
+    }
+  } catch (error: any) {
+    if (hasResults) {
+      throw error;
+    }
+
+    yield makeErrorResult(
+      operation,
+      statusNotOk
+        ? response!.statusText
+          ? new Error(response!.statusText)
+          : error
+        : error,
+      response!
+    );
+  } finally {
+    if (abortController) {
+      abortController.abort();
+    }
+  }
+}
+
+export function makeFetchSource(
+  operation: Operation,
+  url: string,
+  fetchOptions: RequestInit
+): Source<OperationResult> {
+  return fromAsyncIterable(fetchOperation(operation, url, fetchOptions));
+}
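A note on cancellation in the new fetchOperation: cleanup is expressed entirely through finally blocks (iterator.return?.() to release the body reader, abortController.abort() to cancel the request), which relies on the consumer calling return() on the generator when it stops early — the fromAsyncIterable build from the RC tarball is assumed to do this on unsubscribe. A small, self-contained illustration of that plain JavaScript behaviour, using a hypothetical resource generator:

async function* resource() {
  try {
    yield 'chunk';
    yield 'never reached';
  } finally {
    // In fetchOperation this is where abortController.abort() runs.
    console.log('cleanup ran');
  }
}

async function demo() {
  const it = resource();
  console.log(await it.next()); // { value: 'chunk', done: false }
  // Calling return() while the generator is suspended at a yield resumes it
  // as if a return statement had executed there, so the finally block runs.
  console.log(await it.return(undefined)); // 'cleanup ran', then { value: undefined, done: true }
}

demo();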
Binary file added wonka-v6.0.1-rc-iterables.tgz
Binary file not shown.
7 changes: 3 additions & 4 deletions yarn.lock
@@ -17538,10 +17538,9 @@ windows-release@^3.1.0:
   dependencies:
     execa "^1.0.0"
 
-"wonka@>= 4.0.9", wonka@^6.0.0:
-  version "6.0.0"
-  resolved "https://registry.yarnpkg.com/wonka/-/wonka-6.0.0.tgz#38cd39a517fc3ff721ea3bf353642b353bf48860"
-  integrity sha512-TEiIOqkhQXbcmL1RrjxPCzTX15V5FSyJvZRSiTxvgTgrJMaOVKmzGTdRVh349CfaNo9dsIhWDyg1/GNq4NWrEg==
+wonka@./wonka-v6.0.1-rc-iterables.tgz, "wonka@>= 4.0.9", wonka@^6.0.0:
+  version "6.0.1-rc-iterables"
+  resolved "./wonka-v6.0.1-rc-iterables.tgz#927222feca605a28de4168a42b4c5fb1169f211c"
 
 word-wrap@^1.2.3, word-wrap@~1.2.3:
   version "1.2.3"
