From 6c633da5933d1d47390cdc8d2042c9ea4266fecd Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Tue, 14 Mar 2023 21:23:22 +0000 Subject: [PATCH 1/8] Add support for text/event-stream --- packages/core/src/internal/fetchSource.ts | 52 +++++++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/packages/core/src/internal/fetchSource.ts b/packages/core/src/internal/fetchSource.ts index b242371fda..9a6199aff3 100644 --- a/packages/core/src/internal/fetchSource.ts +++ b/packages/core/src/internal/fetchSource.ts @@ -4,7 +4,9 @@ import { makeResult, makeErrorResult, mergeResultPatch } from '../utils'; const decoder = typeof TextDecoder !== 'undefined' ? new TextDecoder() : null; const boundaryHeaderRe = /boundary="?([^=";]+)"?/i; +const eventStreamRe = /\s*event:\s*([\w-_]+)(?:[ \r\n]+data:)?/g; +type ContentMode = 'json' | 'multipart' | 'event-stream'; type ChunkData = Buffer | Uint8Array; // NOTE: We're avoiding referencing the `Buffer` global here to prevent @@ -14,6 +16,18 @@ const toString = (input: Buffer | ArrayBuffer): string => ? (input as Buffer).toString() : decoder!.decode(input as ArrayBuffer); +const parseContentMode = (contentType: string): ContentMode | null => { + if (/multipart\/mixed/i.test(contentType)) { + return 'multipart'; + } else if (/text\/event-stream/.test(contentType)) { + return 'event-stream'; + } else if (/text\//.test(contentType)) { + return null; + } else { + return 'json'; + } +}; + async function* streamBody(response: Response): AsyncIterableIterator { if (response.body![Symbol.asyncIterator]) { for await (const chunk of response.body! as any) @@ -29,6 +43,33 @@ async function* streamBody(response: Response): AsyncIterableIterator { } } +async function* parseEventStream(response: Response) { + let payload: any; + + chunks: for await (const chunk of streamBody(response)) { + for (const message of chunk.split('\n\n')) { + const match = message.match(eventStreamRe); + if (match) { + const type = match[1]; + if (type === 'complete') { + break chunks; + } else if (type === 'next') { + const chunk = message.slice(match[0].length); + try { + yield (payload = JSON.parse(chunk)); + } catch (error) { + if (!payload) throw error; + } + + if (payload && !payload.hasNext) break chunks; + } + } + } + } + + if (payload && payload.hasNext) yield { hasNext: false }; +} + async function* parseMultipartMixed( contentType: string, response: Response @@ -85,17 +126,22 @@ async function* fetchOperation( // Delay for a tick to give the Client a chance to cancel the request // if a teardown comes in immediately await Promise.resolve(); + response = await (operation.context.fetch || fetch)(url, fetchOptions); const contentType = response.headers.get('Content-Type') || ''; - if (/text\//i.test(contentType)) { + const mode = parseContentMode(contentType); + if (!mode) { const text = await response.text(); return yield makeErrorResult(operation, new Error(text), response); - } else if (!/multipart\/mixed/i.test(contentType)) { + } else if (mode === 'json') { const text = await response.text(); return yield makeResult(operation, JSON.parse(text), response); } - const iterator = parseMultipartMixed(contentType, response); + const iterator = + mode === 'multipart' + ? parseMultipartMixed(contentType, response) + : parseEventStream(response); for await (const payload of iterator) { yield (result = result ? mergeResultPatch(result, payload, response) From c5b1a9f78016d916ee02f9e0a44b2f4c17b0fbc1 Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Tue, 14 Mar 2023 21:47:19 +0000 Subject: [PATCH 2/8] Add text/event-stream to Accept header --- packages/core/src/internal/fetchOptions.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/core/src/internal/fetchOptions.ts b/packages/core/src/internal/fetchOptions.ts index 8540a3d464..7aea160340 100644 --- a/packages/core/src/internal/fetchOptions.ts +++ b/packages/core/src/internal/fetchOptions.ts @@ -91,8 +91,9 @@ export const makeFetchOptions = ( const headers: HeadersInit = { accept: - 'application/graphql-response+json, application/graphql+json, application/json, multipart/mixed', + 'application/graphql-response+json, application/graphql+json, application/json, text/event-stream, multipart/mixed', }; + if (!useGETMethod) headers['content-type'] = 'application/json'; const extraOptions = (typeof operation.context.fetchOptions === 'function' From ed695f2b7c47c2f44dc8922f5c26b13bcfa874c1 Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Wed, 15 Mar 2023 12:10:53 +0000 Subject: [PATCH 3/8] Add permissive message parsing ignoring all but "data:" events --- packages/core/src/internal/fetchSource.ts | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/packages/core/src/internal/fetchSource.ts b/packages/core/src/internal/fetchSource.ts index 9a6199aff3..21e9adf25b 100644 --- a/packages/core/src/internal/fetchSource.ts +++ b/packages/core/src/internal/fetchSource.ts @@ -4,7 +4,7 @@ import { makeResult, makeErrorResult, mergeResultPatch } from '../utils'; const decoder = typeof TextDecoder !== 'undefined' ? new TextDecoder() : null; const boundaryHeaderRe = /boundary="?([^=";]+)"?/i; -const eventStreamRe = /\s*event:\s*([\w-_]+)(?:[ \r\n]+data:)?/g; +const eventStreamRe = /data: ?([^\n]+)/; type ContentMode = 'json' | 'multipart' | 'event-stream'; type ChunkData = Buffer | Uint8Array; @@ -50,19 +50,14 @@ async function* parseEventStream(response: Response) { for (const message of chunk.split('\n\n')) { const match = message.match(eventStreamRe); if (match) { - const type = match[1]; - if (type === 'complete') { - break chunks; - } else if (type === 'next') { - const chunk = message.slice(match[0].length); - try { - yield (payload = JSON.parse(chunk)); - } catch (error) { - if (!payload) throw error; - } - - if (payload && !payload.hasNext) break chunks; + const chunk = match[1]; + try { + yield (payload = JSON.parse(chunk)); + } catch (error) { + if (!payload) throw error; } + + if (payload && !payload.hasNext) break chunks; } } } From 4b5c7eb0ca3358ba826bca3fdcbacf8a227dfc5f Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Wed, 15 Mar 2023 12:21:16 +0000 Subject: [PATCH 4/8] Add changeset --- .changeset/proud-buses-change.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/proud-buses-change.md diff --git a/.changeset/proud-buses-change.md b/.changeset/proud-buses-change.md new file mode 100644 index 0000000000..564f6547e9 --- /dev/null +++ b/.changeset/proud-buses-change.md @@ -0,0 +1,5 @@ +--- +'@urql/core': minor +--- + +Implement `text/event-stream` response support. This generally adheres to the GraphQL SSE protocol and GraphQL Yoga push responses, and is an alternative to `multipart/mixed`. From fb2192488f17912a637d5b52f40551e8c9854060 Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Wed, 15 Mar 2023 15:48:12 +0000 Subject: [PATCH 5/8] Implement response stream chunking generically --- packages/core/src/internal/fetchSource.ts | 84 +++++++++++------------ 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/packages/core/src/internal/fetchSource.ts b/packages/core/src/internal/fetchSource.ts index 21e9adf25b..175e66a9c7 100644 --- a/packages/core/src/internal/fetchSource.ts +++ b/packages/core/src/internal/fetchSource.ts @@ -31,7 +31,7 @@ const parseContentMode = (contentType: string): ContentMode | null => { async function* streamBody(response: Response): AsyncIterableIterator { if (response.body![Symbol.asyncIterator]) { for await (const chunk of response.body! as any) - yield toString(chunk as ChunkData); + toString(chunk as ChunkData); } else { const reader = response.body!.getReader(); let result: ReadableStreamReadResult; @@ -43,26 +43,38 @@ async function* streamBody(response: Response): AsyncIterableIterator { } } +async function* split( + chunks: AsyncIterableIterator, + boundary: string +): AsyncIterableIterator { + let buffer = ''; + let boundaryIndex: number; + for await (const chunk of chunks) { + buffer += chunk; + while ((boundaryIndex = buffer.indexOf(boundary)) > -1) { + yield buffer.slice(0, boundaryIndex); + buffer = buffer.slice(boundaryIndex + boundary.length); + } + } +} + async function* parseEventStream(response: Response) { let payload: any; - - chunks: for await (const chunk of streamBody(response)) { - for (const message of chunk.split('\n\n')) { - const match = message.match(eventStreamRe); - if (match) { - const chunk = match[1]; - try { - yield (payload = JSON.parse(chunk)); - } catch (error) { - if (!payload) throw error; - } - - if (payload && !payload.hasNext) break chunks; + for await (const chunk of split(streamBody(response), '\n\n')) { + const match = chunk.match(eventStreamRe); + if (match) { + const chunk = match[1]; + try { + yield (payload = JSON.parse(chunk)); + } catch (error) { + if (!payload) throw error; } + if (payload && !payload.hasNext) break; } } - - if (payload && payload.hasNext) yield { hasNext: false }; + if (payload && payload.hasNext) { + yield { hasNext: false }; + } } async function* parseMultipartMixed( @@ -71,37 +83,25 @@ async function* parseMultipartMixed( ): AsyncIterableIterator { const boundaryHeader = contentType.match(boundaryHeaderRe); const boundary = '--' + (boundaryHeader ? boundaryHeader[1] : '-'); - - let buffer = ''; let isPreamble = true; - let boundaryIndex: number; let payload: any; - - chunks: for await (const chunk of streamBody(response)) { - buffer += chunk; - while ((boundaryIndex = buffer.indexOf(boundary)) > -1) { - if (isPreamble) { - isPreamble = false; - } else { - const chunk = buffer.slice( - buffer.indexOf('\r\n\r\n') + 4, - boundaryIndex - ); - - try { - yield (payload = JSON.parse(chunk)); - } catch (error) { - if (!payload) throw error; - } + for await (const chunk of split(streamBody(response), boundary)) { + if (isPreamble) { + isPreamble = false; + } else { + try { + yield (payload = JSON.parse( + chunk.slice(chunk.indexOf('\r\n\r\n') + 4) + )); + } catch (error) { + if (!payload) throw error; } - - buffer = buffer.slice(boundaryIndex + boundary.length); - if (buffer.startsWith('--') || (payload && !payload.hasNext)) - break chunks; } + if (payload && !payload.hasNext) break; + } + if (payload && payload.hasNext) { + yield { hasNext: false }; } - - if (payload && payload.hasNext) yield { hasNext: false }; } async function* fetchOperation( From 3472cd920070dc67f79aa54d243f872814efdd76 Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Wed, 15 Mar 2023 15:51:40 +0000 Subject: [PATCH 6/8] Update and shorten tests for both transports --- .../core/src/internal/fetchSource.test.ts | 179 ++---------------- 1 file changed, 19 insertions(+), 160 deletions(-) diff --git a/packages/core/src/internal/fetchSource.test.ts b/packages/core/src/internal/fetchSource.test.ts index 0c210b8aea..0aa40e24b1 100644 --- a/packages/core/src/internal/fetchSource.test.ts +++ b/packages/core/src/internal/fetchSource.test.ts @@ -202,7 +202,7 @@ describe('on multipart/mixed', () => { JSON.stringify(json) + '\r\n---'; - it('listens for more responses (stream)', async () => { + it('listens for more streamed responses', async () => { fetch.mockResolvedValue({ status: 200, headers: { @@ -226,9 +226,7 @@ describe('on multipart/mixed', () => { data: { author: { id: '1', - name: 'Steve', __typename: 'Author', - todos: [{ id: '1', text: 'stream', __typename: 'Todo' }], }, }, }) @@ -240,8 +238,8 @@ describe('on multipart/mixed', () => { wrap({ incremental: [ { - path: ['author', 'todos', 1], - data: { id: '2', text: 'defer', __typename: 'Todo' }, + path: ['author'], + data: { name: 'Steve' }, }, ], hasNext: true, @@ -269,6 +267,12 @@ describe('on multipart/mixed', () => { }, }); + const AuthorFragment = gql` + fragment authorFields on Author { + name + } + `; + const streamedQueryOperation: Operation = makeOperation( 'query', { @@ -276,13 +280,11 @@ describe('on multipart/mixed', () => { query { author { id - name - todos @stream { - id - text - } + ...authorFields @defer } } + + ${AuthorFragment} `, variables: {}, key: 1, @@ -301,9 +303,7 @@ describe('on multipart/mixed', () => { expect(chunks[0].data).toEqual({ author: { id: '1', - name: 'Steve', __typename: 'Author', - todos: [{ id: '1', text: 'stream', __typename: 'Todo' }], }, }); @@ -312,10 +312,6 @@ describe('on multipart/mixed', () => { id: '1', name: 'Steve', __typename: 'Author', - todos: [ - { id: '1', text: 'stream', __typename: 'Todo' }, - { id: '2', text: 'defer', __typename: 'Todo' }, - ], }, }); @@ -324,30 +320,26 @@ describe('on multipart/mixed', () => { id: '1', name: 'Steve', __typename: 'Author', - todos: [ - { id: '1', text: 'stream', __typename: 'Todo' }, - { id: '2', text: 'defer', __typename: 'Todo' }, - ], }, }); }); +}); - it('listens for more responses (defer)', async () => { +describe('on text/event-stream', () => { + const wrap = (json: object) => 'data: ' + JSON.stringify(json) + '\n\n'; + + it('listens for streamed responses', async () => { fetch.mockResolvedValue({ status: 200, headers: { get() { - return 'multipart/mixed'; + return 'text/event-stream'; }, }, body: { getReader: function () { let cancelled = false; const results = [ - { - done: false, - value: Buffer.from('\r\n---'), - }, { done: false, value: Buffer.from( @@ -378,7 +370,7 @@ describe('on multipart/mixed', () => { }, { done: false, - value: Buffer.from(wrap({ hasNext: false }) + '--'), + value: Buffer.from(wrap({ hasNext: false })), }, { done: true }, ]; @@ -453,137 +445,4 @@ describe('on multipart/mixed', () => { }, }); }); - - it('listens for more responses (defer-neted)', async () => { - fetch.mockResolvedValue({ - status: 200, - headers: { - get() { - return 'multipart/mixed'; - }, - }, - body: { - getReader: function () { - let cancelled = false; - const results = [ - { - done: false, - value: Buffer.from('\r\n---'), - }, - { - done: false, - value: Buffer.from( - wrap({ - hasNext: true, - data: { - author: { - id: '1', - name: 'Steve', - address: { - country: 'UK', - __typename: 'Address', - }, - __typename: 'Author', - }, - }, - }) - ), - }, - { - done: false, - value: Buffer.from( - wrap({ - incremental: [ - { - path: ['author', 'address'], - data: { street: 'home' }, - }, - ], - hasNext: true, - }) - ), - }, - { - done: false, - value: Buffer.from(wrap({ hasNext: false }) + '--'), - }, - { done: true }, - ]; - let count = 0; - return { - cancel: function () { - cancelled = true; - }, - read: function () { - if (cancelled) throw new Error('No'); - - return Promise.resolve(results[count++]); - }, - }; - }, - }, - }); - - const AddressFragment = gql` - fragment addressFields on Address { - street - } - `; - - const streamedQueryOperation: Operation = makeOperation( - 'query', - { - query: gql` - query { - author { - id - address { - id - country - ...addressFields @defer - } - } - } - - ${AddressFragment} - `, - variables: {}, - key: 1, - }, - context - ); - - const chunks: OperationResult[] = await pipe( - makeFetchSource(streamedQueryOperation, 'https://test.com/graphql', {}), - scan((prev: OperationResult[], item) => [...prev, item], []), - toPromise - ); - - expect(chunks.length).toEqual(3); - - expect(chunks[0].data).toEqual({ - author: { - id: '1', - name: 'Steve', - address: { - country: 'UK', - __typename: 'Address', - }, - __typename: 'Author', - }, - }); - - expect(chunks[1].data).toEqual({ - author: { - id: '1', - name: 'Steve', - address: { - country: 'UK', - street: 'home', - __typename: 'Address', - }, - __typename: 'Author', - }, - }); - }); }); From 077b65de4a2df60d92949a3f81871a3cd3559faa Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Wed, 15 Mar 2023 16:15:45 +0000 Subject: [PATCH 7/8] Simplify iterator selection in fetchSource.ts --- packages/core/src/internal/fetchSource.ts | 38 +++++++++-------------- 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/packages/core/src/internal/fetchSource.ts b/packages/core/src/internal/fetchSource.ts index 175e66a9c7..412652097b 100644 --- a/packages/core/src/internal/fetchSource.ts +++ b/packages/core/src/internal/fetchSource.ts @@ -6,7 +6,6 @@ const decoder = typeof TextDecoder !== 'undefined' ? new TextDecoder() : null; const boundaryHeaderRe = /boundary="?([^=";]+)"?/i; const eventStreamRe = /data: ?([^\n]+)/; -type ContentMode = 'json' | 'multipart' | 'event-stream'; type ChunkData = Buffer | Uint8Array; // NOTE: We're avoiding referencing the `Buffer` global here to prevent @@ -16,17 +15,9 @@ const toString = (input: Buffer | ArrayBuffer): string => ? (input as Buffer).toString() : decoder!.decode(input as ArrayBuffer); -const parseContentMode = (contentType: string): ContentMode | null => { - if (/multipart\/mixed/i.test(contentType)) { - return 'multipart'; - } else if (/text\/event-stream/.test(contentType)) { - return 'event-stream'; - } else if (/text\//.test(contentType)) { - return null; - } else { - return 'json'; - } -}; +async function* emit(result: ExecutionResult | Promise) { + yield await result; +} async function* streamBody(response: Response): AsyncIterableIterator { if (response.body![Symbol.asyncIterator]) { @@ -124,20 +115,19 @@ async function* fetchOperation( response = await (operation.context.fetch || fetch)(url, fetchOptions); const contentType = response.headers.get('Content-Type') || ''; - const mode = parseContentMode(contentType); - if (!mode) { - const text = await response.text(); - return yield makeErrorResult(operation, new Error(text), response); - } else if (mode === 'json') { - const text = await response.text(); - return yield makeResult(operation, JSON.parse(text), response); + + let results: AsyncIterable; + if (/multipart\/mixed/i.test(contentType)) { + results = parseMultipartMixed(contentType, response); + } else if (/text\/event-stream/i.test(contentType)) { + results = parseEventStream(response); + } else if (/text\//i.test(contentType)) { + throw new Error(await response.text()); + } else { + results = emit(JSON.parse(await response.text())); } - const iterator = - mode === 'multipart' - ? parseMultipartMixed(contentType, response) - : parseEventStream(response); - for await (const payload of iterator) { + for await (const payload of results) { yield (result = result ? mergeResultPatch(result, payload, response) : makeResult(operation, payload, response)); From 6930c96e280c2268b28ca97e6231f8cd997d3777 Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Wed, 15 Mar 2023 16:36:15 +0000 Subject: [PATCH 8/8] Replace emit with parseJSON in fetchSource.ts for symmetry --- packages/core/src/internal/fetchSource.ts | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/packages/core/src/internal/fetchSource.ts b/packages/core/src/internal/fetchSource.ts index 412652097b..c8e013facc 100644 --- a/packages/core/src/internal/fetchSource.ts +++ b/packages/core/src/internal/fetchSource.ts @@ -15,10 +15,6 @@ const toString = (input: Buffer | ArrayBuffer): string => ? (input as Buffer).toString() : decoder!.decode(input as ArrayBuffer); -async function* emit(result: ExecutionResult | Promise) { - yield await result; -} - async function* streamBody(response: Response): AsyncIterableIterator { if (response.body![Symbol.asyncIterator]) { for await (const chunk of response.body! as any) @@ -49,7 +45,15 @@ async function* split( } } -async function* parseEventStream(response: Response) { +async function* parseJSON( + response: Response +): AsyncIterableIterator { + yield JSON.parse(await response.text()); +} + +async function* parseEventStream( + response: Response +): AsyncIterableIterator { let payload: any; for await (const chunk of split(streamBody(response), '\n\n')) { const match = chunk.match(eventStreamRe); @@ -121,10 +125,10 @@ async function* fetchOperation( results = parseMultipartMixed(contentType, response); } else if (/text\/event-stream/i.test(contentType)) { results = parseEventStream(response); - } else if (/text\//i.test(contentType)) { - throw new Error(await response.text()); + } else if (!/text\//i.test(contentType)) { + results = parseJSON(response); } else { - results = emit(JSON.parse(await response.text())); + throw new Error(await response.text()); } for await (const payload of results) {