From 5fc2723b265c4c94a3bd1c7822fdaa6b94b7439b Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Thu, 30 May 2024 17:10:59 +0300 Subject: [PATCH] Handle stream as stream Rather than creating a linked list where each incremental data record also includes the next record in addition to any new defers and/or streams. Enables easily batching all available stream items within the same incremental entry. --- src/execution/IncrementalGraph.ts | 113 ++++++++---- src/execution/IncrementalPublisher.ts | 5 +- src/execution/__tests__/stream-test.ts | 108 ++++------- src/execution/execute.ts | 246 +++++++++---------------- src/execution/types.ts | 66 +++---- 5 files changed, 235 insertions(+), 303 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index e0c16803dcb..32df81c1314 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -1,6 +1,8 @@ import { isPromise } from '../jsutils/isPromise.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; +import type { GraphQLError } from '../error/GraphQLError.js'; + import type { CompletedDeferredGroupedFieldSet, CompletedIncrementalData, @@ -8,14 +10,11 @@ import type { DeferredFragmentRecord, DeferredGroupedFieldSetRecord, IncrementalDataRecord, - StreamItemsRecord, + StreamItemRecord, StreamRecord, SubsequentResultRecord, } from './types.js'; -import { - isDeferredGroupedFieldSetRecord, - isStreamItemsRecord, -} from './types.js'; +import { isDeferredGroupedFieldSetRecord } from './types.js'; interface DeferredFragmentNode { deferredFragmentRecord: DeferredFragmentRecord; @@ -31,9 +30,9 @@ function isDeferredFragmentNode( } function isStreamNode( - subsequentResultNode: SubsequentResultNode, -): subsequentResultNode is StreamRecord { - return 'path' in subsequentResultNode; + record: SubsequentResultNode | IncrementalDataRecord, +): record is StreamRecord { + return 'streamItemRecords' in record; } type SubsequentResultNode = DeferredFragmentNode | StreamRecord; @@ -71,7 +70,7 @@ export class IncrementalGraph { if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { this._addDeferredGroupedFieldSetRecord(incrementalDataRecord); } else { - this._addStreamItemsRecord(incrementalDataRecord); + this._addStreamRecord(incrementalDataRecord); } } } @@ -101,6 +100,7 @@ export class IncrementalGraph { if (isStreamNode(node)) { this._pending.add(node); newPending.add(node); + this._newIncrementalDataRecords.add(node); } else if (node.deferredGroupedFieldSetRecords.size > 0) { for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) { this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode); @@ -116,22 +116,12 @@ export class IncrementalGraph { this._newPending.clear(); for (const incrementalDataRecord of this._newIncrementalDataRecords) { - if (isStreamItemsRecord(incrementalDataRecord)) { - const result = incrementalDataRecord.streamItemsResult.value; - if (isPromise(result)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - result.then((resolved) => - this._enqueue({ - streamItemsRecord: incrementalDataRecord, - streamItemsResult: resolved, - }), - ); - } else { - this._enqueue({ - streamItemsRecord: incrementalDataRecord, - streamItemsResult: result, - }); - } + if (isStreamNode(incrementalDataRecord)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._onStreamItems( + incrementalDataRecord, + incrementalDataRecord.streamItemRecords, + ); } else { const result = incrementalDataRecord.deferredGroupedFieldSetResult.value; @@ -283,12 +273,8 @@ export class IncrementalGraph { } } - private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void { - const streamRecord = streamItemsRecord.streamRecord; - if (!this._pending.has(streamRecord)) { - this._newPending.add(streamRecord); - } - this._newIncrementalDataRecords.add(streamItemsRecord); + private _addStreamRecord(streamRecord: StreamRecord): void { + this._newPending.add(streamRecord); } private _addDeferredFragmentNode( @@ -320,6 +306,71 @@ export class IncrementalGraph { return deferredFragmentNode; } + private async _onStreamItems( + streamRecord: StreamRecord, + streamItemRecords: Array, + ): Promise { + let items: Array = []; + let errors: Array = []; + let incrementalDataRecords: Array = []; + let streamItemRecord: StreamItemRecord | undefined; + while ((streamItemRecord = streamItemRecords.shift()) !== undefined) { + let result = streamItemRecord.value; + if (isPromise(result)) { + if (items.length > 0) { + this._enqueue({ + streamRecord, + streamItemsResult: { + result: + // TODO add additional test case or rework for coverage + errors.length > 0 /* c8 ignore start */ + ? { items, errors } /* c8 ignore stop */ + : { items }, + incrementalDataRecords, + }, + }); + items = []; + errors = []; + incrementalDataRecords = []; + } + // eslint-disable-next-line no-await-in-loop + result = await result; + // wait an additional tick to coalesce resolving additional promises + // within the queue + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(); + } + if (result.item === undefined) { + if (items.length > 0) { + this._enqueue({ + streamRecord, + streamItemsResult: { + result: errors.length > 0 ? { items, errors } : { items }, + incrementalDataRecords, + }, + }); + } + this._enqueue({ + streamRecord, + streamItemsResult: + result.errors === undefined + ? {} + : { + errors: result.errors, + }, + }); + return; + } + items.push(result.item); + if (result.errors !== undefined) { + errors.push(...result.errors); + } + if (result.incrementalDataRecords !== undefined) { + incrementalDataRecords.push(...result.incrementalDataRecords); + } + } + } + private *_yieldCurrentCompletedIncrementalData( first: CompletedIncrementalData, ): Generator { diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index cf9ca772afd..695f76d7cac 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -296,9 +296,8 @@ class IncrementalPublisher { completedStreamItems: CompletedStreamItems, context: SubsequentIncrementalExecutionResultContext, ): void { - const { streamItemsRecord, streamItemsResult } = completedStreamItems; - const streamRecord = streamItemsRecord.streamRecord; - const id = streamItemsRecord.streamRecord.id; + const { streamRecord, streamItemsResult } = completedStreamItems; + const id = streamRecord.id; invariant(id !== undefined); if (streamItemsResult.errors !== undefined) { this._incrementalGraph.removeStream(streamRecord); diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 10016bd0b58..7d8e3e9f505 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -146,10 +146,7 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [ - { items: ['banana'], id: '0' }, - { items: ['coconut'], id: '0' }, - ], + incremental: [{ items: ['banana', 'coconut'], id: '0' }], completed: [{ id: '0' }], hasNext: false, }, @@ -169,11 +166,7 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [ - { items: ['apple'], id: '0' }, - { items: ['banana'], id: '0' }, - { items: ['coconut'], id: '0' }, - ], + incremental: [{ items: ['apple', 'banana', 'coconut'], id: '0' }], completed: [{ id: '0' }], hasNext: false, }, @@ -220,11 +213,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: ['banana'], - id: '0', - }, - { - items: ['coconut'], + items: ['banana', 'coconut'], id: '0', }, ], @@ -284,11 +273,10 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [['banana', 'banana', 'banana']], - id: '0', - }, - { - items: [['coconut', 'coconut', 'coconut']], + items: [ + ['banana', 'banana', 'banana'], + ['coconut', 'coconut', 'coconut'], + ], id: '0', }, ], @@ -366,15 +354,11 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [{ name: 'Luke', id: '1' }], - id: '0', - }, - { - items: [{ name: 'Han', id: '2' }], - id: '0', - }, - { - items: [{ name: 'Leia', id: '3' }], + items: [ + { name: 'Luke', id: '1' }, + { name: 'Han', id: '2' }, + { name: 'Leia', id: '3' }, + ], id: '0', }, ], @@ -507,7 +491,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [null], + items: [null, { name: 'Leia', id: '3' }], id: '0', errors: [ { @@ -517,10 +501,6 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ name: 'Leia', id: '3' }], - id: '0', - }, ], completed: [{ id: '0' }], hasNext: false, @@ -557,6 +537,11 @@ describe('Execute: stream directive', () => { items: [{ name: 'Luke', id: '1' }], id: '0', }, + ], + hasNext: true, + }, + { + incremental: [ { items: [{ name: 'Han', id: '2' }], id: '0', @@ -910,7 +895,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [null], + items: [null, { nonNullName: 'Han' }], id: '0', errors: [ { @@ -920,10 +905,6 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ nonNullName: 'Han' }], - id: '0', - }, ], completed: [{ id: '0' }], hasNext: false, @@ -956,7 +937,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [null], + items: [null, { nonNullName: 'Han' }], id: '0', errors: [ { @@ -966,10 +947,6 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ nonNullName: 'Han' }], - id: '0', - }, ], completed: [{ id: '0' }], hasNext: false, @@ -1086,7 +1063,7 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [null], + items: [null, { nonNullName: 'Han' }], id: '0', errors: [ { @@ -1096,10 +1073,6 @@ describe('Execute: stream directive', () => { }, ], }, - { - items: [{ nonNullName: 'Han' }], - id: '0', - }, ], completed: [{ id: '0' }], hasNext: false, @@ -1400,10 +1373,6 @@ describe('Execute: stream directive', () => { }, { incremental: [ - { - items: [{ name: 'Luke' }], - id: '1', - }, { data: { scalarField: null }, id: '0', @@ -1415,6 +1384,10 @@ describe('Execute: stream directive', () => { }, ], }, + { + items: [{ name: 'Luke' }], + id: '1', + }, ], completed: [{ id: '0' }, { id: '1' }], hasNext: false, @@ -1717,11 +1690,10 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [{ id: '1', name: 'Luke' }], - id: '0', - }, - { - items: [{ id: '2', name: 'Han' }], + items: [ + { id: '1', name: 'Luke' }, + { id: '2', name: 'Han' }, + ], id: '0', }, ], @@ -1783,11 +1755,7 @@ describe('Execute: stream directive', () => { id: '0', }, { - items: [{ name: 'Luke' }], - id: '1', - }, - { - items: [{ name: 'Han' }], + items: [{ name: 'Luke' }, { name: 'Han' }], id: '1', }, ], @@ -1856,14 +1824,14 @@ describe('Execute: stream directive', () => { value: { pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ - { - items: [{ id: '2' }], - id: '1', - }, { data: { name: 'Luke' }, id: '0', }, + { + items: [{ id: '2' }], + id: '1', + }, ], completed: [{ id: '0' }], hasNext: true, @@ -1957,14 +1925,14 @@ describe('Execute: stream directive', () => { value: { pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], incremental: [ - { - items: [{ id: '2' }], - id: '1', - }, { data: { name: 'Luke' }, id: '0', }, + { + items: [{ id: '2' }], + id: '1', + }, ], completed: [{ id: '0' }], hasNext: true, diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 22b804f59ce..c4edd07b032 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -69,11 +69,10 @@ import type { ExecutionResult, ExperimentalIncrementalExecutionResults, IncrementalDataRecord, - StreamItemsRecord, - StreamItemsResult, + StreamItemRecord, + StreamItemResult, StreamRecord, } from './types.js'; -import { isReconcilableStreamItemsResult } from './types.js'; import { getArgumentValues, getDirectiveValues, @@ -1094,17 +1093,29 @@ async function completeAsyncIteratorValue( // eslint-disable-next-line no-constant-condition while (true) { if (streamUsage && index >= streamUsage.initialCount) { + const streamItemRecords = getAsyncStreamItemRecords( + index, + path, + asyncIterator, + exeContext, + streamUsage.fieldGroup, + info, + itemType, + ); + const returnFn = asyncIterator.return; let streamRecord: StreamRecord | CancellableStreamRecord; if (returnFn === undefined) { streamRecord = { label: streamUsage.label, path, - } as StreamRecord; + streamItemRecords, + }; } else { streamRecord = { label: streamUsage.label, path, + streamItemRecords, earlyReturn: returnFn.bind(asyncIterator), }; if (exeContext.cancellableStreams === undefined) { @@ -1113,18 +1124,7 @@ async function completeAsyncIteratorValue( exeContext.cancellableStreams.add(streamRecord); } - const firstStreamItems = firstAsyncStreamItems( - streamRecord, - path, - index, - asyncIterator, - exeContext, - streamUsage.fieldGroup, - info, - itemType, - ); - - addIncrementalDataRecords(graphqlWrappedResult, [firstStreamItems]); + addIncrementalDataRecords(graphqlWrappedResult, [streamRecord]); break; } @@ -1267,22 +1267,22 @@ function completeIterableValue( const item = iteration.value; if (streamUsage && index >= streamUsage.initialCount) { - const streamRecord: StreamRecord = { + const syncStreamRecord: StreamRecord = { label: streamUsage.label, path, + streamItemRecords: getSyncStreamItemRecords( + item, + index, + path, + iterator, + exeContext, + streamUsage.fieldGroup, + info, + itemType, + ), }; - const firstStreamItems = firstSyncStreamItems( - streamRecord, - item, - index, - iterator, - exeContext, - streamUsage.fieldGroup, - info, - itemType, - ); - addIncrementalDataRecords(graphqlWrappedResult, [firstStreamItems]); + addIncrementalDataRecords(graphqlWrappedResult, [syncStreamRecord]); break; } @@ -2208,26 +2208,22 @@ function getDeferredFragmentRecords( ); } -function firstSyncStreamItems( - streamRecord: StreamRecord, +function getSyncStreamItemRecords( initialItem: PromiseOrValue, initialIndex: number, + streamPath: Path, iterator: Iterator, exeContext: ExecutionContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, -): StreamItemsRecord { - return { - streamRecord, - streamItemsResult: new Future( +): Array { + const streamItems: Array = [ + new Future( Promise.resolve().then(() => { - const path = streamRecord.path; - const initialPath = addPath(path, initialIndex, undefined); - - let result = new Future( - completeStreamItems( - streamRecord, + const initialPath = addPath(streamPath, initialIndex, undefined); + const firstStreamItem = new Future( + completeStreamItem( initialPath, initialItem, exeContext, @@ -2237,27 +2233,24 @@ function firstSyncStreamItems( itemType, ), ); - const firstStreamItems = { streamItemsResult: result }; - let currentStreamItems = firstStreamItems; - let currentIndex = initialIndex; + let iteration = iterator.next(); - let erroredSynchronously = false; + let currentIndex = initialIndex + 1; + let currentStreamItem = firstStreamItem; while (!iteration.done) { - if ( - !isPromise(result.value) && - !isReconcilableStreamItemsResult(result.value) - ) { - erroredSynchronously = true; + // TODO: add test case for early sync termination + /* c8 ignore next 4 */ + const result = currentStreamItem.value; + if (!isPromise(result) && result.errors !== undefined) { break; } - const item = iteration.value; - currentIndex++; - const currentPath = addPath(path, currentIndex, undefined); - result = new Future( - completeStreamItems( - streamRecord, - currentPath, - item, + + const itemPath = addPath(streamPath, currentIndex, undefined); + + currentStreamItem = new Future( + completeStreamItem( + itemPath, + iteration.value, exeContext, { errors: undefined }, fieldGroup, @@ -2265,84 +2258,37 @@ function firstSyncStreamItems( itemType, ), ); - - const nextStreamItems: StreamItemsRecord = { - streamRecord, - streamItemsResult: result, - }; - currentStreamItems.streamItemsResult = new Future( - prependNextStreamItems( - currentStreamItems.streamItemsResult.value, - nextStreamItems, - ), - ); - currentStreamItems = nextStreamItems; + streamItems.push(currentStreamItem); iteration = iterator.next(); + currentIndex = initialIndex + 1; } - // If a non-reconcilable stream items result was encountered, then the stream terminates in error. - // Otherwise, add a stream terminator. - if (!erroredSynchronously) { - currentStreamItems.streamItemsResult = new Future( - prependNextStreamItems(currentStreamItems.streamItemsResult.value, { - streamRecord, - streamItemsResult: new Future({}), - }), - ); - } + streamItems.push(new Future({})); - return firstStreamItems.streamItemsResult.value; + return firstStreamItem.value; }), ), - }; -} + ]; -function prependNextStreamItems( - result: PromiseOrValue, - nextStreamItems: StreamItemsRecord, -): PromiseOrValue { - if (isPromise(result)) { - return result.then((resolved) => - prependNextResolvedStreamItems(resolved, nextStreamItems), - ); - } - return prependNextResolvedStreamItems(result, nextStreamItems); + return streamItems; } -function prependNextResolvedStreamItems( - result: StreamItemsResult, - nextStreamItems: StreamItemsRecord, -): StreamItemsResult { - if (!isReconcilableStreamItemsResult(result)) { - return result; - } - const incrementalDataRecords = result.incrementalDataRecords; - return { - ...result, - incrementalDataRecords: - incrementalDataRecords === undefined - ? [nextStreamItems] - : [nextStreamItems, ...incrementalDataRecords], - }; -} - -function firstAsyncStreamItems( - streamRecord: StreamRecord, - path: Path, +function getAsyncStreamItemRecords( initialIndex: number, + streamPath: Path, asyncIterator: AsyncIterator, exeContext: ExecutionContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, -): StreamItemsRecord { - const firstStreamItems: StreamItemsRecord = { - streamRecord, - streamItemsResult: new Future( - getNextAsyncStreamItemsResult( - streamRecord, - path, +): Array { + const streamItemRecords: Array = []; + streamItemRecords.push( + new Future( + getNextAsyncStreamItemResult( + streamItemRecords, + streamPath, initialIndex, asyncIterator, exeContext, @@ -2351,26 +2297,28 @@ function firstAsyncStreamItems( itemType, ), ), - }; - return firstStreamItems; + ); + return streamItemRecords; } -async function getNextAsyncStreamItemsResult( - streamRecord: StreamRecord, - path: Path, +async function getNextAsyncStreamItemResult( + streamItemRecords: Array, + streamPath: Path, index: number, asyncIterator: AsyncIterator, exeContext: ExecutionContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, -): Promise { +): Promise { let iteration; try { iteration = await asyncIterator.next(); } catch (error) { return { - errors: [locatedError(error, toNodes(fieldGroup), pathToArray(path))], + errors: [ + locatedError(error, toNodes(fieldGroup), pathToArray(streamPath)), + ], }; } @@ -2378,10 +2326,9 @@ async function getNextAsyncStreamItemsResult( return {}; } - const itemPath = addPath(path, index, undefined); + const itemPath = addPath(streamPath, index, undefined); - const result = completeStreamItems( - streamRecord, + const result = completeStreamItem( itemPath, iteration.value, exeContext, @@ -2391,12 +2338,11 @@ async function getNextAsyncStreamItemsResult( itemType, ); - const nextStreamItems: StreamItemsRecord = { - streamRecord, - streamItemsResult: new Future( - getNextAsyncStreamItemsResult( - streamRecord, - path, + streamItemRecords.push( + new Future( + getNextAsyncStreamItemResult( + streamItemRecords, + streamPath, index, asyncIterator, exeContext, @@ -2405,13 +2351,12 @@ async function getNextAsyncStreamItemsResult( itemType, ), ), - }; + ); - return prependNextStreamItems(result, nextStreamItems); + return result; } -function completeStreamItems( - streamRecord: StreamRecord, +function completeStreamItem( itemPath: Path, item: unknown, exeContext: ExecutionContext, @@ -2419,7 +2364,7 @@ function completeStreamItems( fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, -): PromiseOrValue { +): PromiseOrValue { if (isPromise(item)) { return completePromisedValue( exeContext, @@ -2432,9 +2377,8 @@ function completeStreamItems( new Map(), ).then( (resolvedItem) => - buildStreamItemsResult(incrementalContext.errors, resolvedItem), + buildStreamItemResult(incrementalContext.errors, resolvedItem), (error) => ({ - streamRecord, errors: withError(incrementalContext.errors, error), }), ); @@ -2485,29 +2429,23 @@ function completeStreamItems( }) .then( (resolvedItem) => - buildStreamItemsResult(incrementalContext.errors, resolvedItem), + buildStreamItemResult(incrementalContext.errors, resolvedItem), (error) => ({ - streamRecord, errors: withError(incrementalContext.errors, error), }), ); } - return buildStreamItemsResult(incrementalContext.errors, result); + return buildStreamItemResult(incrementalContext.errors, result); } -function buildStreamItemsResult( +function buildStreamItemResult( errors: ReadonlyArray | undefined, result: GraphQLWrappedResult, -): StreamItemsResult { +): StreamItemResult { return { - result: - errors === undefined - ? { items: [result[0]] } - : { - items: [result[0]], - errors: [...errors], - }, + item: result[0], + errors, incrementalDataRecords: result[1], }; } diff --git a/src/execution/types.ts b/src/execution/types.ts index 2592b8e7877..6b6e7865d5a 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -232,69 +232,45 @@ export interface DeferredFragmentRecord { parent: DeferredFragmentRecord | undefined; } +export interface StreamItemResult { + item?: unknown; + incrementalDataRecords?: ReadonlyArray | undefined; + errors?: ReadonlyArray | undefined; +} + +export type StreamItemRecord = Future; + export interface StreamRecord { path: Path | undefined; label: string | undefined; id?: string | undefined; + streamItemRecords: Array; } -export interface CancellableStreamRecord extends StreamRecord { - earlyReturn: () => Promise; -} - -export function isCancellableStreamRecord( - subsequentResultRecord: StreamRecord, -): subsequentResultRecord is CancellableStreamRecord { - return 'earlyReturn' in subsequentResultRecord; -} - -interface ReconcilableStreamItemsResult { - result: BareStreamItemsResult; - incrementalDataRecords: ReadonlyArray | undefined; - errors?: never; -} - -export function isReconcilableStreamItemsResult( - streamItemsResult: StreamItemsResult, -): streamItemsResult is ReconcilableStreamItemsResult { - return streamItemsResult.result !== undefined; -} - -interface TerminatingStreamItemsResult { - result?: never; - incrementalDataRecords?: never; - errors?: never; -} - -interface NonReconcilableStreamItemsResult { - errors: ReadonlyArray; - result?: never; +export interface StreamItemsResult { + result?: BareStreamItemsResult | undefined; + incrementalDataRecords?: ReadonlyArray | undefined; + errors?: ReadonlyArray | undefined; } export interface CompletedStreamItems { - streamItemsRecord: StreamItemsRecord; + streamRecord: StreamRecord; streamItemsResult: StreamItemsResult; } -export type StreamItemsResult = - | ReconcilableStreamItemsResult - | TerminatingStreamItemsResult - | NonReconcilableStreamItemsResult; - -export interface StreamItemsRecord { - streamRecord: SubsequentResultRecord; - streamItemsResult: Future; +export interface CancellableStreamRecord extends StreamRecord { + earlyReturn: () => Promise; } -export function isStreamItemsRecord( - incrementalDataRecord: IncrementalDataRecord, -): incrementalDataRecord is StreamItemsRecord { - return 'streamRecord' in incrementalDataRecord; +export function isCancellableStreamRecord( + subsequentResultRecord: SubsequentResultRecord, +): subsequentResultRecord is CancellableStreamRecord { + return 'earlyReturn' in subsequentResultRecord; } export type IncrementalDataRecord = | DeferredGroupedFieldSetRecord - | StreamItemsRecord; + | StreamRecord; export type CompletedIncrementalData = | CompletedStreamItems