From 3cf7d579ad647c2646b7db9f173bcb3dba69f817 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 29 Oct 2024 11:55:01 +0200 Subject: [PATCH 1/5] cancel execution despite pending resolvers --- src/execution/Canceller.ts | 52 ++++++ src/execution/IncrementalPublisher.ts | 3 + src/execution/__tests__/abort-signal-test.ts | 175 ++++++++++++++++--- src/execution/execute.ts | 51 +++--- 4 files changed, 233 insertions(+), 48 deletions(-) create mode 100644 src/execution/Canceller.ts diff --git a/src/execution/Canceller.ts b/src/execution/Canceller.ts new file mode 100644 index 0000000000..19f797c35b --- /dev/null +++ b/src/execution/Canceller.ts @@ -0,0 +1,52 @@ +import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; + +/** + * A Canceller object that can be used to cancel multiple promises + * using a single AbortSignal. + * + * @internal + */ +export class Canceller { + abortSignal: AbortSignal; + abort: () => void; + + private _aborts: Set<() => void>; + + constructor(abortSignal: AbortSignal) { + this.abortSignal = abortSignal; + this._aborts = new Set<() => void>(); + this.abort = () => { + for (const abort of this._aborts) { + abort(); + } + }; + + abortSignal.addEventListener('abort', this.abort); + } + + unsubscribe(): void { + this.abortSignal.removeEventListener('abort', this.abort); + } + + withCancellation(originalPromise: Promise): Promise { + if (this.abortSignal === undefined) { + return originalPromise; + } + + const { promise, resolve, reject } = promiseWithResolvers(); + const abort = () => reject(this.abortSignal.reason); + this._aborts.add(abort); + originalPromise.then( + (resolved) => { + this._aborts.delete(abort); + resolve(resolved); + }, + (error: unknown) => { + this._aborts.delete(abort); + reject(error); + }, + ); + + return promise; + } +} diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index d060ad2463..24ef905a7a 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -4,6 +4,7 @@ import { pathToArray } from '../jsutils/Path.js'; import type { GraphQLError } from '../error/GraphQLError.js'; +import type { Canceller } from './Canceller.js'; import { IncrementalGraph } from './IncrementalGraph.js'; import type { CancellableStreamRecord, @@ -43,6 +44,7 @@ export function buildIncrementalResponse( } interface IncrementalPublisherContext { + canceller: Canceller | undefined; cancellableStreams: Set | undefined; } @@ -171,6 +173,7 @@ class IncrementalPublisher { batch = await this._incrementalGraph.nextCompletedBatch(); } while (batch !== undefined); + this._context.canceller?.unsubscribe(); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; }; diff --git a/src/execution/__tests__/abort-signal-test.ts b/src/execution/__tests__/abort-signal-test.ts index ad9ba6c332..e25cbb3b38 100644 --- a/src/execution/__tests__/abort-signal-test.ts +++ b/src/execution/__tests__/abort-signal-test.ts @@ -9,7 +9,11 @@ import { parse } from '../../language/parser.js'; import { buildSchema } from '../../utilities/buildASTSchema.js'; -import { execute, experimentalExecuteIncrementally } from '../execute.js'; +import { + execute, + experimentalExecuteIncrementally, + subscribe, +} from '../execute.js'; import type { InitialIncrementalExecutionResult, SubsequentIncrementalExecutionResult, @@ -52,12 +56,17 @@ const schema = buildSchema(` type Query { todo: Todo + nonNullableTodo: Todo! } type Mutation { foo: String bar: String } + + type Subscription { + foo: String + } `); describe('Execute: Cancellation', () => { @@ -300,6 +309,97 @@ describe('Execute: Cancellation', () => { }); }); + it('should stop the execution when aborted despite a hanging resolver', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + todo { + id + author { + id + } + } + } + `); + + const resultPromise = execute({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + todo: () => + new Promise(() => { + /* will never resolve */ + }), + }, + }); + + abortController.abort(); + + const result = await resultPromise; + + expect(result.errors?.[0].originalError?.name).to.equal('AbortError'); + + expectJSON(result).toDeepEqual({ + data: { + todo: null, + }, + errors: [ + { + message: 'This operation was aborted', + path: ['todo'], + locations: [{ line: 3, column: 9 }], + }, + ], + }); + }); + + it('should stop the execution when aborted with proper null bubbling', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + nonNullableTodo { + id + author { + id + } + } + } + `); + + const resultPromise = execute({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + nonNullableTodo: async () => + Promise.resolve({ + id: '1', + text: 'Hello, World!', + /* c8 ignore next */ + author: () => expect.fail('Should not be called'), + }), + }, + }); + + abortController.abort(); + + const result = await resultPromise; + + expect(result.errors?.[0].originalError?.name).to.equal('AbortError'); + + expectJSON(result).toDeepEqual({ + data: null, + errors: [ + { + message: 'This operation was aborted', + path: ['nonNullableTodo'], + locations: [{ line: 3, column: 9 }], + }, + ], + }); + }); + it('should stop deferred execution when aborted', async () => { const abortController = new AbortController(); const document = parse(` @@ -353,14 +453,12 @@ describe('Execute: Cancellation', () => { const abortController = new AbortController(); const document = parse(` query { - todo { - id - ... on Todo @defer { + ... on Query @defer { + todo { + id text author { - ... on Author @defer { - id - } + id } } } @@ -382,41 +480,27 @@ describe('Execute: Cancellation', () => { abortController.signal, ); - await resolveOnNextTick(); - await resolveOnNextTick(); - await resolveOnNextTick(); - abortController.abort(); const result = await resultPromise; expectJSON(result).toDeepEqual([ { - data: { - todo: { - id: '1', - }, - }, - pending: [{ id: '0', path: ['todo'] }], + data: {}, + pending: [{ id: '0', path: [] }], hasNext: true, }, { incremental: [ { data: { - text: 'hello world', - author: null, + todo: null, }, errors: [ { - locations: [ - { - column: 13, - line: 7, - }, - ], message: 'This operation was aborted', - path: ['todo', 'author'], + path: ['todo'], + locations: [{ line: 4, column: 11 }], }, ], id: '0', @@ -448,6 +532,10 @@ describe('Execute: Cancellation', () => { }, }); + await resolveOnNextTick(); + await resolveOnNextTick(); + await resolveOnNextTick(); + abortController.abort(); const result = await resultPromise; @@ -498,4 +586,39 @@ describe('Execute: Cancellation', () => { ], }); }); + + it('should stop the execution when aborted during subscription', async () => { + const abortController = new AbortController(); + const document = parse(` + subscription { + foo + } + `); + + const resultPromise = subscribe({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + foo: async () => + new Promise(() => { + /* will never resolve */ + }), + }, + }); + + abortController.abort(); + + const result = await resultPromise; + + expectJSON(result).toDeepEqual({ + errors: [ + { + message: 'This operation was aborted', + path: ['foo'], + locations: [{ line: 3, column: 9 }], + }, + ], + }); + }); }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 7c06624414..7bf5dea2a3 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -50,6 +50,7 @@ import { assertValidSchema } from '../type/validate.js'; import type { DeferUsageSet, ExecutionPlan } from './buildExecutionPlan.js'; import { buildExecutionPlan } from './buildExecutionPlan.js'; +import { Canceller } from './Canceller.js'; import type { DeferUsage, FieldDetailsList, @@ -163,6 +164,7 @@ export interface ValidatedExecutionArgs { export interface ExecutionContext { validatedExecutionArgs: ValidatedExecutionArgs; errors: Array | undefined; + canceller: Canceller | undefined; cancellableStreams: Set | undefined; } @@ -310,9 +312,11 @@ export function executeQueryOrMutationOrSubscriptionEvent( export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( validatedExecutionArgs: ValidatedExecutionArgs, ): PromiseOrValue { + const abortSignal = validatedExecutionArgs.abortSignal; const exeContext: ExecutionContext = { validatedExecutionArgs, errors: undefined, + canceller: abortSignal ? new Canceller(abortSignal) : undefined, cancellableStreams: undefined, }; try { @@ -364,14 +368,18 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( if (isPromise(graphqlWrappedResult)) { return graphqlWrappedResult.then( (resolved) => buildDataResponse(exeContext, resolved), - (error: unknown) => ({ - data: null, - errors: withError(exeContext.errors, error as GraphQLError), - }), + (error: unknown) => { + exeContext.canceller?.unsubscribe(); + return { + data: null, + errors: withError(exeContext.errors, error as GraphQLError), + }; + }, ); } return buildDataResponse(exeContext, graphqlWrappedResult); } catch (error) { + exeContext.canceller?.unsubscribe(); return { data: null, errors: withError(exeContext.errors, error) }; } } @@ -462,6 +470,7 @@ function buildDataResponse( const { rawResult: data, incrementalDataRecords } = graphqlWrappedResult; const errors = exeContext.errors; if (incrementalDataRecords === undefined) { + exeContext.canceller?.unsubscribe(); return errors !== undefined ? { errors, data } : { data }; } @@ -660,11 +669,12 @@ function executeFieldsSerially( incrementalContext: IncrementalContext | undefined, deferMap: ReadonlyMap | undefined, ): PromiseOrValue>> { + const abortSignal = exeContext.validatedExecutionArgs.abortSignal; return promiseReduce( groupedFieldSet, (graphqlWrappedResult, [responseName, fieldDetailsList]) => { const fieldPath = addPath(path, responseName, parentType.name); - const abortSignal = exeContext.validatedExecutionArgs.abortSignal; + if (abortSignal?.aborted) { handleFieldError( abortSignal.reason, @@ -811,7 +821,7 @@ function executeField( incrementalContext: IncrementalContext | undefined, deferMap: ReadonlyMap | undefined, ): PromiseOrValue> | undefined { - const validatedExecutionArgs = exeContext.validatedExecutionArgs; + const { validatedExecutionArgs, canceller } = exeContext; const { schema, contextValue, variableValues, hideSuggestions, abortSignal } = validatedExecutionArgs; const fieldName = fieldDetailsList[0].node.name.value; @@ -856,7 +866,7 @@ function executeField( fieldDetailsList, info, path, - result, + canceller?.withCancellation(result) ?? result, incrementalContext, deferMap, ); @@ -1745,23 +1755,13 @@ function completeObjectValue( incrementalContext: IncrementalContext | undefined, deferMap: ReadonlyMap | undefined, ): PromiseOrValue>> { - const validatedExecutionArgs = exeContext.validatedExecutionArgs; - const abortSignal = validatedExecutionArgs.abortSignal; - if (abortSignal?.aborted) { - throw locatedError( - abortSignal.reason, - toNodes(fieldDetailsList), - pathToArray(path), - ); - } - // If there is an isTypeOf predicate function, call it with the // current result. If isTypeOf returns false, then raise an error rather // than continuing execution. if (returnType.isTypeOf) { const isTypeOf = returnType.isTypeOf( result, - validatedExecutionArgs.contextValue, + exeContext.validatedExecutionArgs.contextValue, info, ); @@ -2201,11 +2201,18 @@ function executeSubscription( const result = resolveFn(rootValue, args, contextValue, info, abortSignal); if (isPromise(result)) { - return result - .then(assertEventStream) - .then(undefined, (error: unknown) => { + const canceller = abortSignal ? new Canceller(abortSignal) : undefined; + const promise = canceller?.withCancellation(result) ?? result; + return promise.then(assertEventStream).then( + (resolved) => { + canceller?.unsubscribe(); + return resolved; + }, + (error: unknown) => { + canceller?.unsubscribe(); throw locatedError(error, fieldNodes, pathToArray(path)); - }); + }, + ); } return assertEventStream(result); From 9f634a8301628199d85f55eee28c4cd49b672934 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Wed, 30 Oct 2024 15:33:14 +0200 Subject: [PATCH 2/5] add todos --- src/execution/Canceller.ts | 4 ---- src/execution/IncrementalPublisher.ts | 3 +++ src/execution/execute.ts | 4 ++++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/execution/Canceller.ts b/src/execution/Canceller.ts index 19f797c35b..405a63e88a 100644 --- a/src/execution/Canceller.ts +++ b/src/execution/Canceller.ts @@ -29,10 +29,6 @@ export class Canceller { } withCancellation(originalPromise: Promise): Promise { - if (this.abortSignal === undefined) { - return originalPromise; - } - const { promise, resolve, reject } = promiseWithResolvers(); const abort = () => reject(this.abortSignal.reason); this._aborts.add(abort); diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 24ef905a7a..7619a51810 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -127,6 +127,7 @@ class IncrementalPublisher { IteratorResult > => { if (isDone) { + this._context.canceller?.unsubscribe(); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; } @@ -173,6 +174,8 @@ class IncrementalPublisher { batch = await this._incrementalGraph.nextCompletedBatch(); } while (batch !== undefined); + // TODO: add test for this case + /* c8 ignore next */ this._context.canceller?.unsubscribe(); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 7bf5dea2a3..6b21f1d96f 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -379,6 +379,8 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( } return buildDataResponse(exeContext, graphqlWrappedResult); } catch (error) { + // TODO: add test case for synchronous null bubbling to root with cancellation + /* c8 ignore next */ exeContext.canceller?.unsubscribe(); return { data: null, errors: withError(exeContext.errors, error) }; } @@ -2205,6 +2207,8 @@ function executeSubscription( const promise = canceller?.withCancellation(result) ?? result; return promise.then(assertEventStream).then( (resolved) => { + // TODO: add test case + /* c8 ignore next */ canceller?.unsubscribe(); return resolved; }, From 9550c71bf45133133feafbcf3ed38c3a9d0985c5 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Wed, 30 Oct 2024 22:43:30 +0200 Subject: [PATCH 3/5] feedback and additional tests --- src/execution/IncrementalPublisher.ts | 8 +-- .../{Canceller.ts => PromiseCanceller.ts} | 11 +++- .../__tests__/PromiseCanceller-test.ts | 56 +++++++++++++++++++ src/execution/execute.ts | 28 ++++++---- 4 files changed, 84 insertions(+), 19 deletions(-) rename src/execution/{Canceller.ts => PromiseCanceller.ts} (77%) create mode 100644 src/execution/__tests__/PromiseCanceller-test.ts diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 7619a51810..c5f9c4c2ca 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -4,8 +4,8 @@ import { pathToArray } from '../jsutils/Path.js'; import type { GraphQLError } from '../error/GraphQLError.js'; -import type { Canceller } from './Canceller.js'; import { IncrementalGraph } from './IncrementalGraph.js'; +import type { PromiseCanceller } from './PromiseCanceller.js'; import type { CancellableStreamRecord, CompletedExecutionGroup, @@ -44,7 +44,7 @@ export function buildIncrementalResponse( } interface IncrementalPublisherContext { - canceller: Canceller | undefined; + promiseCanceller: PromiseCanceller | undefined; cancellableStreams: Set | undefined; } @@ -127,7 +127,7 @@ class IncrementalPublisher { IteratorResult > => { if (isDone) { - this._context.canceller?.unsubscribe(); + this._context.promiseCanceller?.disconnect(); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; } @@ -176,7 +176,7 @@ class IncrementalPublisher { // TODO: add test for this case /* c8 ignore next */ - this._context.canceller?.unsubscribe(); + this._context.promiseCanceller?.disconnect(); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; }; diff --git a/src/execution/Canceller.ts b/src/execution/PromiseCanceller.ts similarity index 77% rename from src/execution/Canceller.ts rename to src/execution/PromiseCanceller.ts index 405a63e88a..60c3e3b6a3 100644 --- a/src/execution/Canceller.ts +++ b/src/execution/PromiseCanceller.ts @@ -1,12 +1,12 @@ import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; /** - * A Canceller object that can be used to cancel multiple promises + * A PromiseCanceller object can be used to cancel multiple promises * using a single AbortSignal. * * @internal */ -export class Canceller { +export class PromiseCanceller { abortSignal: AbortSignal; abort: () => void; @@ -24,11 +24,16 @@ export class Canceller { abortSignal.addEventListener('abort', this.abort); } - unsubscribe(): void { + disconnect(): void { this.abortSignal.removeEventListener('abort', this.abort); } withCancellation(originalPromise: Promise): Promise { + if (this.abortSignal.aborted) { + // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors + return Promise.reject(this.abortSignal.reason); + } + const { promise, resolve, reject } = promiseWithResolvers(); const abort = () => reject(this.abortSignal.reason); this._aborts.add(abort); diff --git a/src/execution/__tests__/PromiseCanceller-test.ts b/src/execution/__tests__/PromiseCanceller-test.ts new file mode 100644 index 0000000000..91fe6c40e5 --- /dev/null +++ b/src/execution/__tests__/PromiseCanceller-test.ts @@ -0,0 +1,56 @@ +import { describe, it } from 'mocha'; + +import { expectPromise } from '../../__testUtils__/expectPromise.js'; + +import { PromiseCanceller } from '../PromiseCanceller.js'; + +describe('PromiseCanceller', () => { + it('works to cancel an already resolved promise', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = Promise.resolve(1); + + const withCancellation = promiseCanceller.withCancellation(promise); + + abortController.abort(new Error('Cancelled!')); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to cancel a hanging promise', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + const withCancellation = promiseCanceller.withCancellation(promise); + + abortController.abort(new Error('Cancelled!')); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to cancel a hanging promise created after abort signal triggered', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + abortController.abort(new Error('Cancelled!')); + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + const withCancellation = promiseCanceller.withCancellation(promise); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); +}); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 6b21f1d96f..32ef0a0592 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -50,7 +50,6 @@ import { assertValidSchema } from '../type/validate.js'; import type { DeferUsageSet, ExecutionPlan } from './buildExecutionPlan.js'; import { buildExecutionPlan } from './buildExecutionPlan.js'; -import { Canceller } from './Canceller.js'; import type { DeferUsage, FieldDetailsList, @@ -64,6 +63,7 @@ import { import { getVariableSignature } from './getVariableSignature.js'; import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; +import { PromiseCanceller } from './PromiseCanceller.js'; import type { CancellableStreamRecord, CompletedExecutionGroup, @@ -164,7 +164,7 @@ export interface ValidatedExecutionArgs { export interface ExecutionContext { validatedExecutionArgs: ValidatedExecutionArgs; errors: Array | undefined; - canceller: Canceller | undefined; + promiseCanceller: PromiseCanceller | undefined; cancellableStreams: Set | undefined; } @@ -316,7 +316,9 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( const exeContext: ExecutionContext = { validatedExecutionArgs, errors: undefined, - canceller: abortSignal ? new Canceller(abortSignal) : undefined, + promiseCanceller: abortSignal + ? new PromiseCanceller(abortSignal) + : undefined, cancellableStreams: undefined, }; try { @@ -369,7 +371,7 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( return graphqlWrappedResult.then( (resolved) => buildDataResponse(exeContext, resolved), (error: unknown) => { - exeContext.canceller?.unsubscribe(); + exeContext.promiseCanceller?.disconnect(); return { data: null, errors: withError(exeContext.errors, error as GraphQLError), @@ -381,7 +383,7 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( } catch (error) { // TODO: add test case for synchronous null bubbling to root with cancellation /* c8 ignore next */ - exeContext.canceller?.unsubscribe(); + exeContext.promiseCanceller?.disconnect(); return { data: null, errors: withError(exeContext.errors, error) }; } } @@ -472,7 +474,7 @@ function buildDataResponse( const { rawResult: data, incrementalDataRecords } = graphqlWrappedResult; const errors = exeContext.errors; if (incrementalDataRecords === undefined) { - exeContext.canceller?.unsubscribe(); + exeContext.promiseCanceller?.disconnect(); return errors !== undefined ? { errors, data } : { data }; } @@ -823,7 +825,7 @@ function executeField( incrementalContext: IncrementalContext | undefined, deferMap: ReadonlyMap | undefined, ): PromiseOrValue> | undefined { - const { validatedExecutionArgs, canceller } = exeContext; + const { validatedExecutionArgs, promiseCanceller } = exeContext; const { schema, contextValue, variableValues, hideSuggestions, abortSignal } = validatedExecutionArgs; const fieldName = fieldDetailsList[0].node.name.value; @@ -868,7 +870,7 @@ function executeField( fieldDetailsList, info, path, - canceller?.withCancellation(result) ?? result, + promiseCanceller?.withCancellation(result) ?? result, incrementalContext, deferMap, ); @@ -2203,17 +2205,19 @@ function executeSubscription( const result = resolveFn(rootValue, args, contextValue, info, abortSignal); if (isPromise(result)) { - const canceller = abortSignal ? new Canceller(abortSignal) : undefined; - const promise = canceller?.withCancellation(result) ?? result; + const promiseCanceller = abortSignal + ? new PromiseCanceller(abortSignal) + : undefined; + const promise = promiseCanceller?.withCancellation(result) ?? result; return promise.then(assertEventStream).then( (resolved) => { // TODO: add test case /* c8 ignore next */ - canceller?.unsubscribe(); + promiseCanceller?.disconnect(); return resolved; }, (error: unknown) => { - canceller?.unsubscribe(); + promiseCanceller?.disconnect(); throw locatedError(error, fieldNodes, pathToArray(path)); }, ); From 3dd9fb241efc749876c0c0c2294ce11c13ba5264 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Wed, 30 Oct 2024 22:46:49 +0200 Subject: [PATCH 4/5] add additional cancellation sites --- src/execution/execute.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 32ef0a0592..5e2f718176 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1578,7 +1578,7 @@ function completeListItemValue( } async function completePromisedListItemValue( - item: unknown, + item: Promise, parent: GraphQLWrappedResult>, exeContext: ExecutionContext, itemType: GraphQLOutputType, @@ -1589,7 +1589,9 @@ async function completePromisedListItemValue( deferMap: ReadonlyMap | undefined, ): Promise { try { - const resolved = await item; + const resolved = await (exeContext.promiseCanceller?.withCancellation( + item, + ) ?? item); let completed = completeValue( exeContext, itemType, @@ -2581,7 +2583,7 @@ function completeStreamItem( fieldDetailsList, info, itemPath, - item, + exeContext.promiseCanceller?.withCancellation(item) ?? item, incrementalContext, new Map(), ).then( From 77919393371d5c69dd3a4fea3f3e4a3a79f61b6d Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Wed, 30 Oct 2024 23:29:34 +0200 Subject: [PATCH 5/5] add tests --- src/execution/__tests__/abort-signal-test.ts | 118 +++++++++++++++++-- 1 file changed, 108 insertions(+), 10 deletions(-) diff --git a/src/execution/__tests__/abort-signal-test.ts b/src/execution/__tests__/abort-signal-test.ts index e25cbb3b38..d12253b517 100644 --- a/src/execution/__tests__/abort-signal-test.ts +++ b/src/execution/__tests__/abort-signal-test.ts @@ -45,7 +45,7 @@ async function complete( const schema = buildSchema(` type Todo { id: ID - text: String + items: [String] author: User } @@ -91,7 +91,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'Hello, World!', /* c8 ignore next */ author: () => expect.fail('Should not be called'), }), @@ -186,7 +185,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'Hello, World!', /* c8 ignore next */ author: () => expect.fail('Should not be called'), }), @@ -235,7 +233,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'Hello, World!', /* c8 ignore next */ author: () => expect.fail('Should not be called'), }), @@ -280,7 +277,6 @@ describe('Execute: Cancellation', () => { rootValue: { todo: { id: '1', - text: 'Hello, World!', /* c8 ignore next 3 */ author: async () => Promise.resolve(() => expect.fail('Should not be called')), @@ -354,6 +350,56 @@ describe('Execute: Cancellation', () => { }); }); + it('should stop the execution when aborted despite a hanging item', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + todo { + id + items + } + } + `); + + const resultPromise = execute({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + todo: () => ({ + id: '1', + items: [ + new Promise(() => { + /* will never resolve */ + }), + ], + }), + }, + }); + + abortController.abort(); + + const result = await resultPromise; + + expect(result.errors?.[0].originalError?.name).to.equal('AbortError'); + + expectJSON(result).toDeepEqual({ + data: { + todo: { + id: '1', + items: [null], + }, + }, + errors: [ + { + message: 'This operation was aborted', + path: ['todo', 'items', 0], + locations: [{ line: 5, column: 11 }], + }, + ], + }); + }); + it('should stop the execution when aborted with proper null bubbling', async () => { const abortController = new AbortController(); const document = parse(` @@ -375,7 +421,6 @@ describe('Execute: Cancellation', () => { nonNullableTodo: async () => Promise.resolve({ id: '1', - text: 'Hello, World!', /* c8 ignore next */ author: () => expect.fail('Should not be called'), }), @@ -407,7 +452,6 @@ describe('Execute: Cancellation', () => { todo { id ... on Todo @defer { - text author { id } @@ -423,7 +467,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'hello world', /* c8 ignore next */ author: () => expect.fail('Should not be called'), }), @@ -456,7 +499,6 @@ describe('Execute: Cancellation', () => { ... on Query @defer { todo { id - text author { id } @@ -471,7 +513,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'hello world', /* c8 ignore next 2 */ author: async () => Promise.resolve(() => expect.fail('Should not be called')), @@ -512,6 +553,63 @@ describe('Execute: Cancellation', () => { ]); }); + it('should stop streamed execution when aborted', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + todo { + id + items @stream + } + } + `); + + const resultPromise = complete( + document, + { + todo: { + id: '1', + items: [Promise.resolve('item')], + }, + }, + abortController.signal, + ); + + abortController.abort(); + + const result = await resultPromise; + + expectJSON(result).toDeepEqual([ + { + data: { + todo: { + id: '1', + items: [], + }, + }, + pending: [{ id: '0', path: ['todo', 'items'] }], + hasNext: true, + }, + { + incremental: [ + { + items: [null], + errors: [ + { + message: 'This operation was aborted', + path: ['todo', 'items', 0], + locations: [{ line: 5, column: 11 }], + }, + ], + id: '0', + }, + ], + completed: [{ id: '0' }], + hasNext: false, + }, + ]); + }); + it('should stop the execution when aborted mid-mutation', async () => { const abortController = new AbortController(); const document = parse(`