diff --git a/.changeset/proud-buses-change.md b/.changeset/proud-buses-change.md new file mode 100644 index 0000000000..564f6547e9 --- /dev/null +++ b/.changeset/proud-buses-change.md @@ -0,0 +1,5 @@ +--- +'@urql/core': minor +--- + +Implement `text/event-stream` response support. This generally adheres to the GraphQL SSE protocol and GraphQL Yoga push responses, and is an alternative to `multipart/mixed`. diff --git a/packages/core/src/internal/fetchOptions.ts b/packages/core/src/internal/fetchOptions.ts index 8540a3d464..7aea160340 100644 --- a/packages/core/src/internal/fetchOptions.ts +++ b/packages/core/src/internal/fetchOptions.ts @@ -91,8 +91,9 @@ export const makeFetchOptions = ( const headers: HeadersInit = { accept: - 'application/graphql-response+json, application/graphql+json, application/json, multipart/mixed', + 'application/graphql-response+json, application/graphql+json, application/json, text/event-stream, multipart/mixed', }; + if (!useGETMethod) headers['content-type'] = 'application/json'; const extraOptions = (typeof operation.context.fetchOptions === 'function' diff --git a/packages/core/src/internal/fetchSource.test.ts b/packages/core/src/internal/fetchSource.test.ts index 0c210b8aea..0aa40e24b1 100644 --- a/packages/core/src/internal/fetchSource.test.ts +++ b/packages/core/src/internal/fetchSource.test.ts @@ -202,7 +202,7 @@ describe('on multipart/mixed', () => { JSON.stringify(json) + '\r\n---'; - it('listens for more responses (stream)', async () => { + it('listens for more streamed responses', async () => { fetch.mockResolvedValue({ status: 200, headers: { @@ -226,9 +226,7 @@ describe('on multipart/mixed', () => { data: { author: { id: '1', - name: 'Steve', __typename: 'Author', - todos: [{ id: '1', text: 'stream', __typename: 'Todo' }], }, }, }) @@ -240,8 +238,8 @@ describe('on multipart/mixed', () => { wrap({ incremental: [ { - path: ['author', 'todos', 1], - data: { id: '2', text: 'defer', __typename: 'Todo' }, + path: ['author'], + data: { name: 'Steve' }, }, ], hasNext: true, @@ -269,6 +267,12 @@ describe('on multipart/mixed', () => { }, }); + const AuthorFragment = gql` + fragment authorFields on Author { + name + } + `; + const streamedQueryOperation: Operation = makeOperation( 'query', { @@ -276,13 +280,11 @@ describe('on multipart/mixed', () => { query { author { id - name - todos @stream { - id - text - } + ...authorFields @defer } } + + ${AuthorFragment} `, variables: {}, key: 1, @@ -301,9 +303,7 @@ describe('on multipart/mixed', () => { expect(chunks[0].data).toEqual({ author: { id: '1', - name: 'Steve', __typename: 'Author', - todos: [{ id: '1', text: 'stream', __typename: 'Todo' }], }, }); @@ -312,10 +312,6 @@ describe('on multipart/mixed', () => { id: '1', name: 'Steve', __typename: 'Author', - todos: [ - { id: '1', text: 'stream', __typename: 'Todo' }, - { id: '2', text: 'defer', __typename: 'Todo' }, - ], }, }); @@ -324,30 +320,26 @@ describe('on multipart/mixed', () => { id: '1', name: 'Steve', __typename: 'Author', - todos: [ - { id: '1', text: 'stream', __typename: 'Todo' }, - { id: '2', text: 'defer', __typename: 'Todo' }, - ], }, }); }); +}); - it('listens for more responses (defer)', async () => { +describe('on text/event-stream', () => { + const wrap = (json: object) => 'data: ' + JSON.stringify(json) + '\n\n'; + + it('listens for streamed responses', async () => { fetch.mockResolvedValue({ status: 200, headers: { get() { - return 'multipart/mixed'; + return 'text/event-stream'; }, }, body: { getReader: function () { let cancelled = false; const results = [ - { - done: false, - value: Buffer.from('\r\n---'), - }, { done: false, value: Buffer.from( @@ -378,7 +370,7 @@ describe('on multipart/mixed', () => { }, { done: false, - value: Buffer.from(wrap({ hasNext: false }) + '--'), + value: Buffer.from(wrap({ hasNext: false })), }, { done: true }, ]; @@ -453,137 +445,4 @@ describe('on multipart/mixed', () => { }, }); }); - - it('listens for more responses (defer-neted)', async () => { - fetch.mockResolvedValue({ - status: 200, - headers: { - get() { - return 'multipart/mixed'; - }, - }, - body: { - getReader: function () { - let cancelled = false; - const results = [ - { - done: false, - value: Buffer.from('\r\n---'), - }, - { - done: false, - value: Buffer.from( - wrap({ - hasNext: true, - data: { - author: { - id: '1', - name: 'Steve', - address: { - country: 'UK', - __typename: 'Address', - }, - __typename: 'Author', - }, - }, - }) - ), - }, - { - done: false, - value: Buffer.from( - wrap({ - incremental: [ - { - path: ['author', 'address'], - data: { street: 'home' }, - }, - ], - hasNext: true, - }) - ), - }, - { - done: false, - value: Buffer.from(wrap({ hasNext: false }) + '--'), - }, - { done: true }, - ]; - let count = 0; - return { - cancel: function () { - cancelled = true; - }, - read: function () { - if (cancelled) throw new Error('No'); - - return Promise.resolve(results[count++]); - }, - }; - }, - }, - }); - - const AddressFragment = gql` - fragment addressFields on Address { - street - } - `; - - const streamedQueryOperation: Operation = makeOperation( - 'query', - { - query: gql` - query { - author { - id - address { - id - country - ...addressFields @defer - } - } - } - - ${AddressFragment} - `, - variables: {}, - key: 1, - }, - context - ); - - const chunks: OperationResult[] = await pipe( - makeFetchSource(streamedQueryOperation, 'https://test.com/graphql', {}), - scan((prev: OperationResult[], item) => [...prev, item], []), - toPromise - ); - - expect(chunks.length).toEqual(3); - - expect(chunks[0].data).toEqual({ - author: { - id: '1', - name: 'Steve', - address: { - country: 'UK', - __typename: 'Address', - }, - __typename: 'Author', - }, - }); - - expect(chunks[1].data).toEqual({ - author: { - id: '1', - name: 'Steve', - address: { - country: 'UK', - street: 'home', - __typename: 'Address', - }, - __typename: 'Author', - }, - }); - }); }); diff --git a/packages/core/src/internal/fetchSource.ts b/packages/core/src/internal/fetchSource.ts index b242371fda..c8e013facc 100644 --- a/packages/core/src/internal/fetchSource.ts +++ b/packages/core/src/internal/fetchSource.ts @@ -4,6 +4,7 @@ import { makeResult, makeErrorResult, mergeResultPatch } from '../utils'; const decoder = typeof TextDecoder !== 'undefined' ? new TextDecoder() : null; const boundaryHeaderRe = /boundary="?([^=";]+)"?/i; +const eventStreamRe = /data: ?([^\n]+)/; type ChunkData = Buffer | Uint8Array; @@ -17,7 +18,7 @@ const toString = (input: Buffer | ArrayBuffer): string => async function* streamBody(response: Response): AsyncIterableIterator { if (response.body![Symbol.asyncIterator]) { for await (const chunk of response.body! as any) - yield toString(chunk as ChunkData); + toString(chunk as ChunkData); } else { const reader = response.body!.getReader(); let result: ReadableStreamReadResult; @@ -29,43 +30,73 @@ async function* streamBody(response: Response): AsyncIterableIterator { } } +async function* split( + chunks: AsyncIterableIterator, + boundary: string +): AsyncIterableIterator { + let buffer = ''; + let boundaryIndex: number; + for await (const chunk of chunks) { + buffer += chunk; + while ((boundaryIndex = buffer.indexOf(boundary)) > -1) { + yield buffer.slice(0, boundaryIndex); + buffer = buffer.slice(boundaryIndex + boundary.length); + } + } +} + +async function* parseJSON( + response: Response +): AsyncIterableIterator { + yield JSON.parse(await response.text()); +} + +async function* parseEventStream( + response: Response +): AsyncIterableIterator { + let payload: any; + for await (const chunk of split(streamBody(response), '\n\n')) { + const match = chunk.match(eventStreamRe); + if (match) { + const chunk = match[1]; + try { + yield (payload = JSON.parse(chunk)); + } catch (error) { + if (!payload) throw error; + } + if (payload && !payload.hasNext) break; + } + } + if (payload && payload.hasNext) { + yield { hasNext: false }; + } +} + async function* parseMultipartMixed( contentType: string, response: Response ): AsyncIterableIterator { const boundaryHeader = contentType.match(boundaryHeaderRe); const boundary = '--' + (boundaryHeader ? boundaryHeader[1] : '-'); - - let buffer = ''; let isPreamble = true; - let boundaryIndex: number; let payload: any; - - chunks: for await (const chunk of streamBody(response)) { - buffer += chunk; - while ((boundaryIndex = buffer.indexOf(boundary)) > -1) { - if (isPreamble) { - isPreamble = false; - } else { - const chunk = buffer.slice( - buffer.indexOf('\r\n\r\n') + 4, - boundaryIndex - ); - - try { - yield (payload = JSON.parse(chunk)); - } catch (error) { - if (!payload) throw error; - } + for await (const chunk of split(streamBody(response), boundary)) { + if (isPreamble) { + isPreamble = false; + } else { + try { + yield (payload = JSON.parse( + chunk.slice(chunk.indexOf('\r\n\r\n') + 4) + )); + } catch (error) { + if (!payload) throw error; } - - buffer = buffer.slice(boundaryIndex + boundary.length); - if (buffer.startsWith('--') || (payload && !payload.hasNext)) - break chunks; } + if (payload && !payload.hasNext) break; + } + if (payload && payload.hasNext) { + yield { hasNext: false }; } - - if (payload && payload.hasNext) yield { hasNext: false }; } async function* fetchOperation( @@ -85,18 +116,22 @@ async function* fetchOperation( // Delay for a tick to give the Client a chance to cancel the request // if a teardown comes in immediately await Promise.resolve(); + response = await (operation.context.fetch || fetch)(url, fetchOptions); const contentType = response.headers.get('Content-Type') || ''; - if (/text\//i.test(contentType)) { - const text = await response.text(); - return yield makeErrorResult(operation, new Error(text), response); - } else if (!/multipart\/mixed/i.test(contentType)) { - const text = await response.text(); - return yield makeResult(operation, JSON.parse(text), response); + + let results: AsyncIterable; + if (/multipart\/mixed/i.test(contentType)) { + results = parseMultipartMixed(contentType, response); + } else if (/text\/event-stream/i.test(contentType)) { + results = parseEventStream(response); + } else if (!/text\//i.test(contentType)) { + results = parseJSON(response); + } else { + throw new Error(await response.text()); } - const iterator = parseMultipartMixed(contentType, response); - for await (const payload of iterator) { + for await (const payload of results) { yield (result = result ? mergeResultPatch(result, payload, response) : makeResult(operation, payload, response));