From e62b520fc9cd5c69aa086b80f6e6a3a294188f48 Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Tue, 10 Sep 2024 16:06:18 -0700 Subject: [PATCH] feat: add delay to ResumableStream --- Bigtable/src/ResumableStream.php | 20 +++++ Bigtable/tests/Unit/ResumableStreamTest.php | 90 +++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 Bigtable/tests/Unit/ResumableStreamTest.php diff --git a/Bigtable/src/ResumableStream.php b/Bigtable/src/ResumableStream.php index fb344157a4b5..4284e5f77340 100644 --- a/Bigtable/src/ResumableStream.php +++ b/Bigtable/src/ResumableStream.php @@ -79,6 +79,11 @@ class ResumableStream implements \IteratorAggregate */ private $callOptions; + /** + * @var callable + */ + private $delayFunction; + /** * Constructs a resumable stream. * @@ -116,6 +121,17 @@ public function __construct( $this->callOptions['retrySettings'] = [ 'retriesEnabled' => false ]; + + $this->delayFunction = function (int $delayFactor) { + $initialDelayMillis = 100; + $initialDelayMultiplier = 1.3; + $maxDelayMillis = 600000; + + $delayMultiplier = $initialDelayMultiplier ** $delayFactor; + $delayMs = min($initialDelayMillis * $delayMultiplier, $maxDelayMillis); + $delay = 1000 * $delayMs; // convert ms to µs + usleep((int) $delay); + }; } /** @@ -127,6 +143,7 @@ public function __construct( public function readAll() { $attempt = 0; + $delayFactor = 0; do { $ex = null; list($this->request, $this->callOptions) = @@ -139,6 +156,7 @@ public function readAll() $headers = $this->callOptions['headers'] ?? []; if ($attempt > 0) { $headers['bigtable-attempt'] = [(string) $attempt]; + ($this->delayFunction)($delayFactor); } $attempt++; @@ -150,8 +168,10 @@ public function readAll() try { foreach ($stream->readAll() as $item) { yield $item; + $delayFactor = 0; // reset delay factor on successful read. } } catch (\Exception $ex) { + $delayFactor++; } } } while ((!$this->retryFunction || ($this->retryFunction)($ex)) && $attempt <= $this->retries); diff --git a/Bigtable/tests/Unit/ResumableStreamTest.php b/Bigtable/tests/Unit/ResumableStreamTest.php new file mode 100644 index 000000000000..7cec2c1931d6 --- /dev/null +++ b/Bigtable/tests/Unit/ResumableStreamTest.php @@ -0,0 +1,90 @@ +prophesize(ServerStream::class); + $generator1 = function () { + yield new ReadRowsResponse(); + throw new \Exception('This too is retryable!', self::RETRYABLE_CODE); + }; + $generator2 = fn () => yield new ReadRowsResponse(); + $count = 0; + $stream->readAll() + ->will(function () use (&$count, $generator1, $generator2) { + return match ($count++) { + 0 => throw new \Exception('This is retryable!', self::RETRYABLE_CODE), + 1 => throw new \Exception('This is also retryable!', self::RETRYABLE_CODE), + 2 => $generator1(), + 3 => $generator2(), + }; + }); + $delayFunction = function ($delayFactor) use (&$count) { + $this->assertEquals(match ($count) { + 1 => 1, // initial delay + 2 => 2, // increment by 1 + 3 => 1, // the delay is reset by the successful call + }, $delayFactor); + }; + $bigtable = $this->prophesize(BigtableClient::class); + $bigtable->readRows(Argument::type(ReadRowsRequest::class), Argument::type('array')) + ->shouldBeCalledTimes(4) + ->willReturn($stream->reveal()); + $resumableStream = new ResumableStream( + $bigtable->reveal(), + 'readRows', + $this->prophesize(ReadRowsRequest::class)->reveal(), + function ($request, $callOptions) { + return [$request, $callOptions]; + }, + function ($exception) { + return $exception && $exception->getCode() === self::RETRYABLE_CODE; + } + ); + + $refl = new \ReflectionObject($resumableStream); + $refl->getProperty('delayFunction')->setValue($resumableStream, $delayFunction); + + $rowCount = 0; + foreach ($resumableStream->readAll() as $item) { + $rowCount++; + $this->assertInstanceOf(ReadRowsResponse::class, $item); + } + $this->assertEquals(2, $rowCount); + } +}