Skip to content

Commit

Permalink
fix(server): Operation result can be async generator or iterable
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Sep 8, 2021
1 parent d4d5e3f commit b1fb883
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 11 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ function subscribe<T>(payload: SubscribePayload): AsyncIterableIterator<T> {
? { done: true, value: undefined }
: { value: pending.shift()! };
},
async throw(err) {
throw err
}
async return() {
dispose();
return { done: true, value: undefined };
Expand Down
33 changes: 23 additions & 10 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,22 @@ import {
PingMessage,
PongMessage,
} from './common';
import { isObject, isAsyncIterable, areGraphQLErrors } from './utils';
import {
isObject,
isAsyncGenerator,
isAsyncIterable,
areGraphQLErrors,
} from './utils';

/** @category Server */
export type OperationResult =
| Promise<AsyncIterableIterator<ExecutionResult> | ExecutionResult>
| AsyncIterableIterator<ExecutionResult>
| Promise<
| AsyncGenerator<ExecutionResult>
| AsyncIterable<ExecutionResult>
| ExecutionResult
>
| AsyncGenerator<ExecutionResult>
| AsyncIterable<ExecutionResult>
| ExecutionResult;

/**
Expand Down Expand Up @@ -488,7 +498,10 @@ export interface Context<E = unknown> {
* a reservation, meaning - the operation resolves to a single result or is still
* pending/being prepared.
*/
readonly subscriptions: Record<ID, AsyncIterator<unknown> | null>;
readonly subscriptions: Record<
ID,
AsyncGenerator<unknown> | AsyncIterable<unknown> | null
>;
/**
* An extra field where you can store your own context values
* to pass between callbacks.
Expand Down Expand Up @@ -769,8 +782,8 @@ export function makeServer<E = unknown>(options: ServerOptions<E>): Server<E> {
/** multiple emitted results */
if (!(id in ctx.subscriptions)) {
// subscription was completed/canceled before the operation settled
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
operationResult.return!(); // iterator must implement the return method
if (isAsyncGenerator(operationResult))
operationResult.return(undefined);
} else {
ctx.subscriptions[id] = operationResult;
for await (const result of operationResult) {
Expand All @@ -792,8 +805,9 @@ export function makeServer<E = unknown>(options: ServerOptions<E>): Server<E> {
return;
}
case MessageType.Complete: {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await ctx.subscriptions[message.id]?.return!(); // iterator must implement the return method
const subscription = ctx.subscriptions[message.id];
if (isAsyncGenerator(subscription))
await subscription.return(undefined);
delete ctx.subscriptions[message.id]; // deleting the subscription means no further activity should take place
return;
}
Expand All @@ -808,8 +822,7 @@ export function makeServer<E = unknown>(options: ServerOptions<E>): Server<E> {
return async (code, reason) => {
if (connectionInitWait) clearTimeout(connectionInitWait);
for (const sub of Object.values(ctx.subscriptions)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await sub?.return!(); // iterator must implement the return method
if (isAsyncGenerator(sub)) await sub.return(undefined);
}
if (ctx.acknowledged) await onDisconnect?.(ctx, code, reason);
await onClose?.(ctx, code, reason);
Expand Down
16 changes: 15 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,24 @@ export function isObject(val: unknown): val is Record<PropertyKey, unknown> {
/** @private */
export function isAsyncIterable<T = unknown>(
val: unknown,
): val is AsyncIterableIterator<T> {
): val is AsyncIterable<T> {
return typeof Object(val)[Symbol.asyncIterator] === 'function';
}

/** @private */
export function isAsyncGenerator<T = unknown>(
val: unknown,
): val is AsyncGenerator<T> {
return (
isObject(val) &&
typeof val[Symbol.asyncIterator] === 'function' &&
typeof val.return === 'function'
// for lazy ones, we only need the return anyway
// typeof val.throw === 'function' &&
// typeof val.next === 'function'
);
}

/** @private */
export function areGraphQLErrors(obj: unknown): obj is readonly GraphQLError[] {
return (
Expand Down

0 comments on commit b1fb883

Please sign in to comment.