Skip to content

Commit

Permalink
introduce IncrementalGraph class to manage the graph of pending subse…
Browse files Browse the repository at this point in the history
…quent results
  • Loading branch information
yaacovCR committed May 31, 2024
1 parent 06bb157 commit cba24de
Show file tree
Hide file tree
Showing 7 changed files with 856 additions and 585 deletions.
344 changes: 344 additions & 0 deletions src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,344 @@
import { isPromise } from '../jsutils/isPromise.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

import type {
CompletedDeferredGroupedFieldSet,
CompletedIncrementalData,
CompletedReconcilableDeferredGroupedFieldSet,
DeferredFragmentRecord,
DeferredGroupedFieldSetRecord,
IncrementalDataRecord,
StreamItemsRecord,
StreamRecord,
SubsequentResultRecord,
} from './types.js';
import {
isDeferredGroupedFieldSetRecord,
isStreamItemsRecord,
} from './types.js';

interface DeferredFragmentNode {
deferredFragmentRecord: DeferredFragmentRecord;
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
completedReconcilableDeferredGroupedFieldSets: Set<CompletedReconcilableDeferredGroupedFieldSet>;
children: Array<DeferredFragmentNode>;
}

function isDeferredFragmentNode(
node: DeferredFragmentNode | undefined,
): node is DeferredFragmentNode {
return node !== undefined;
}

function isStreamNode(
subsequentResultNode: SubsequentResultNode,
): subsequentResultNode is StreamRecord {
return 'path' in subsequentResultNode;
}

type SubsequentResultNode = DeferredFragmentNode | StreamRecord;

/**
* @internal
*/
export class IncrementalGraph {
private _pending: Set<SubsequentResultNode>;
private _newPending: Set<SubsequentResultNode>;
private _newIncrementalDataRecords: Set<IncrementalDataRecord>;
private _deferredFragmentNodes: Map<
DeferredFragmentRecord,
DeferredFragmentNode
>;

private _completedQueue: Array<CompletedIncrementalData>;
private _nextQueue: Array<
(iterable: IteratorResult<Iterable<CompletedIncrementalData>>) => void
>;

constructor() {
this._pending = new Set();
this._newIncrementalDataRecords = new Set();
this._newPending = new Set();
this._deferredFragmentNodes = new Map();
this._completedQueue = [];
this._nextQueue = [];
}

addIncrementalDataRecords(
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>,
): void {
for (const incrementalDataRecord of incrementalDataRecords) {
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
} else {
this._addStreamItemsRecord(incrementalDataRecord);
}
}
}

addCompletedReconcilableDeferredGroupedFieldSet(
completedDeferredGroupedFieldSet: CompletedReconcilableDeferredGroupedFieldSet,
): void {
const deferredFragmentNodes: Array<DeferredFragmentNode> =
completedDeferredGroupedFieldSet.deferredGroupedFieldSetRecord.deferredFragmentRecords
.map((deferredFragmentRecord) =>
this._deferredFragmentNodes.get(deferredFragmentRecord),
)
.filter<DeferredFragmentNode>(isDeferredFragmentNode);
for (const deferredFragmentNode of deferredFragmentNodes) {
deferredFragmentNode.deferredGroupedFieldSetRecords.delete(
completedDeferredGroupedFieldSet.deferredGroupedFieldSetRecord,
);
deferredFragmentNode.completedReconcilableDeferredGroupedFieldSets.add(
completedDeferredGroupedFieldSet,
);
}
}

getNewPending(): Set<SubsequentResultRecord> {
const newPending = new Set<SubsequentResultRecord>();
for (const node of this._newPending) {
if (isStreamNode(node)) {
this._pending.add(node);
newPending.add(node);
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
}
this._pending.add(node);
newPending.add(node.deferredFragmentRecord);
} else {
for (const child of node.children) {
this._newPending.add(child);
}
}
}
this._newPending.clear();

for (const incrementalDataRecord of this._newIncrementalDataRecords) {
if (isStreamItemsRecord(incrementalDataRecord)) {
const result = incrementalDataRecord.streamItemsResult.value;
if (isPromise(result)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
result.then((resolved) =>
this._enqueue({
streamItemsRecord: incrementalDataRecord,
streamItemsResult: resolved,
}),
);
} else {
this._enqueue({
streamItemsRecord: incrementalDataRecord,
streamItemsResult: result,
});
}
} else {
const result =
incrementalDataRecord.deferredGroupedFieldSetResult.value;
if (isPromise(result)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
result.then((resolved) =>
this._enqueue({
deferredGroupedFieldSetRecord: incrementalDataRecord,
deferredGroupedFieldSetResult: resolved,
} as CompletedDeferredGroupedFieldSet),
);
} else {
this._enqueue({
deferredGroupedFieldSetRecord: incrementalDataRecord,
deferredGroupedFieldSetResult: result,
} as CompletedDeferredGroupedFieldSet);
}
}
}
this._newIncrementalDataRecords.clear();

return newPending;
}

completedIncrementalData() {
return {
[Symbol.asyncIterator]() {
return this;
},
next: (): Promise<IteratorResult<Iterable<CompletedIncrementalData>>> => {
const firstResult = this._completedQueue.shift();
if (firstResult !== undefined) {
return Promise.resolve({
value: this._yieldCurrentCompletedIncrementalData(firstResult),
done: false,
});
}
const { promise, resolve } =
promiseWithResolvers<
IteratorResult<Iterable<CompletedIncrementalData>>
>();
this._nextQueue.push(resolve);
return promise;
},
return: (): Promise<
IteratorResult<Iterable<CompletedIncrementalData>>
> => {
for (const resolve of this._nextQueue) {
resolve({ value: undefined, done: true });
}
return Promise.resolve({ value: undefined, done: true });
},
};
}

hasNext(): boolean {
return this._pending.size > 0;
}

completeDeferredFragment(
deferredFragmentRecord: DeferredFragmentRecord,
): Array<CompletedReconcilableDeferredGroupedFieldSet> | undefined {
const deferredFragmentNode = this._deferredFragmentNodes.get(
deferredFragmentRecord,
);
// TODO: add test case?
/* c8 ignore next 3 */
if (deferredFragmentNode === undefined) {
return undefined;
}
const {
deferredGroupedFieldSetRecords,
completedReconcilableDeferredGroupedFieldSets,
} = deferredFragmentNode;
if (deferredGroupedFieldSetRecords.size > 0) {
return undefined;
}
const results = Array.from(completedReconcilableDeferredGroupedFieldSets);
for (const completedReconcilableDeferredGroupedFieldSet of completedReconcilableDeferredGroupedFieldSets) {
for (const otherDeferredFragmentRecord of completedReconcilableDeferredGroupedFieldSet
.deferredGroupedFieldSetRecord.deferredFragmentRecords) {
const otherDeferredFragmentNode = this._deferredFragmentNodes.get(
otherDeferredFragmentRecord,
);
if (otherDeferredFragmentNode === undefined) {
continue;
}
otherDeferredFragmentNode.completedReconcilableDeferredGroupedFieldSets.delete(
completedReconcilableDeferredGroupedFieldSet,
);
}
}
for (const child of deferredFragmentNode.children) {
const childNode = this._addDeferredFragmentNode(
child.deferredFragmentRecord,
);
this._newPending.add(childNode);
}
this._removePending(deferredFragmentNode);
this._deferredFragmentNodes.delete(deferredFragmentRecord);
return results;
}

removeDeferredFragment(
deferredFragmentRecord: DeferredFragmentRecord,
): boolean {
const deferredFragmentNode = this._deferredFragmentNodes.get(
deferredFragmentRecord,
);
if (deferredFragmentNode === undefined) {
return false;
}
this._removePending(deferredFragmentNode);
this._deferredFragmentNodes.delete(deferredFragmentRecord);
// TODO: add test case for an erroring deferred fragment with child defers
/* c8 ignore next 3 */
for (const child of deferredFragmentNode.children) {
this.removeDeferredFragment(child.deferredFragmentRecord);
}
return true;
}

removeStream(streamRecord: StreamRecord): void {
this._removePending(streamRecord);
}

private _removePending(subsequentResultNode: SubsequentResultNode): void {
this._pending.delete(subsequentResultNode);
if (this._pending.size === 0) {
for (const resolve of this._nextQueue) {
resolve({ value: undefined, done: true });
}
}
}

private _addDeferredGroupedFieldSetRecord(
deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord,
): void {
for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) {
const deferredFragmentNode = this._addDeferredFragmentNode(
deferredFragmentRecord,
);
if (this._pending.has(deferredFragmentNode)) {
this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord);
}
deferredFragmentNode.deferredGroupedFieldSetRecords.add(
deferredGroupedFieldSetRecord,
);
}
}

private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void {
const streamRecord = streamItemsRecord.streamRecord;
if (!this._pending.has(streamRecord)) {
this._newPending.add(streamRecord);
}
this._newIncrementalDataRecords.add(streamItemsRecord);
}

private _addDeferredFragmentNode(
deferredFragmentRecord: DeferredFragmentRecord,
): DeferredFragmentNode {
let deferredFragmentNode = this._deferredFragmentNodes.get(
deferredFragmentRecord,
);
if (deferredFragmentNode !== undefined) {
return deferredFragmentNode;
}
deferredFragmentNode = {
deferredFragmentRecord,
deferredGroupedFieldSetRecords: new Set(),
completedReconcilableDeferredGroupedFieldSets: new Set(),
children: [],
};
this._deferredFragmentNodes.set(
deferredFragmentRecord,
deferredFragmentNode,
);
const parent = deferredFragmentRecord.parent;
if (parent === undefined) {
this._newPending.add(deferredFragmentNode);
return deferredFragmentNode;
}
const parentNode = this._addDeferredFragmentNode(parent);
parentNode.children.push(deferredFragmentNode);
return deferredFragmentNode;
}

private *_yieldCurrentCompletedIncrementalData(
first: CompletedIncrementalData,
): Generator<CompletedIncrementalData> {
yield first;
let completed;
while ((completed = this._completedQueue.shift()) !== undefined) {
yield completed;
}
}

private _enqueue(completed: CompletedIncrementalData): void {
const next = this._nextQueue.shift();
if (next !== undefined) {
next({
value: this._yieldCurrentCompletedIncrementalData(completed),
done: false,
});
return;
}
this._completedQueue.push(completed);
}
}
Loading

0 comments on commit cba24de

Please sign in to comment.