From ac57dc833c9523b0af8e6fe4e1fcf0d41beb83de Mon Sep 17 00:00:00 2001 From: Andy Richardson Date: Tue, 17 Aug 2021 22:10:22 +0100 Subject: [PATCH 1/3] Add @defer and @stream directive support for queries+mutations --- src/messages/subscribe.ts | 31 ++++++++++++++++++++++--------- src/utils/index.ts | 1 + src/utils/isAsyncIterable.ts | 4 ++++ 3 files changed, 27 insertions(+), 9 deletions(-) create mode 100644 src/utils/isAsyncIterable.ts diff --git a/src/messages/subscribe.ts b/src/messages/subscribe.ts index 922a8dd..6dda09d 100644 --- a/src/messages/subscribe.ts +++ b/src/messages/subscribe.ts @@ -1,5 +1,11 @@ import { SubscribeMessage, MessageType } from 'graphql-ws'; -import { validate, parse, execute, GraphQLError } from 'graphql'; +import { + validate, + parse, + execute, + GraphQLError, + ExecutionResult, +} from 'graphql'; import { buildExecutionContext, assertValidExecutionArguments, @@ -9,6 +15,7 @@ import { constructContext, deleteConnection, getResolverAndArgs, + isAsyncIterable, promisify, sendMessage, } from '../utils'; @@ -80,14 +87,20 @@ export const subscribe: MessageHandler = undefined ); - await sendMessage({ - ...event.requestContext, - message: { - type: MessageType.Next, - id: message.id, - payload: result, - }, - }); + // Support for @defer and @stream directives + const parts = isAsyncIterable(result) + ? result + : [result]; + for await (let part of parts) { + await sendMessage({ + ...event.requestContext, + message: { + type: MessageType.Next, + id: message.id, + payload: part, + }, + }); + } await sendMessage({ ...event.requestContext, diff --git a/src/utils/index.ts b/src/utils/index.ts index 2346c94..fade44c 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -1,4 +1,5 @@ export * from './aws'; export * from './date'; +export * from './isAsyncIterable'; export * from './promise'; export * from './graphql'; diff --git a/src/utils/isAsyncIterable.ts b/src/utils/isAsyncIterable.ts new file mode 100644 index 0000000..65a12c4 --- /dev/null +++ b/src/utils/isAsyncIterable.ts @@ -0,0 +1,4 @@ +export const isAsyncIterable = (arg: any): arg is AsyncIterable => + arg !== null && + typeof arg == 'object' && + typeof arg[Symbol.asyncIterator] === 'function'; From 98525a9ac0d7aeab07ed35b2fe559a24a7e7a487 Mon Sep 17 00:00:00 2001 From: Andy Richardson Date: Tue, 17 Aug 2021 22:12:09 +0100 Subject: [PATCH 2/3] Add defer support for subscription resolvers --- src/pubsub/publish.ts | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/pubsub/publish.ts b/src/pubsub/publish.ts index 9d9c0c8..8de04e2 100644 --- a/src/pubsub/publish.ts +++ b/src/pubsub/publish.ts @@ -3,11 +3,11 @@ import { equals, ConditionExpression, } from '@aws/dynamodb-expressions'; -import { parse, execute } from 'graphql'; +import { parse, execute, ExecutionResult } from 'graphql'; import { MessageType } from 'graphql-ws'; import { assign, Subscription } from '../model'; import { ServerClosure } from '../types'; -import { constructContext, sendMessage } from '../utils'; +import { constructContext, isAsyncIterable, sendMessage } from '../utils'; type PubSubEvent = { topic: string; @@ -27,14 +27,18 @@ export const publish = (c: ServerClosure) => async (event: PubSubEvent) => { undefined ); - await sendMessage({ - ...sub.requestContext, - message: { - id: sub.subscriptionId, - type: MessageType.Next, - payload: await result, - }, - }); + // Support for @defer and @stream directives + const parts = isAsyncIterable(result) ? result : [result]; + for await (let part of parts) { + await sendMessage({ + ...sub.requestContext, + message: { + id: sub.subscriptionId, + type: MessageType.Next, + payload: part, + }, + }); + } }); return await Promise.all(iters); }; From 91462e06e68e9b8c5a6e9672c7b430b018843e5d Mon Sep 17 00:00:00 2001 From: Andy Richardson Date: Tue, 17 Aug 2021 23:22:31 +0100 Subject: [PATCH 3/3] Update execute implementation for v16 support --- src/messages/subscribe.ts | 18 ++++++++---------- src/pubsub/publish.ts | 19 +++++++++---------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/messages/subscribe.ts b/src/messages/subscribe.ts index 6dda09d..27f34cf 100644 --- a/src/messages/subscribe.ts +++ b/src/messages/subscribe.ts @@ -52,10 +52,10 @@ export const subscribe: MessageHandler = } const contextValue = await constructContext(c)({ connectionParams }); - + const query = parse(message.payload.query); const execContext = buildExecutionContext( c.schema, - parse(message.payload.query), + query, undefined, contextValue, message.payload.variables, @@ -77,15 +77,13 @@ export const subscribe: MessageHandler = } if (execContext.operation.operation !== 'subscription') { - const result = await execute( - c.schema, - parse(message.payload.query), - undefined, + const result = await execute({ + schema: c.schema, + document: query, contextValue, - message.payload.variables, - message.payload.operationName, - undefined - ); + variableValues: message.payload.variables, + operationName: message.payload.operationName, + }); // Support for @defer and @stream directives const parts = isAsyncIterable(result) diff --git a/src/pubsub/publish.ts b/src/pubsub/publish.ts index 8de04e2..ad10d97 100644 --- a/src/pubsub/publish.ts +++ b/src/pubsub/publish.ts @@ -5,7 +5,7 @@ import { } from '@aws/dynamodb-expressions'; import { parse, execute, ExecutionResult } from 'graphql'; import { MessageType } from 'graphql-ws'; -import { assign, Subscription } from '../model'; +import { Subscription } from '../model'; import { ServerClosure } from '../types'; import { constructContext, isAsyncIterable, sendMessage } from '../utils'; @@ -17,15 +17,14 @@ type PubSubEvent = { export const publish = (c: ServerClosure) => async (event: PubSubEvent) => { const subscriptions = await getFilteredSubs(c)(event); const iters = subscriptions.map(async (sub) => { - const result = execute( - c.schema, - parse(sub.subscription.query), - event, - await constructContext(c)(sub), - sub.subscription.variables, - sub.subscription.operationName, - undefined - ); + const result = execute({ + schema: c.schema, + document: parse(sub.subscription.query), + rootValue: event, + contextValue: await constructContext(c)(sub), + variableValues: sub.subscription.variables, + operationName: sub.subscription.operationName, + }); // Support for @defer and @stream directives const parts = isAsyncIterable(result) ? result : [result];