Skip to content

Commit

Permalink
Add support for @defer and @stream (#23)
Browse files Browse the repository at this point in the history
* Add @defer and @stream directive support for queries+mutations

* Add defer support for subscription resolvers

* Update execute implementation for v16 support
  • Loading branch information
andyrichardson authored Aug 17, 2021
1 parent dc0c10d commit 1bc1618
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 39 deletions.
49 changes: 30 additions & 19 deletions src/messages/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { SubscribeMessage, MessageType } from 'graphql-ws';
import { validate, parse, execute, GraphQLError } from 'graphql';
import {
validate,
parse,
execute,
GraphQLError,
ExecutionResult,
} from 'graphql';
import {
buildExecutionContext,
assertValidExecutionArguments,
Expand All @@ -9,6 +15,7 @@ import {
constructContext,
deleteConnection,
getResolverAndArgs,
isAsyncIterable,
promisify,
sendMessage,
} from '../utils';
Expand Down Expand Up @@ -45,10 +52,10 @@ export const subscribe: MessageHandler<SubscribeMessage> =
}

const contextValue = await constructContext(c)({ connectionParams });

const query = parse(message.payload.query);
const execContext = buildExecutionContext(
c.schema,
parse(message.payload.query),
query,
undefined,
contextValue,
message.payload.variables,
Expand All @@ -70,25 +77,29 @@ export const subscribe: MessageHandler<SubscribeMessage> =
}

if (execContext.operation.operation !== 'subscription') {
const result = await execute(
c.schema,
parse(message.payload.query),
undefined,
const result = await execute({
schema: c.schema,
document: query,
contextValue,
message.payload.variables,
message.payload.operationName,
undefined
);

await sendMessage({
...event.requestContext,
message: {
type: MessageType.Next,
id: message.id,
payload: result,
},
variableValues: message.payload.variables,
operationName: message.payload.operationName,
});

// Support for @defer and @stream directives
const parts = isAsyncIterable<ExecutionResult>(result)
? result
: [result];
for await (let part of parts) {
await sendMessage({
...event.requestContext,
message: {
type: MessageType.Next,
id: message.id,
payload: part,
},
});
}

await sendMessage({
...event.requestContext,
message: {
Expand Down
43 changes: 23 additions & 20 deletions src/pubsub/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import {
equals,
ConditionExpression,
} from '@aws/dynamodb-expressions';
import { parse, execute } from 'graphql';
import { parse, execute, ExecutionResult } from 'graphql';
import { MessageType } from 'graphql-ws';
import { assign, Subscription } from '../model';
import { Subscription } from '../model';
import { ServerClosure } from '../types';
import { constructContext, sendMessage } from '../utils';
import { constructContext, isAsyncIterable, sendMessage } from '../utils';

type PubSubEvent = {
topic: string;
Expand All @@ -17,24 +17,27 @@ type PubSubEvent = {
export const publish = (c: ServerClosure) => async (event: PubSubEvent) => {
const subscriptions = await getFilteredSubs(c)(event);
const iters = subscriptions.map(async (sub) => {
const result = execute(
c.schema,
parse(sub.subscription.query),
event,
await constructContext(c)(sub),
sub.subscription.variables,
sub.subscription.operationName,
undefined
);

await sendMessage({
...sub.requestContext,
message: {
id: sub.subscriptionId,
type: MessageType.Next,
payload: await result,
},
const result = execute({
schema: c.schema,
document: parse(sub.subscription.query),
rootValue: event,
contextValue: await constructContext(c)(sub),
variableValues: sub.subscription.variables,
operationName: sub.subscription.operationName,
});

// Support for @defer and @stream directives
const parts = isAsyncIterable<ExecutionResult>(result) ? result : [result];
for await (let part of parts) {
await sendMessage({
...sub.requestContext,
message: {
id: sub.subscriptionId,
type: MessageType.Next,
payload: part,
},
});
}
});
return await Promise.all(iters);
};
Expand Down
1 change: 1 addition & 0 deletions src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './aws';
export * from './date';
export * from './isAsyncIterable';
export * from './promise';
export * from './graphql';
4 changes: 4 additions & 0 deletions src/utils/isAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export const isAsyncIterable = <T>(arg: any): arg is AsyncIterable<T> =>
arg !== null &&
typeof arg == 'object' &&
typeof arg[Symbol.asyncIterator] === 'function';

0 comments on commit 1bc1618

Please sign in to comment.