Skip to content

Commit

Permalink
fix(server): Respect completed subscriptions even if subscribe or `…
Browse files Browse the repository at this point in the history
…onOperation` didnt resolve yet
  • Loading branch information
enisdenjo committed Mar 25, 2021
1 parent b68d56c commit 4700154
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 3 deletions.
11 changes: 8 additions & 3 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -689,9 +689,14 @@ export function makeServer<E = unknown>(options: ServerOptions<E>): Server<E> {

if (isAsyncIterable(operationResult)) {
/** multiple emitted results */
ctx.subscriptions[id] = operationResult;
for await (const result of operationResult) {
await emit.next(result, execArgs);
if (!(id in ctx.subscriptions)) {
// subscription was completed/canceled before the operation settled
operationResult.return?.();
} else {
ctx.subscriptions[id] = operationResult;
for await (const result of operationResult) {
await emit.next(result, execArgs);
}
}
} else {
/** single emitted result */
Expand Down
57 changes: 57 additions & 0 deletions src/tests/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1456,6 +1456,63 @@ describe('Subscribe', () => {
// terminate socket abruptly
client.ws.terminate();
});

it('should respect completed subscriptions even if subscribe operation stalls', async () => {
let continueSubscribe: (() => void) | undefined = undefined;
const server = await startTServer({
subscribe: async (...args) => {
await new Promise<void>((resolve) => (continueSubscribe = resolve));
return subscribe(...args);
},
});

const client = await createTClient(server.url);
client.ws.send(
stringifyMessage<MessageType.ConnectionInit>({
type: MessageType.ConnectionInit,
}),
);
await client.waitForMessage(); // ack

client.ws.send(
stringifyMessage<MessageType.Subscribe>({
id: '1',
type: MessageType.Subscribe,
payload: {
query: 'subscription { ping }',
},
}),
);

// wait for the subscribe lock
while (!continueSubscribe) {
await new Promise((resolve) => setTimeout(resolve, 10));
}

// send complete
client.ws.send(
stringifyMessage<MessageType.Complete>({
id: '1',
type: MessageType.Complete,
}),
);

// wait for complete message
for (const client of server.ws.clients) {
await new Promise((resolve) => client.once('message', resolve));
}

// then continue
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
continueSubscribe!();

// emit
server.pong();

await client.waitForMessage(() => {
fail("Shouldn't have received a message");
}, 30);
});
});

describe('Disconnect/close', () => {
Expand Down

0 comments on commit 4700154

Please sign in to comment.