Skip to content

Commit

Permalink
cancel execution despite pending resolvers
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Oct 30, 2024
1 parent d59c725 commit 3cf7d57
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 48 deletions.
52 changes: 52 additions & 0 deletions src/execution/Canceller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

/**
* A Canceller object that can be used to cancel multiple promises
* using a single AbortSignal.
*
* @internal
*/
export class Canceller {
abortSignal: AbortSignal;
abort: () => void;

private _aborts: Set<() => void>;

constructor(abortSignal: AbortSignal) {
this.abortSignal = abortSignal;
this._aborts = new Set<() => void>();
this.abort = () => {
for (const abort of this._aborts) {
abort();
}
};

abortSignal.addEventListener('abort', this.abort);
}

unsubscribe(): void {
this.abortSignal.removeEventListener('abort', this.abort);
}

withCancellation<T>(originalPromise: Promise<T>): Promise<T> {
if (this.abortSignal === undefined) {
return originalPromise;
}

const { promise, resolve, reject } = promiseWithResolvers<T>();
const abort = () => reject(this.abortSignal.reason);
this._aborts.add(abort);
originalPromise.then(
(resolved) => {
this._aborts.delete(abort);
resolve(resolved);
},
(error: unknown) => {
this._aborts.delete(abort);
reject(error);
},
);

return promise;
}
}
3 changes: 3 additions & 0 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { pathToArray } from '../jsutils/Path.js';

import type { GraphQLError } from '../error/GraphQLError.js';

import type { Canceller } from './Canceller.js';
import { IncrementalGraph } from './IncrementalGraph.js';
import type {
CancellableStreamRecord,
Expand Down Expand Up @@ -43,6 +44,7 @@ export function buildIncrementalResponse(
}

interface IncrementalPublisherContext {
canceller: Canceller | undefined;
cancellableStreams: Set<CancellableStreamRecord> | undefined;
}

Expand Down Expand Up @@ -171,6 +173,7 @@ class IncrementalPublisher {
batch = await this._incrementalGraph.nextCompletedBatch();
} while (batch !== undefined);

this._context.canceller?.unsubscribe();
await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
};
Expand Down
175 changes: 149 additions & 26 deletions src/execution/__tests__/abort-signal-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import { parse } from '../../language/parser.js';

import { buildSchema } from '../../utilities/buildASTSchema.js';

import { execute, experimentalExecuteIncrementally } from '../execute.js';
import {
execute,
experimentalExecuteIncrementally,
subscribe,
} from '../execute.js';
import type {
InitialIncrementalExecutionResult,
SubsequentIncrementalExecutionResult,
Expand Down Expand Up @@ -52,12 +56,17 @@ const schema = buildSchema(`
type Query {
todo: Todo
nonNullableTodo: Todo!
}
type Mutation {
foo: String
bar: String
}
type Subscription {
foo: String
}
`);

describe('Execute: Cancellation', () => {
Expand Down Expand Up @@ -300,6 +309,97 @@ describe('Execute: Cancellation', () => {
});
});

it('should stop the execution when aborted despite a hanging resolver', async () => {
const abortController = new AbortController();
const document = parse(`
query {
todo {
id
author {
id
}
}
}
`);

const resultPromise = execute({
document,
schema,
abortSignal: abortController.signal,
rootValue: {
todo: () =>
new Promise(() => {
/* will never resolve */
}),
},
});

abortController.abort();

const result = await resultPromise;

expect(result.errors?.[0].originalError?.name).to.equal('AbortError');

expectJSON(result).toDeepEqual({
data: {
todo: null,
},
errors: [
{
message: 'This operation was aborted',
path: ['todo'],
locations: [{ line: 3, column: 9 }],
},
],
});
});

it('should stop the execution when aborted with proper null bubbling', async () => {
const abortController = new AbortController();
const document = parse(`
query {
nonNullableTodo {
id
author {
id
}
}
}
`);

const resultPromise = execute({
document,
schema,
abortSignal: abortController.signal,
rootValue: {
nonNullableTodo: async () =>
Promise.resolve({
id: '1',
text: 'Hello, World!',
/* c8 ignore next */
author: () => expect.fail('Should not be called'),
}),
},
});

abortController.abort();

const result = await resultPromise;

expect(result.errors?.[0].originalError?.name).to.equal('AbortError');

expectJSON(result).toDeepEqual({
data: null,
errors: [
{
message: 'This operation was aborted',
path: ['nonNullableTodo'],
locations: [{ line: 3, column: 9 }],
},
],
});
});

it('should stop deferred execution when aborted', async () => {
const abortController = new AbortController();
const document = parse(`
Expand Down Expand Up @@ -353,14 +453,12 @@ describe('Execute: Cancellation', () => {
const abortController = new AbortController();
const document = parse(`
query {
todo {
id
... on Todo @defer {
... on Query @defer {
todo {
id
text
author {
... on Author @defer {
id
}
id
}
}
}
Expand All @@ -382,41 +480,27 @@ describe('Execute: Cancellation', () => {
abortController.signal,
);

await resolveOnNextTick();
await resolveOnNextTick();
await resolveOnNextTick();

abortController.abort();

const result = await resultPromise;

expectJSON(result).toDeepEqual([
{
data: {
todo: {
id: '1',
},
},
pending: [{ id: '0', path: ['todo'] }],
data: {},
pending: [{ id: '0', path: [] }],
hasNext: true,
},
{
incremental: [
{
data: {
text: 'hello world',
author: null,
todo: null,
},
errors: [
{
locations: [
{
column: 13,
line: 7,
},
],
message: 'This operation was aborted',
path: ['todo', 'author'],
path: ['todo'],
locations: [{ line: 4, column: 11 }],
},
],
id: '0',
Expand Down Expand Up @@ -448,6 +532,10 @@ describe('Execute: Cancellation', () => {
},
});

await resolveOnNextTick();
await resolveOnNextTick();
await resolveOnNextTick();

abortController.abort();

const result = await resultPromise;
Expand Down Expand Up @@ -498,4 +586,39 @@ describe('Execute: Cancellation', () => {
],
});
});

it('should stop the execution when aborted during subscription', async () => {
const abortController = new AbortController();
const document = parse(`
subscription {
foo
}
`);

const resultPromise = subscribe({
document,
schema,
abortSignal: abortController.signal,
rootValue: {
foo: async () =>
new Promise(() => {
/* will never resolve */
}),
},
});

abortController.abort();

const result = await resultPromise;

expectJSON(result).toDeepEqual({
errors: [
{
message: 'This operation was aborted',
path: ['foo'],
locations: [{ line: 3, column: 9 }],
},
],
});
});
});
Loading

0 comments on commit 3cf7d57

Please sign in to comment.