Skip to content

Commit

Permalink
feat: implement subscription operation
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Aug 16, 2020
1 parent 1889fff commit 665b16d
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 10 deletions.
95 changes: 90 additions & 5 deletions src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {
parse,
validate,
getOperationAST,
subscribe,
GraphQLError,
} from 'graphql';
import { Disposable } from '../types';
import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../protocol';
Expand All @@ -28,8 +30,10 @@ import {
import {
Optional,
isObject,
isAsyncIterable,
hasOwnObjectProperty,
hasOwnStringProperty,
noop,
} from '../utils';

export interface ServerOptions {
Expand Down Expand Up @@ -140,6 +144,12 @@ export interface Context {
*/
acknowledged: boolean;
connectionParams?: Readonly<Record<string, unknown>>;
/**
* Holds the active subscriptions for this context.
* Subscriptions are for `subscription` operations **only**,
* other operations (`query`/`mutation`) are resolved immediately.
*/
subscriptions: Record<string, AsyncIterator<unknown>>;
}

export interface Server extends Disposable {
Expand Down Expand Up @@ -182,7 +192,12 @@ export function createServer(
}

const ctxRef: { current: Context } = {
current: { socket, connectionInitReceived: false, acknowledged: false },
current: {
socket,
connectionInitReceived: false,
acknowledged: false,
subscriptions: {},
},
};

// kick the client off (close socket) if the connection has
Expand Down Expand Up @@ -211,7 +226,11 @@ export function createServer(
ctxRef.current.socket.close(1011, errorOrClose.message);
}

// TODO-db-200702 close all active subscriptions
Object.entries(ctxRef.current.subscriptions).forEach(
([, subscription]) => {
(subscription.return || noop)();
},
);
}

socket.onerror = errorOrCloseHandler;
Expand Down Expand Up @@ -335,7 +354,65 @@ export function createServer(
throw new Error('Unable to get operation AST');
}
if (operationAST.operation === 'subscription') {
// TODO-db-200808 implement subscription operation
const subscriptionOrResult = await subscribe(execArgs);
if (isAsyncIterable(subscriptionOrResult)) {
ctx.subscriptions[message.id] = subscriptionOrResult;

try {
for await (let result of subscriptionOrResult) {
if (formatExecutionResult) {
result = await formatExecutionResult(ctx, result);
}
await sendMessage<MessageType.Next>(ctx, {
id: message.id,
type: MessageType.Next,
payload: result,
});
}

const completeMessage: CompleteMessage = {
id: message.id,
type: MessageType.Complete,
};
await sendMessage<MessageType.Complete>(ctx, completeMessage);
if (onComplete) {
onComplete(ctx, completeMessage);
}
} catch (err) {
await sendMessage<MessageType.Error>(ctx, {
id: message.id,
type: MessageType.Error,
payload: [
new GraphQLError(
err instanceof Error
? err.message
: new Error(err).message,
),
],
});
} finally {
delete ctx.subscriptions[message.id];
}
} else {
let result = subscriptionOrResult;
if (formatExecutionResult) {
result = await formatExecutionResult(ctx, result);
}
await sendMessage<MessageType.Next>(ctx, {
id: message.id,
type: MessageType.Next,
payload: result,
});

const completeMessage: CompleteMessage = {
id: message.id,
type: MessageType.Complete,
};
await sendMessage<MessageType.Complete>(ctx, completeMessage);
if (onComplete) {
onComplete(ctx, completeMessage);
}
}
} else {
// operationAST.operation === 'query' || 'mutation'

Expand All @@ -360,8 +437,16 @@ export function createServer(
}
break;
}

// TODO-db-200808 handle other message types
case MessageType.Complete: {
if (ctx.subscriptions[message.id]) {
await (ctx.subscriptions[message.id].return ?? noop)();
}
break;
}
default:
throw new Error(
`Unexpected message of type ${message.type} received`,
);
}
} catch (err) {
ctx.socket.close(4400, err.message);
Expand Down
69 changes: 64 additions & 5 deletions src/tests/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,10 @@ const schema = new GraphQLSchema({
subscription: new GraphQLObjectType({
name: 'Subscription',
fields: {
person: {
becameHappy: {
type: personType,
args: {
id: { type: GraphQLString },
},
subscribe: () => {
return pubsub.asyncIterator('person');
return pubsub.asyncIterator('becameHappy');
},
},
},
Expand Down Expand Up @@ -694,4 +691,66 @@ describe('Subscribe', () => {
// socket shouldnt close or error because of GraphQL errors
expect(closeOrErrorFn).not.toBeCalled();
});

it('should execute the subscription and "next" the published payload', async () => {
expect.assertions(1);

await makeServer({
schema,
});

const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL);
client.onopen = () => {
client.send(
stringifyMessage<MessageType.ConnectionInit>({
type: MessageType.ConnectionInit,
}),
);
};

client.onmessage = ({ data }) => {
const message = parseMessage(data);
switch (message.type) {
case MessageType.ConnectionAck: {
client.send(
stringifyMessage<MessageType.Subscribe>({
id: '1',
type: MessageType.Subscribe,
payload: {
operationName: 'BecomingHappy',
query: `subscription BecomingHappy {
becameHappy {
name
}
}`,
variables: {},
},
}),
() =>
setTimeout(
() =>
pubsub.publish('becameHappy', {
becameHappy: {
name: 'john',
},
}),
0,
),
);
break;
}
case MessageType.Next:
expect(message).toEqual({
id: '1',
type: MessageType.Next,
payload: { data: { becameHappy: { name: 'john' } } },
});
break;
default:
fail(`Not supposed to receive a message of type ${message.type}`);
}
};

await wait(20);
});
});
10 changes: 10 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ export function isArray(val: unknown): val is unknown[] {
return typeof val === 'object' && val !== null && Array.isArray(val);
}

export function isAsyncIterable<T = unknown>(
val: unknown,
): val is AsyncIterableIterator<T> {
return typeof Object(val)[Symbol.asyncIterator] === 'function';
}

export function hasOwnProperty<
O extends Record<PropertyKey, unknown>,
P extends PropertyKey
Expand Down Expand Up @@ -45,3 +51,7 @@ export function hasOwnStringProperty<
typeof obj[prop] === 'string'
);
}

export function noop(): void {
/**/
}

0 comments on commit 665b16d

Please sign in to comment.