Skip to content

Commit

Permalink
Introduce @stream directive
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Feb 25, 2024
1 parent 67d084e commit e9f3b78
Showing 1 changed file with 92 additions and 10 deletions.
102 changes: 92 additions & 10 deletions spec/Section 6 -- Execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ YieldIncrementalResults(data, errors, futures, eventEmitter):
{newPendingResults} as pending.
- For each completed child Future node of a root node in {graph}:
- Let {completedFuture} be that Future; let {result} be its result.
- If {data} on {result} is {null}:
- If {FutureCompletedWithoutData(completedFuture, result)} is {true}:
- Initialize {completed} to an empty list.
- Let {parents} be the parents of {completedFuture}.
- Initialize {completed} to an empty list.
Expand All @@ -380,8 +380,8 @@ YieldIncrementalResults(data, errors, futures, eventEmitter):
- Add each {future} of {futures} on {result} to {graph} via the same procedure
as above.
- Let {futuresToRelease} be the set of completed Future nodes in {graph}
completing a Deferred Fragment root node where all of the sibling Futures
are also complete.
either completing a Stream root node, or completing a Deferred Fragment root
node where all of the sibling Futures are also complete.
- If {futuresToRelease} is empty, continue to the next completed child Future
node in {graph}.
- Initialize {incremental} to an empty lists.
Expand All @@ -397,9 +397,8 @@ YieldIncrementalResults(data, errors, futures, eventEmitter):
nodes.
- Prune root nodes of {graph} containing no direct child Futures, as above.
- Let {hasNext} be {false} if {graph} is empty.
- Let {incrementalResult} be an unordered map containing {hasNext}.
- If {incremental} is not empty, set the corresponding entry on
{incrementalResult} to {incremental}.
- Let {incrementalResult} be an unordered map containing {incremental} and
{hasNext}.
- If {completed} is not empty, set the corresponding entry on
{incrementalResult} to {completed}.
- Let {newPendingResults} be the set of new root nodes in {graph}, promoted by
Expand All @@ -409,7 +408,8 @@ YieldIncrementalResults(data, errors, futures, eventEmitter):
- Set the corresponding entry on {incrementalResult} to {pending}.
- Yield {incrementalResult}.
- Emit events on {eventEmitter} signalling the release of each of the Deferred
Fragments in {newPendingResults} as pending.
Fragments in {newPendingResults} as pending, as well as each of the {items}
entries on the Futures in {futuresToRelease}.
- Complete this incremental result stream.

GetPending(newPendingResults):
Expand All @@ -422,8 +422,21 @@ GetPending(newPendingResults):
- Append {pendingEntry} to {pending}.
- Return {pending}.

FutureCompletedWithoutData(completedFuture, result):

- If {completedFuture} incrementally completes Deferred Fragments:
- If {data} on {result} is {null}, return {true}.
- Otherwise:
- If {items} on {result} is not defined or {null}, return {true}.
- Return {false}.

GetIncrementalEntry(future, graph):

- If {future} completes a Stream:
- Let {stream} be the Stream incrementally completed by {future}.
- Let {items} and {errors} be the corresponding entries on {result}.
- Let {id} be the unique identifier for {stream}.
- Return an unordered map containing {id}, {items}, and {errors}.
- Let {deferredFragments} be the Deferred Fragments incrementally completed by
{future} at {path}.
- Let {result} be the result of {future}.
Expand Down Expand Up @@ -967,17 +980,86 @@ CompleteListValue(innerType, fieldDetailsList, result, variableValues,
eventEmitter, path, deferUsageSet, deferMap):

- Initialize {items} and {futures} to empty lists.
- Let {index} be {0}.
- For each {resultItem} of {result}:
- Let {fieldDetails} be the first entry in {fieldDetailsList}.
- Let {field} be the corresponding entry on {fieldDetails}.
- If {field} provides the directive `@stream` and its {if} argument is not
{false} and is not a variable in {variableValues} with the value {false} and
{innerType} is the outermost inner type of the list type defined for
{fieldDetailsList}:
- Let {streamDirective} be that directive.
- If this execution is for a subscription operation, raise a _field error_.
- Let {initialCount} be the value or variable provided to {streamDirective}'s
{initialCount} argument.
- If {initialCount} is less than zero, raise a _field error_.
- Let {label} be the value or variable provided to {streamDirective}'s {label}
argument.
- Let {iterator} be an iterator for {result}.
- Let {index} be zero.
- While {result} is not closed:
- If {streamDirective} is defined and {index} is greater than or equal to
{initialCount}:
- Initialize {parents} to an empty list.
- For each {deferUsage} in {deferUsageSet}:
- Let {parent} be the entry in {deferMap} for {deferUsage}.
- Append {parent} to {parents}.
- Let {stream} be an unordered map containing {parents}, {path}, and
{label}.
- Let {streamFieldDetails} be the result of
{GetStreamFieldDetailsList(fieldDetailsList)}.
- Let {future} represent the future execution of {ExecuteStreamField(stream,
iterator, streamFieldDetailsList, index, innerType, variableValues,
eventEmitter)}.
- Defer the execution of {future} until {stream} is released as pending, as
signalled by {eventEmitter}, or if early execution is desired, following
any implementation specific deferral, whichever occurs first.
- Append {future} to {futures}.
- Return {items} and {futures}.
- Wait for the next item from {result} via the {iterator}.
- If an item is not retrieved because of an error, raise a _field error_.
- Let {item} be the item retrieved from {result}.
- Let {itemPath} be {path} with {index} appended.
- Let {completedItem} and {itemFutures} be the result of calling
{CompleteValue(innerType, fieldDetailsList, item, variableValues,
eventEmitter, itemPath)}.
- Append {completedItem} to {items}.
- Append all items in {itemFutures} to {futures}.
- Increment {index} by {1}.
- Return {items} and {futures}.

GetStreamFieldDetailsList(fieldDetailsList):

- Let {streamFields} be an empty list.
- For each {fieldDetails} in {fieldDetailsList}:
- Let {field} be the corresponding entry on {fieldDetails}.
- Let {newFieldDetails} be a new Field Details record created from {field}.
- Append {newFieldDetails} to {streamFields}.
- Return {streamFields}.

#### Execute Stream Field

ExecuteStreamField(stream, iterator, fieldDetailsList, index, innerType,
variableValues, eventEmitter):

- Let {path} be the corresponding entry on {stream}.
- Let {itemPath} be {path} with {index} appended.
- Wait for the next item from {iterator}.
- If {iterator} is closed, complete this data stream and return.
- Let {item} be the next item retrieved via {iterator}.
- Let {nextIndex} be {index} plus one.
- Let {completedItem} and {futures} be the result of {CompleteValue(innerType,
fields, item, variableValues, itemPath)}.
- Initialize {items} to an empty list.
- Append {completedItem} to {items}.
- Let {errors} be the list of all _field error_ raised while completing the
item.
- Let {future} represent the future execution of {ExecuteStreamField(stream,
path, iterator, fieldDetailsList, nextIndex, innerType, variableValues,
eventEmitter)}.
- Defer the execution of {future} until {items} is released, or if early
execution is desired, following any implementation specific deferral,
whichever occurs first.
- Append {future} to {futures}.
- Return an unordered map containing {items}, {errors}, and {futures}.

**Coercing Results**

The primary purpose of value completion is to ensure that the values returned by
Expand Down

0 comments on commit e9f3b78

Please sign in to comment.