Skip to content

Commit

Permalink
Handle stream as stream
Browse files Browse the repository at this point in the history
Rather than creating a linked list where each incremental data record also includes the next record in addition to any new defers and/or streams.

Enables easily batching all available stream items within the same incremental entry.
  • Loading branch information
yaacovCR committed May 31, 2024
1 parent cba24de commit 5fc2723
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 303 deletions.
113 changes: 82 additions & 31 deletions src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import { isPromise } from '../jsutils/isPromise.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

import type { GraphQLError } from '../error/GraphQLError.js';

import type {
CompletedDeferredGroupedFieldSet,
CompletedIncrementalData,
CompletedReconcilableDeferredGroupedFieldSet,
DeferredFragmentRecord,
DeferredGroupedFieldSetRecord,
IncrementalDataRecord,
StreamItemsRecord,
StreamItemRecord,
StreamRecord,
SubsequentResultRecord,
} from './types.js';
import {
isDeferredGroupedFieldSetRecord,
isStreamItemsRecord,
} from './types.js';
import { isDeferredGroupedFieldSetRecord } from './types.js';

interface DeferredFragmentNode {
deferredFragmentRecord: DeferredFragmentRecord;
Expand All @@ -31,9 +30,9 @@ function isDeferredFragmentNode(
}

function isStreamNode(
subsequentResultNode: SubsequentResultNode,
): subsequentResultNode is StreamRecord {
return 'path' in subsequentResultNode;
record: SubsequentResultNode | IncrementalDataRecord,
): record is StreamRecord {
return 'streamItemRecords' in record;
}

type SubsequentResultNode = DeferredFragmentNode | StreamRecord;
Expand Down Expand Up @@ -71,7 +70,7 @@ export class IncrementalGraph {
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
} else {
this._addStreamItemsRecord(incrementalDataRecord);
this._addStreamRecord(incrementalDataRecord);
}
}
}
Expand Down Expand Up @@ -101,6 +100,7 @@ export class IncrementalGraph {
if (isStreamNode(node)) {
this._pending.add(node);
newPending.add(node);
this._newIncrementalDataRecords.add(node);
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
Expand All @@ -116,22 +116,12 @@ export class IncrementalGraph {
this._newPending.clear();

for (const incrementalDataRecord of this._newIncrementalDataRecords) {
if (isStreamItemsRecord(incrementalDataRecord)) {
const result = incrementalDataRecord.streamItemsResult.value;
if (isPromise(result)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
result.then((resolved) =>
this._enqueue({
streamItemsRecord: incrementalDataRecord,
streamItemsResult: resolved,
}),
);
} else {
this._enqueue({
streamItemsRecord: incrementalDataRecord,
streamItemsResult: result,
});
}
if (isStreamNode(incrementalDataRecord)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this._onStreamItems(
incrementalDataRecord,
incrementalDataRecord.streamItemRecords,
);
} else {
const result =
incrementalDataRecord.deferredGroupedFieldSetResult.value;
Expand Down Expand Up @@ -283,12 +273,8 @@ export class IncrementalGraph {
}
}

private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void {
const streamRecord = streamItemsRecord.streamRecord;
if (!this._pending.has(streamRecord)) {
this._newPending.add(streamRecord);
}
this._newIncrementalDataRecords.add(streamItemsRecord);
private _addStreamRecord(streamRecord: StreamRecord): void {
this._newPending.add(streamRecord);
}

private _addDeferredFragmentNode(
Expand Down Expand Up @@ -320,6 +306,71 @@ export class IncrementalGraph {
return deferredFragmentNode;
}

private async _onStreamItems(
streamRecord: StreamRecord,
streamItemRecords: Array<StreamItemRecord>,
): Promise<void> {
let items: Array<unknown> = [];
let errors: Array<GraphQLError> = [];
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
let streamItemRecord: StreamItemRecord | undefined;
while ((streamItemRecord = streamItemRecords.shift()) !== undefined) {
let result = streamItemRecord.value;
if (isPromise(result)) {
if (items.length > 0) {
this._enqueue({
streamRecord,
streamItemsResult: {
result:
// TODO add additional test case or rework for coverage
errors.length > 0 /* c8 ignore start */
? { items, errors } /* c8 ignore stop */
: { items },
incrementalDataRecords,
},
});
items = [];
errors = [];
incrementalDataRecords = [];
}
// eslint-disable-next-line no-await-in-loop
result = await result;
// wait an additional tick to coalesce resolving additional promises
// within the queue
// eslint-disable-next-line no-await-in-loop
await Promise.resolve();
}
if (result.item === undefined) {
if (items.length > 0) {
this._enqueue({
streamRecord,
streamItemsResult: {
result: errors.length > 0 ? { items, errors } : { items },
incrementalDataRecords,
},
});
}
this._enqueue({
streamRecord,
streamItemsResult:
result.errors === undefined
? {}
: {
errors: result.errors,
},
});
return;
}
items.push(result.item);
if (result.errors !== undefined) {
errors.push(...result.errors);
}
if (result.incrementalDataRecords !== undefined) {
incrementalDataRecords.push(...result.incrementalDataRecords);
}
}
}

private *_yieldCurrentCompletedIncrementalData(
first: CompletedIncrementalData,
): Generator<CompletedIncrementalData> {
Expand Down
5 changes: 2 additions & 3 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,8 @@ class IncrementalPublisher {
completedStreamItems: CompletedStreamItems,
context: SubsequentIncrementalExecutionResultContext,
): void {
const { streamItemsRecord, streamItemsResult } = completedStreamItems;
const streamRecord = streamItemsRecord.streamRecord;
const id = streamItemsRecord.streamRecord.id;
const { streamRecord, streamItemsResult } = completedStreamItems;
const id = streamRecord.id;
invariant(id !== undefined);
if (streamItemsResult.errors !== undefined) {
this._incrementalGraph.removeStream(streamRecord);
Expand Down
Loading

0 comments on commit 5fc2723

Please sign in to comment.