Skip to content

Commit

Permalink
update to match new spec changes
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jul 16, 2023
1 parent 272c52e commit 7ca7c93
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 205 deletions.
229 changes: 64 additions & 165 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,7 @@ import type {
GraphQLFormattedError,
} from '../error/GraphQLError.js';

import type {
DeferUsage,
DeferUsageSet,
GroupedFieldSet,
GroupedFieldSetDetails,
} from './collectFields.js';
import type { StreamUsage } from './execute.js';
import type { GroupedFieldSet } from './collectFields.js';

interface IncrementalUpdate<TData = unknown, TExtensions = ObjMap<unknown>> {
incremental: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
Expand Down Expand Up @@ -188,112 +182,38 @@ export class IncrementalPublisher {
this._reset();
}

prepareInitialResultRecord(): InitialResultRecord {
return new InitialResultRecord();
reportNewDeferFragmentRecord(
deferredFragmentRecord: DeferredFragmentRecord,
parentIncrementalResultRecord:
| InitialResultRecord
| DeferredFragmentRecord
| StreamItemsRecord,
): void {
parentIncrementalResultRecord.children.add(deferredFragmentRecord);
}

prepareNewDeferRecords(
newGroupedFieldSetDetails: Map<DeferUsageSet, GroupedFieldSetDetails>,
newDeferUsages: ReadonlyArray<DeferUsage>,
incrementalDataRecord: IncrementalDataRecord,
deferMap?: ReadonlyMap<DeferUsage, DeferredFragmentRecord>,
path?: Path | undefined,
): {
newDeferMap: ReadonlyMap<DeferUsage, DeferredFragmentRecord>;
newDeferredGroupedFieldSetRecords: ReadonlyArray<DeferredGroupedFieldSetRecord>;
} {
let newDeferMap;
if (newDeferUsages.length === 0) {
newDeferMap = deferMap ?? new Map<DeferUsage, DeferredFragmentRecord>();
} else {
newDeferMap =
deferMap === undefined
? new Map<DeferUsage, DeferredFragmentRecord>()
: new Map<DeferUsage, DeferredFragmentRecord>(deferMap);
for (const deferUsage of newDeferUsages) {
const deferredFragmentRecord = new DeferredFragmentRecord({
path,
label: deferUsage.label,
});

const parentDeferUsage = deferUsage.ancestors[0];

const parent =
parentDeferUsage === undefined
? (incrementalDataRecord as InitialResultRecord | StreamItemsRecord)
: this._deferredFragmentRecordFromDeferUsage(
parentDeferUsage,
newDeferMap,
);
parent.children.add(deferredFragmentRecord);

newDeferMap.set(deferUsage, deferredFragmentRecord);
}
}

const newDeferredGroupedFieldSetRecords: Array<DeferredGroupedFieldSetRecord> =
[];

for (const [
newGroupedFieldSetDeferUsages,
{ groupedFieldSet, shouldInitiateDefer },
] of newGroupedFieldSetDetails) {
const deferredFragmentRecords = this._getDeferredFragmentRecords(
newGroupedFieldSetDeferUsages,
newDeferMap,
reportNewDeferredGroupedFieldSetRecord(
deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord,
): void {
for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) {
deferredFragmentRecord._pending.add(deferredGroupedFieldSetRecord);
deferredFragmentRecord.deferredGroupedFieldSetRecords.add(
deferredGroupedFieldSetRecord,
);
const deferredGroupedFieldSetRecord = new DeferredGroupedFieldSetRecord({
path,
deferredFragmentRecords,
groupedFieldSet,
shouldInitiateDefer,
});
for (const deferredFragmentRecord of deferredFragmentRecords) {
deferredFragmentRecord._pending.add(deferredGroupedFieldSetRecord);
deferredFragmentRecord.deferredGroupedFieldSetRecords.add(
deferredGroupedFieldSetRecord,
);
}
newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord);
}

return {
newDeferMap,
newDeferredGroupedFieldSetRecords,
};
}

prepareNewStreamRecord(
streamUsage: StreamUsage,
path: Path,
asyncIterator?: AsyncIterator<unknown> | undefined,
): StreamRecord {
return new StreamRecord({
label: streamUsage.label,
path,
asyncIterator,
});
}

prepareNewStreamItemsRecord(
streamRecord: StreamRecord,
path: Path | undefined,
incrementalDataRecord: IncrementalDataRecord,
): StreamItemsRecord {
const streamItemsRecord = new StreamItemsRecord({
streamRecord,
path,
});

if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
for (const parent of incrementalDataRecord.deferredFragmentRecords) {
reportNewStreamItemsRecord(
streamItemsRecord: StreamItemsRecord,
parentIncrementalDataRecord: IncrementalDataRecord,
): void {
if (isDeferredGroupedFieldSetRecord(parentIncrementalDataRecord)) {
for (const parent of parentIncrementalDataRecord.deferredFragmentRecords) {
parent.children.add(streamItemsRecord);
}
} else {
incrementalDataRecord.children.add(streamItemsRecord);
parentIncrementalDataRecord.children.add(streamItemsRecord);
}

return streamItemsRecord;
}

completeDeferredGroupedFieldSet(
Expand Down Expand Up @@ -341,6 +261,9 @@ export class IncrementalPublisher {
streamItemsRecord.streamRecord.errors.push(error);
this.setIsFinalRecord(streamItemsRecord);
streamItemsRecord.isCompleted = true;
streamItemsRecord.streamRecord.earlyReturn?.().catch(() => {
// ignore error
});
this._release(streamItemsRecord);
}

Expand Down Expand Up @@ -418,7 +341,7 @@ export class IncrementalPublisher {
}

streams.forEach((stream) => {
stream.asyncIterator?.return?.().catch(() => {
stream.earlyReturn?.().catch(() => {
// ignore error
});
});
Expand Down Expand Up @@ -469,10 +392,10 @@ export class IncrementalPublisher {
streams.add(subsequentResultRecord.streamRecord);
}
}
const promises: Array<Promise<IteratorResult<unknown>>> = [];
const promises: Array<Promise<unknown>> = [];
streams.forEach((streamRecord) => {
if (streamRecord.asyncIterator?.return) {
promises.push(streamRecord.asyncIterator.return());
if (streamRecord.earlyReturn) {
promises.push(streamRecord.earlyReturn());
}
});
await Promise.all(promises);
Expand Down Expand Up @@ -569,31 +492,26 @@ export class IncrementalPublisher {
this._publish(child);
}
if (isStreamItemsRecord(subsequentResultRecord)) {
if (!subsequentResultRecord.sent) {
subsequentResultRecord.sent = true;
if (subsequentResultRecord.isFinalRecord) {
completedResults.push(
this._completedRecordToResult(
subsequentResultRecord.streamRecord,
),
);
}
if (subsequentResultRecord.isCompletedAsyncIterator) {
// async iterable resolver just finished but there may be pending payloads
continue;
}
if (subsequentResultRecord.streamRecord.errors.length > 0) {
continue;
}
const incrementalResult: IncrementalStreamResult = {
items: subsequentResultRecord.items,
path: subsequentResultRecord.streamRecord.path,
};
if (subsequentResultRecord.errors.length > 0) {
incrementalResult.errors = subsequentResultRecord.errors;
}
incrementalResults.push(incrementalResult);
if (subsequentResultRecord.isFinalRecord) {
completedResults.push(
this._completedRecordToResult(subsequentResultRecord.streamRecord),
);
}
if (subsequentResultRecord.isCompletedAsyncIterator) {
// async iterable resolver just finished but there may be pending payloads
continue;
}
if (subsequentResultRecord.streamRecord.errors.length > 0) {
continue;
}
const incrementalResult: IncrementalStreamResult = {
items: subsequentResultRecord.items,
path: subsequentResultRecord.streamRecord.path,
};
if (subsequentResultRecord.errors.length > 0) {
incrementalResult.errors = subsequentResultRecord.errors;
}
incrementalResults.push(incrementalResult);
} else {
completedResults.push(
this._completedRecordToResult(subsequentResultRecord),
Expand Down Expand Up @@ -639,23 +557,6 @@ export class IncrementalPublisher {
return result;
}

private _getDeferredFragmentRecords(
deferUsages: DeferUsageSet,
deferMap: ReadonlyMap<DeferUsage, DeferredFragmentRecord>,
): ReadonlyArray<DeferredFragmentRecord> {
return Array.from(deferUsages).map((deferUsage) =>
this._deferredFragmentRecordFromDeferUsage(deferUsage, deferMap),
);
}

private _deferredFragmentRecordFromDeferUsage(
deferUsage: DeferUsage,
deferMap: ReadonlyMap<DeferUsage, DeferredFragmentRecord>,
): DeferredFragmentRecord {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return deferMap.get(deferUsage)!;
}

private _publish(subsequentResultRecord: SubsequentResultRecord): void {
if (isStreamItemsRecord(subsequentResultRecord)) {
if (subsequentResultRecord.isCompleted) {
Expand Down Expand Up @@ -734,6 +635,18 @@ export class IncrementalPublisher {
}
}

function isDeferredGroupedFieldSetRecord(
incrementalDataRecord: unknown,
): incrementalDataRecord is DeferredGroupedFieldSetRecord {
return incrementalDataRecord instanceof DeferredGroupedFieldSetRecord;
}

function isStreamItemsRecord(
subsequentResultRecord: unknown,
): subsequentResultRecord is StreamItemsRecord {
return subsequentResultRecord instanceof StreamItemsRecord;
}

/** @internal */
export class InitialResultRecord {
errors: Array<GraphQLError>;
Expand Down Expand Up @@ -795,16 +708,16 @@ export class StreamRecord {
label: string | undefined;
path: ReadonlyArray<string | number>;
errors: Array<GraphQLError>;
asyncIterator?: AsyncIterator<unknown> | undefined;
earlyReturn?: (() => Promise<unknown>) | undefined;
constructor(opts: {
label: string | undefined;
path: Path;
asyncIterator?: AsyncIterator<unknown> | undefined;
earlyReturn?: (() => Promise<unknown>) | undefined;
}) {
this.label = opts.label;
this.path = pathToArray(opts.path);
this.errors = [];
this.asyncIterator = opts.asyncIterator;
this.earlyReturn = opts.earlyReturn;
}
}

Expand All @@ -819,7 +732,6 @@ export class StreamItemsRecord {
isCompletedAsyncIterator?: boolean;
isCompleted: boolean;
filtered: boolean;
sent: boolean;

constructor(opts: { streamRecord: StreamRecord; path: Path | undefined }) {
this.streamRecord = opts.streamRecord;
Expand All @@ -829,7 +741,6 @@ export class StreamItemsRecord {
this.isCompleted = false;
this.filtered = false;
this.items = [];
this.sent = false;
}
}

Expand All @@ -839,15 +750,3 @@ export type IncrementalDataRecord =
| StreamItemsRecord;

type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord;

function isDeferredGroupedFieldSetRecord(
incrementalDataRecord: unknown,
): incrementalDataRecord is DeferredGroupedFieldSetRecord {
return incrementalDataRecord instanceof DeferredGroupedFieldSetRecord;
}

function isStreamItemsRecord(
subsequentResultRecord: unknown,
): subsequentResultRecord is StreamItemsRecord {
return subsequentResultRecord instanceof StreamItemsRecord;
}
Loading

0 comments on commit 7ca7c93

Please sign in to comment.