diff --git a/package.json b/package.json index 8f0671447e..8c0fa7e59a 100644 --- a/package.json +++ b/package.json @@ -61,7 +61,7 @@ "react": "^17.0.2", "react-dom": "^17.0.2", "react-is": "^17.0.2", - "wonka": "^6.0.0" + "wonka": "./wonka-v6.0.1-rc-iterables.tgz" }, "devDependencies": { "@actions/artifact": "^0.5.1", @@ -115,5 +115,8 @@ "tar": "^6.1.0", "terser": "^5.14.1", "typescript": "^4.7.3" + }, + "dependencies": { + "wonka": "./wonka-v6.0.1-rc-iterables.tgz" } } diff --git a/packages/core/src/internal/fetchSource.ts b/packages/core/src/internal/fetchSource.ts index aaeb1a7d75..bef416f9d6 100644 --- a/packages/core/src/internal/fetchSource.ts +++ b/packages/core/src/internal/fetchSource.ts @@ -1,14 +1,12 @@ -import { Source, make } from 'wonka'; +import { Source, fromAsyncIterable } from 'wonka'; import { Operation, OperationResult } from '../types'; import { makeResult, makeErrorResult, mergeResultPatch } from '../utils'; -const asyncIterator = - typeof Symbol !== 'undefined' ? Symbol.asyncIterator : null; const decoder = typeof TextDecoder !== 'undefined' ? new TextDecoder() : null; const jsonHeaderRe = /content-type:[^\r\n]*application\/json/i; const boundaryHeaderRe = /boundary="?([^=";]+)"?/i; -type ChunkData = { done: false; value: Buffer | Uint8Array } | { done: true }; +type ChunkData = Buffer | Uint8Array; // NOTE: We're avoiding referencing the `Buffer` global here to prevent // auto-polyfilling in Webpack @@ -17,168 +15,144 @@ const toString = (input: Buffer | ArrayBuffer): string => ? (input as Buffer).toString() : decoder!.decode(input as ArrayBuffer); -export const makeFetchSource = ( +async function* fetchOperation( operation: Operation, url: string, fetchOptions: RequestInit -): Source => { +) { const maxStatus = fetchOptions.redirect === 'manual' ? 400 : 300; const fetcher = operation.context.fetch; - return make(({ next, complete }) => { - const abortController = - typeof AbortController !== 'undefined' ? new AbortController() : null; - if (abortController) { + let abortController: AbortController | void; + let response: Response; + let hasResults = false; + let statusNotOk = false; + + try { + if (typeof AbortController !== 'undefined') { + abortController = new AbortController(); fetchOptions.signal = abortController.signal; } - let hasResults = false; - // DERIVATIVE: Copyright (c) 2021 Marais Rossouw - // See: https://github.com/maraisr/meros/blob/219fe95/src/browser.ts - const executeIncrementalFetch = ( - onResult: (result: OperationResult) => void, - operation: Operation, - response: Response - ): Promise => { - // NOTE: Guarding against fetch polyfills here - const contentType = - (response.headers && response.headers.get('Content-Type')) || ''; - if (/text\//i.test(contentType)) { - return response.text().then(text => { - onResult(makeErrorResult(operation, new Error(text), response)); - }); - } else if (!/multipart\/mixed/i.test(contentType)) { - return response.text().then(payload => { - onResult(makeResult(operation, JSON.parse(payload), response)); - }); - } - - let boundary = '---'; - const boundaryHeader = contentType.match(boundaryHeaderRe); - if (boundaryHeader) boundary = '--' + boundaryHeader[1]; + response = await (fetcher || fetch)(url, fetchOptions); + statusNotOk = response.status < 200 || response.status >= maxStatus; + + const contentType = + (response.headers && response.headers.get('Content-Type')) || ''; + if (/text\//i.test(contentType)) { + const text = await response.text(); + return yield makeErrorResult(operation, new Error(text), response); + } else if (!/multipart\/mixed/i.test(contentType)) { + const text = await response.text(); + return yield makeResult(operation, JSON.parse(text), response); + } - let read: () => Promise; - let cancel = () => { - /*noop*/ + let boundary = '---'; + const boundaryHeader = contentType.match(boundaryHeaderRe); + if (boundaryHeader) boundary = '--' + boundaryHeader[1]; + + let iterator: AsyncIterableIterator; + if (response[Symbol.asyncIterator]) { + iterator = response[Symbol.asyncIterator](); + } else if (response.body) { + const reader = response.body.getReader(); + iterator = { + next() { + return reader.read() as Promise>; + }, + async return() { + await reader.cancel(); + return { done: true } as IteratorReturnResult; + }, + [Symbol.asyncIterator]() { + return iterator; + }, }; - if (asyncIterator && response[asyncIterator]) { - const iterator = response[asyncIterator](); - read = iterator.next.bind(iterator); - } else if ('body' in response && response.body) { - const reader = response.body.getReader(); - cancel = reader.cancel.bind(reader); - read = reader.read.bind(reader); - } else { - throw new TypeError('Streaming requests unsupported'); - } + } else { + throw new TypeError('Streaming requests unsupported'); + } + try { let buffer = ''; let isPreamble = true; let nextResult: OperationResult | null = null; let prevResult: OperationResult | null = null; + for await (const data of iterator) { + hasResults = true; - function next(data: ChunkData): Promise | void { - if (!data.done) { - const chunk = toString(data.value); - let boundaryIndex = chunk.indexOf(boundary); - if (boundaryIndex > -1) { - boundaryIndex += buffer.length; - } else { - boundaryIndex = buffer.indexOf(boundary); - } + const chunk = toString(data); + let boundaryIndex = chunk.indexOf(boundary); + if (boundaryIndex > -1) { + boundaryIndex += buffer.length; + } else { + boundaryIndex = buffer.indexOf(boundary); + } - buffer += chunk; - while (boundaryIndex > -1) { - const current = buffer.slice(0, boundaryIndex); - const next = buffer.slice(boundaryIndex + boundary.length); - - if (isPreamble) { - isPreamble = false; - } else { - const headersEnd = current.indexOf('\r\n\r\n') + 4; - const headers = current.slice(0, headersEnd); - const body = current.slice( - headersEnd, - current.lastIndexOf('\r\n') - ); - - let payload: any; - if (jsonHeaderRe.test(headers)) { - try { - payload = JSON.parse(body); - nextResult = prevResult = prevResult - ? mergeResultPatch(prevResult, payload, response) - : makeResult(operation, payload, response); - } catch (_error) {} - } - - if (next.slice(0, 2) === '--' || (payload && !payload.hasNext)) { - if (!prevResult) - return onResult(makeResult(operation, {}, response)); - break; - } + buffer += chunk; + while (boundaryIndex > -1) { + const current = buffer.slice(0, boundaryIndex); + const next = buffer.slice(boundaryIndex + boundary.length); + + if (isPreamble) { + isPreamble = false; + } else { + const headersEnd = current.indexOf('\r\n\r\n') + 4; + const headers = current.slice(0, headersEnd); + const body = current.slice(headersEnd, current.lastIndexOf('\r\n')); + + let payload: any; + if (jsonHeaderRe.test(headers)) { + try { + payload = JSON.parse(body); + nextResult = prevResult = prevResult + ? mergeResultPatch(prevResult, payload, response) + : makeResult(operation, payload, response); + } catch (_error) {} } - buffer = next; - boundaryIndex = buffer.indexOf(boundary); + if (next.slice(0, 2) === '--' || (payload && !payload.hasNext)) { + if (!prevResult) yield makeResult(operation, {}, response); + break; + } } - } else { - hasResults = true; + + buffer = next; + boundaryIndex = buffer.indexOf(boundary); } if (nextResult) { - onResult(nextResult); + yield nextResult; nextResult = null; } - - if (!data.done && (!prevResult || prevResult.hasNext)) { - return read().then(next); - } } + } finally { + iterator.return?.(); + } + } catch (error: any) { + if (hasResults) { + throw error; + } - return read().then(next).finally(cancel); - }; - - let ended = false; - let statusNotOk = false; - let response: Response; - - Promise.resolve() - .then(() => { - if (ended) return; - return (fetcher || fetch)(url, fetchOptions); - }) - .then((_response: Response | void) => { - if (!_response) return; - response = _response; - statusNotOk = response.status < 200 || response.status >= maxStatus; - return executeIncrementalFetch(next, operation, response); - }) - .then(complete) - .catch((error: Error) => { - if (hasResults) { - throw error; - } + yield makeErrorResult( + operation, + statusNotOk + ? response!.statusText + ? new Error(response!.statusText) + : error + : error, + response! + ); + } finally { + if (abortController) { + abortController.abort(); + } + } +} - const result = makeErrorResult( - operation, - statusNotOk - ? response.statusText - ? new Error(response.statusText) - : error - : error, - response - ); - - next(result); - complete(); - }); - - return () => { - ended = true; - if (abortController) { - abortController.abort(); - } - }; - }); -}; +export function makeFetchSource( + operation: Operation, + url: string, + fetchOptions: RequestInit +): Source { + return fromAsyncIterable(fetchOperation(operation, url, fetchOptions)); +} diff --git a/wonka-v6.0.1-rc-iterables.tgz b/wonka-v6.0.1-rc-iterables.tgz new file mode 100644 index 0000000000..9d76b4e75b Binary files /dev/null and b/wonka-v6.0.1-rc-iterables.tgz differ diff --git a/yarn.lock b/yarn.lock index f3637da4d4..941d1bef30 100644 --- a/yarn.lock +++ b/yarn.lock @@ -17538,10 +17538,9 @@ windows-release@^3.1.0: dependencies: execa "^1.0.0" -"wonka@>= 4.0.9", wonka@^6.0.0: - version "6.0.0" - resolved "https://registry.yarnpkg.com/wonka/-/wonka-6.0.0.tgz#38cd39a517fc3ff721ea3bf353642b353bf48860" - integrity sha512-TEiIOqkhQXbcmL1RrjxPCzTX15V5FSyJvZRSiTxvgTgrJMaOVKmzGTdRVh349CfaNo9dsIhWDyg1/GNq4NWrEg== +wonka@./wonka-v6.0.1-rc-iterables.tgz, "wonka@>= 4.0.9", wonka@^6.0.0: + version "6.0.1-rc-iterables" + resolved "./wonka-v6.0.1-rc-iterables.tgz#927222feca605a28de4168a42b4c5fb1169f211c" word-wrap@^1.2.3, word-wrap@~1.2.3: version "1.2.3"