diff --git a/PROTOCOL.md b/PROTOCOL.md index 2bcc83bd..08b4836c 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -116,7 +116,7 @@ Direction: **bidirectional** - **Server -> Client** indicates that the requested operation execution has completed. If the server dispatched the `Error` message relative to the original `Subscribe` message, no `Complete` message will be emitted. -- **Client -> Server** indicates that the client has stopped listening and wants to complete the source stream. No further events, relevant to the original subscription, should be sent through. +- **Client -> Server** indicates that the client has stopped listening and wants to complete the subscription. No further events, relevant to the original subscription, should be sent through. Even if the client completed a single result operation before it resolved, the result should not be sent through once it does. ```typescript interface CompleteMessage { diff --git a/src/server.ts b/src/server.ts index 000a3b16..50dfc7f9 100644 --- a/src/server.ts +++ b/src/server.ts @@ -629,7 +629,10 @@ export function makeServer(options: ServerOptions): Server { } } else { /** single emitted result */ - await emit.next(operationResult, execArgs); + // if the client completed the subscription before the single result + // became available, he effectively canceled it and no data should be sent + if (id in ctx.subscriptions) + await emit.next(operationResult, execArgs); } // lack of subscription at this point indicates that the client @@ -640,7 +643,7 @@ export function makeServer(options: ServerOptions): Server { } case MessageType.Complete: { await ctx.subscriptions[message.id]?.return?.(); - delete ctx.subscriptions[message.id]; // deleting the subscription means no further action + delete ctx.subscriptions[message.id]; // deleting the subscription means no further activity should take place break; } default: diff --git a/src/tests/server.ts b/src/tests/server.ts index 56d31f00..d106f057 100644 --- a/src/tests/server.ts +++ b/src/tests/server.ts @@ -5,6 +5,7 @@ import { subscribe, GraphQLError, ExecutionArgs, + ExecutionResult, } from 'graphql'; import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../protocol'; import { MessageType, parseMessage, stringifyMessage } from '../message'; @@ -943,6 +944,69 @@ describe('Subscribe', () => { }); }); + it('should be able to complete a long running query before the result becomes available', async () => { + let resultIsHere = (_result: ExecutionResult) => { + /* noop for calming typescript */ + }, + execute = () => { + /* noop for calming typescript */ + }; + const waitForExecute = new Promise((resolve) => (execute = resolve)); + + const { url, ws } = await startTServer({ + schema, + execute: () => + new Promise((resolve) => { + resultIsHere = resolve; + execute(); + }), + }); + + const client = await createTClient(url); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + query: 'query { getValue }', + }, + }), + ); + }); + + await waitForExecute; + + // complete before resolve + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Complete, + }), + ); + + // will be just one client and the only next message can be "complete" + for (const client of ws.clients) { + await new Promise((resolve) => + client.once('message', () => resolve()), + ); + } + + // result became available after complete + resultIsHere({ data: { getValue: 'nope' } }); + + await client.waitForMessage(() => { + fail('No further activity expected after complete'); + }, 30); + }); + it('should execute the query and "error" out because of validation errors', async () => { const { url } = await startTServer({ schema,