Skip to content

Commit

Permalink
fix(race): concurrent next calls with defer/stream (#2975)
Browse files Browse the repository at this point in the history
* fix(race): concurrent next calls

* refactor test

* use invariant

* disable eslint error

* fix
  • Loading branch information
robrichard committed Aug 23, 2022
1 parent 3bdb787 commit ca90e59
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 0 deletions.
63 changes: 63 additions & 0 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) {
return result;
}

async function completeAsync(
document: DocumentNode,
numCalls: number,
rootValue: unknown = {},
) {
const result = await execute({ schema, document, rootValue });

assert(isAsyncIterable(result));

const iterator = result[Symbol.asyncIterator]();

const promises = [];
for (let i = 0; i < numCalls; i++) {
promises.push(iterator.next());
}
return Promise.all(promises);
}

function createResolvablePromise<T>(): [Promise<T>, (value?: T) => void] {
let resolveFn;
const promise = new Promise<T>((resolve) => {
Expand Down Expand Up @@ -566,6 +584,51 @@ describe('Execute: stream directive', () => {
},
});
});
it('Can handle concurrent calls to .next() without waiting', async () => {
const document = parse(`
query {
friendList @stream(initialCount: 2) {
name
id
}
}
`);
const result = await completeAsync(document, 4, {
async *friendList() {
yield await Promise.resolve(friends[0]);
yield await Promise.resolve(friends[1]);
yield await Promise.resolve(friends[2]);
},
});
expectJSON(result).toDeepEqual([
{
done: false,
value: {
data: {
friendList: [
{ name: 'Luke', id: '1' },
{ name: 'Han', id: '2' },
],
},
hasNext: true,
},
},
{
done: false,
value: {
incremental: [
{
items: [{ name: 'Leia', id: '3' }],
path: ['friendList', 2],
},
],
hasNext: true,
},
},
{ done: false, value: { hasNext: false } },
{ done: true, value: undefined },
]);
});
it('Handles error thrown in async iterable before initialCount is reached', async () => {
const document = parse(`
query {
Expand Down
9 changes: 9 additions & 0 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1823,7 +1823,16 @@ function yieldSubsequentPayloads(
},
);

if (exeContext.subsequentPayloads.length === 0) {
// a different call to next has exhausted all payloads
return { value: undefined, done: true };
}
const index = exeContext.subsequentPayloads.indexOf(asyncPayloadRecord);
if (index === -1) {
// a different call to next has consumed this payload
return race();
}

exeContext.subsequentPayloads.splice(index, 1);

const incrementalResult: IncrementalResult = {};
Expand Down

0 comments on commit ca90e59

Please sign in to comment.