Skip to content

Commit

Permalink
rename StreamRecord to StreamItemsRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
Cito committed Apr 7, 2024
1 parent a2d1f89 commit 398d5cc
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
19 changes: 11 additions & 8 deletions src/graphql/execution/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def anext(iterator: AsyncIterator) -> Any: # noqa: A001
"subscribe",
"AsyncPayloadRecord",
"DeferredFragmentRecord",
"StreamRecord",
"StreamItemsRecord",
"ExecutionResult",
"ExecutionContext",
"ExperimentalIncrementalExecutionResults",
Expand Down Expand Up @@ -1772,7 +1772,7 @@ def execute_stream_field(
) -> AsyncPayloadRecord:
"""Execute stream field."""
is_awaitable = self.is_awaitable
async_payload_record = StreamRecord(
async_payload_record = StreamItemsRecord(
label, item_path, None, parent_context, self
)
completed_item: Any
Expand Down Expand Up @@ -1865,7 +1865,7 @@ async def execute_stream_async_iterator_item(
field_group: FieldGroup,
info: GraphQLResolveInfo,
item_type: GraphQLOutputType,
async_payload_record: StreamRecord,
async_payload_record: StreamItemsRecord,
item_path: Path,
) -> Any:
"""Execute stream iterator item."""
Expand Down Expand Up @@ -1910,7 +1910,7 @@ async def execute_stream_async_iterator(

while True:
item_path = Path(path, index, None)
async_payload_record = StreamRecord(
async_payload_record = StreamItemsRecord(
label, item_path, async_iterator, previous_async_payload_record, self
)

Expand Down Expand Up @@ -1961,7 +1961,10 @@ def filter_subsequent_payloads(
# async_record points to a path unaffected by this payload
continue
# async_record path points to nulled error field
if isinstance(async_record, StreamRecord) and async_record.async_iterator:
if (
isinstance(async_record, StreamItemsRecord)
and async_record.async_iterator
):
self._canceled_iterators.add(async_record.async_iterator)
del self.subsequent_payloads[async_record]

Expand All @@ -1975,7 +1978,7 @@ def get_completed_incremental_results(self) -> list[IncrementalResult]:
if not async_payload_record.completed.is_set():
continue
del self.subsequent_payloads[async_payload_record]
if isinstance(async_payload_record, StreamRecord):
if isinstance(async_payload_record, StreamItemsRecord):
items = async_payload_record.items
if async_payload_record.is_completed_async_iterator:
# async iterable resolver finished but there may be pending payload
Expand Down Expand Up @@ -2659,7 +2662,7 @@ def add_data(self, data: AwaitableOrValue[dict[str, Any] | None]) -> None:
self._data_added.set()


class StreamRecord:
class StreamItemsRecord:
"""A record collecting items marked with the stream directive"""

errors: list[GraphQLError]
Expand Down Expand Up @@ -2735,4 +2738,4 @@ def set_is_completed_async_iterator(self) -> None:
self._items_added.set()


AsyncPayloadRecord = Union[DeferredFragmentRecord, StreamRecord]
AsyncPayloadRecord = Union[DeferredFragmentRecord, StreamItemsRecord]
8 changes: 5 additions & 3 deletions tests/execution/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
IncrementalStreamResult,
experimental_execute_incrementally,
)
from graphql.execution.execute import StreamRecord
from graphql.execution.execute import StreamItemsRecord
from graphql.language import DocumentNode, parse
from graphql.pyutils import Path
from graphql.type import (
Expand Down Expand Up @@ -175,9 +175,11 @@ def can_format_and_print_incremental_stream_result():
def can_print_stream_record():
context = ExecutionContext.build(schema, parse("{ hero { id } }"))
assert isinstance(context, ExecutionContext)
record = StreamRecord(None, None, None, None, context)
record = StreamItemsRecord(None, None, None, None, context)
assert str(record) == "StreamRecord(path=[])"
record = StreamRecord("foo", Path(None, "bar", "Bar"), None, record, context)
record = StreamItemsRecord(
"foo", Path(None, "bar", "Bar"), None, record, context
)
assert (
str(record) == "StreamRecord(" "path=['bar'], label='foo', parent_context)"
)
Expand Down

0 comments on commit 398d5cc

Please sign in to comment.