diff --git a/src/Payload.php b/src/Payload.php index bd3ce5e..a1755ae 100644 --- a/src/Payload.php +++ b/src/Payload.php @@ -15,7 +15,7 @@ * * @implements \IteratorAggregate */ -final class Payload implements ReadableStream, \IteratorAggregate +final class Payload implements ReadableStream, \IteratorAggregate, \Stringable { use ReadableStreamIteratorAggregate; use ForbidCloning; @@ -128,4 +128,15 @@ public function onClose(\Closure $onClose): void { $this->onClose->getFuture()->finally($onClose); } + + /** + * Buffers entire stream before returning. Use {@see self::buffer()} to optionally provide a{@see Cancellation} + * and/or length limit. + * + * @throws BufferException|StreamException + */ + public function __toString(): string + { + return $this->buffer(); + } } diff --git a/test/PayloadTest.php b/test/PayloadTest.php index f4b3a58..a483ca9 100644 --- a/test/PayloadTest.php +++ b/test/PayloadTest.php @@ -7,83 +7,73 @@ use Amp\Pipeline\Queue; use Revolt\EventLoop; use function Amp\async; +use function Amp\delay; final class PayloadTest extends AsyncTestCase { - public function testBufferingAll(): void + /** + * @param \Closure(string $expected, Payload $stream):void $test + */ + private function usingStream(\Closure $test, ?float $delay = null): void { $values = ["abc", "def", "ghi"]; $queue = new Queue; $stream = new Payload(new ReadableIterableStream($queue->pipe())); + $future = async($test, \implode($values), $stream); + foreach ($values as $value) { - $queue->pushAsync($value)->ignore(); + $queue->push($value); + } + + if ($delay > 0) { + delay($delay); } $queue->complete(); - self::assertSame(\implode($values), $stream->buffer()); + $future->await(); } public function testFullStreamConsumption(): void { - $values = ["abc", "def", "ghi"]; - - $queue = new Queue; - $stream = new Payload(new ReadableIterableStream($queue->pipe())); - - foreach ($values as $value) { - $queue->pushAsync($value)->ignore(); - } - - EventLoop::delay(0.005, function () use ($queue) { - $queue->complete(); - }); - - $buffer = ""; - while (($chunk = $stream->read()) !== null) { - $buffer .= $chunk; - } + $this->usingStream(function (string $expected, Payload $stream): void { + $emitted = []; + while (($chunk = $stream->read()) !== null) { + $emitted[] = $chunk; + } - self::assertSame(\implode($values), $buffer); + self::assertSame($expected, \implode($emitted)); + }, delay: 0.1); } public function testFastResolvingStream(): void { - $values = ["abc", "def", "ghi"]; - - $queue = new Queue; - $stream = new Payload(new ReadableIterableStream($queue->pipe())); - - foreach ($values as $value) { - $queue->pushAsync($value)->ignore(); - } - - $queue->complete(); - - $emitted = []; - while (($chunk = $stream->read()) !== null) { - $emitted[] = $chunk; - } + $this->usingStream(function (string $expected, Payload $stream): void { + $emitted = []; + while (($chunk = $stream->read()) !== null) { + $emitted[] = $chunk; + } - self::assertSame($values, $emitted); + self::assertSame($expected, \implode($emitted)); + }); } public function testFastResolvingStreamBufferingOnly(): void { - $values = ["abc", "def", "ghi"]; - - $queue = new Queue; - $stream = new Payload(new ReadableIterableStream($queue->pipe())); - - foreach ($values as $value) { - $queue->pushAsync($value)->ignore(); - } - - $queue->complete(); + $this->usingStream( + fn (string $expected, Payload $stream) => self::assertSame($expected, $stream->buffer()), + delay: 0.1, + ); + } - self::assertSame(\implode($values), $stream->buffer()); + public function testStringCast(): void + { + $this->usingStream( + fn (string $expected, Payload $stream) => self::assertSame($expected, (string) $stream), + delay: 0.1, + ); } public function testPartialStreamConsumption(): void