diff --git a/phpunit-conformance.xml.dist b/phpunit-conformance.xml.dist index 236f6e8..6e6226b 100644 --- a/phpunit-conformance.xml.dist +++ b/phpunit-conformance.xml.dist @@ -1,5 +1,5 @@ - + tests/Conformance diff --git a/src/ResumableStream.php b/src/ResumableStream.php index fb34415..fcd80d4 100644 --- a/src/ResumableStream.php +++ b/src/ResumableStream.php @@ -79,6 +79,11 @@ class ResumableStream implements \IteratorAggregate */ private $callOptions; + /** + * @var callable + */ + private $delayFunction; + /** * Constructs a resumable stream. * @@ -116,6 +121,22 @@ public function __construct( $this->callOptions['retrySettings'] = [ 'retriesEnabled' => false ]; + + $this->delayFunction = function (int $attempt) { + // Values here are taken from the Java Bigtable client, and are + // different than those set by default in the readRows configuration + // @see https://github.com/googleapis/java-bigtable/blob/c618969216c90c42dee6ee48db81e90af4fb102b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java#L162-L164 + $initialDelayMillis = 10; + $initialDelayMultiplier = 2; + $maxDelayMillis = 60000; + + $delayMultiplier = $initialDelayMultiplier ** $attempt; + $delayMs = min($initialDelayMillis * $delayMultiplier, $maxDelayMillis); + $actualDelayMs = mt_rand(0, $delayMs); // add jitter + $delay = 1000 * $actualDelayMs; // convert ms to µs + + usleep((int) $delay); + }; } /** @@ -126,7 +147,8 @@ public function __construct( */ public function readAll() { - $attempt = 0; + // Reset $currentAttempts on successful row read, but keep total attempts for the header. + $currentAttempt = $totalAttempt = 0; do { $ex = null; list($this->request, $this->callOptions) = @@ -137,10 +159,10 @@ public function readAll() if ($completed !== true) { // Send in "bigtable-attempt" header on retry request $headers = $this->callOptions['headers'] ?? []; - if ($attempt > 0) { - $headers['bigtable-attempt'] = [(string) $attempt]; + if ($totalAttempt > 0) { + $headers['bigtable-attempt'] = [(string) $totalAttempt]; + ($this->delayFunction)($currentAttempt); } - $attempt++; $stream = call_user_func_array( [$this->gapicClient, $this->method], @@ -150,11 +172,14 @@ public function readAll() try { foreach ($stream->readAll() as $item) { yield $item; + $currentAttempt = 0; // reset delay and attempt on successful read. } } catch (\Exception $ex) { + $totalAttempt++; + $currentAttempt++; } } - } while ((!$this->retryFunction || ($this->retryFunction)($ex)) && $attempt <= $this->retries); + } while ((!$this->retryFunction || ($this->retryFunction)($ex)) && $currentAttempt <= $this->retries); if ($ex !== null) { throw $ex; } diff --git a/tests/Conformance/ReadRowsTest.php b/tests/Conformance/ReadRowsTest.php index 74cba5b..ec448f4 100644 --- a/tests/Conformance/ReadRowsTest.php +++ b/tests/Conformance/ReadRowsTest.php @@ -20,10 +20,13 @@ use Google\ApiCore\ServerStream; use Google\Cloud\Bigtable\ChunkFormatter; use Google\Cloud\Bigtable\Exception\BigtableDataOperationException; +use Google\Cloud\Bigtable\V2\Client\BigtableClient; +use Google\Cloud\Bigtable\V2\ReadRowsRequest; use Google\Cloud\Bigtable\V2\ReadRowsResponse; -use Google\Cloud\Bigtable\V2\ReadRowsResponse_CellChunk; +use Google\Cloud\Bigtable\V2\ReadRowsResponse\CellChunk; use PHPUnit\Framework\TestCase; use Prophecy\PhpUnit\ProphecyTrait; +use Prophecy\Argument; /** * @group bigtable @@ -48,8 +51,13 @@ public function testReadRows($readRowsResponses, $expectedRows, $expectedErrorCo $this->serverStream->readAll()->shouldBeCalled()->willReturn( $this->arrayAsGenerator($readRowsResponses) ); + $request = new ReadRowsRequest(); + $client = $this->prophesize(BigtableClient::class); + $client->readRows($request, Argument::type('array')) + ->willReturn($this->serverStream->reveal()); $chunkFormatter = new ChunkFormatter( - $this->serverStream->reveal() + $client->reveal(), + $request ); $rows = []; $errorCount = 0; @@ -73,7 +81,7 @@ public function rowsProvider() $responses = []; foreach ($test['chunks_base64'] as $chunk) { $chunk = base64_decode($chunk); - $cellChunk = new ReadRowsResponse_CellChunk(); + $cellChunk = new CellChunk(); $cellChunk->mergeFromString($chunk); $response = new ReadRowsResponse(); $response->setChunks([$cellChunk]); diff --git a/tests/Unit/ResumableStreamTest.php b/tests/Unit/ResumableStreamTest.php new file mode 100644 index 0000000..09d95f9 --- /dev/null +++ b/tests/Unit/ResumableStreamTest.php @@ -0,0 +1,134 @@ +markTestSkipped('This test is not compatible with the protobuf extension'); + } + + $count = 0; + $stream = $this->prophesize(ServerStream::class); + $generator1 = function () { + yield new ReadRowsResponse(); + throw new \Exception('This is the 3rd retryable', self::RETRYABLE_CODE); + }; + $generator2 = fn () => yield new ReadRowsResponse(); + $stream->readAll() + ->will(function () use (&$count, $generator1, $generator2) { + // Simlate a call to readRows where the server throws 2 exceptions, reads a row + // successfuly, throws another exception, and reads one more row successfully. + return match ($count++) { + 0 => throw new \Exception('This is the 1st retryable', self::RETRYABLE_CODE), + 1 => throw new \Exception('This is the 2nd retryable', self::RETRYABLE_CODE), + 2 => $generator1(), + 3 => throw new \Exception( + 'The 4th exception should retry because attempts reset', + self::RETRYABLE_CODE + ), + 4 => $generator2(), + }; + }); + $bigtable = $this->prophesize(BigtableClient::class); + $bigtable->readRows(Argument::type(ReadRowsRequest::class), Argument::type('array')) + ->shouldBeCalledTimes(5) + ->willReturn($stream->reveal()); + $resumableStream = new ResumableStream( + $bigtable->reveal(), + 'readRows', + $this->prophesize(ReadRowsRequest::class)->reveal(), + fn ($request, $callOptions) => [$request, $callOptions], + fn ($exception) => $exception && $exception->getCode() === self::RETRYABLE_CODE + ); + + $retries = 0; + $delayFunction = function ($delayFactor) use (&$retries) { + $this->assertEquals(match (++$retries) { + 1 => 1, // initial delay + 2 => 2, // increment by 1 + 3 => 1, // the delay is reset by the successful call + 4 => 2, // increment by 1 + }, $delayFactor); + }; + $prop = (new \ReflectionObject($resumableStream))->getProperty('delayFunction'); + $prop->setAccessible(true); + $prop->setValue($resumableStream, $delayFunction); + + $rows = iterator_to_array($resumableStream->readAll()); + $this->assertEquals(2, count($rows)); + $this->assertEquals(4, $retries); + } + + public function testThreeConsecutiveFailuresIsNotReset() + { + if (extension_loaded('protobuf')) { + $this->markTestSkipped('This test is not compatible with the protobuf extension'); + } + + $this->expectException(\Exception::class); + $this->expectExceptionMessage('This is retryable, but we are at our limit!'); + + $count = 0; + $stream = $this->prophesize(ServerStream::class); + $stream->readAll() + ->will(function () use (&$count) { + // Simlate a call to readRows where the server throws 2 exceptions, reads a row + // successfuly, throws another exception, and reads one more row successfully. + return match ($count++) { + 0 => throw new \Exception('This is retryable!', self::RETRYABLE_CODE), + 1 => throw new \Exception('This is also retryable!', self::RETRYABLE_CODE), + 2 => throw new \Exception('This is too is retryable!', self::RETRYABLE_CODE), + 3 => throw new \Exception('This is retryable, but we are at our limit!', self::RETRYABLE_CODE), + 4 => throw new \Exception('This is not retryable and should not be thrown'), + }; + }); + $bigtable = $this->prophesize(BigtableClient::class); + $bigtable->readRows(Argument::type(ReadRowsRequest::class), Argument::type('array')) + ->shouldBeCalledTimes(4) + ->willReturn($stream->reveal()); + $resumableStream = new ResumableStream( + $bigtable->reveal(), + 'readRows', + $this->prophesize(ReadRowsRequest::class)->reveal(), + fn ($request, $callOptions) => [$request, $callOptions], + fn ($exception) => $exception && $exception->getCode() === self::RETRYABLE_CODE + ); + + iterator_to_array($resumableStream->readAll()); + } +}