Skip to content

Commit

Permalink
feat: add exponential backoff to ResumableStream (#7664)
Browse files Browse the repository at this point in the history
  • Loading branch information
bshaffer authored Sep 16, 2024
1 parent cfcb812 commit 89f982a
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 9 deletions.
2 changes: 1 addition & 1 deletion phpunit-conformance.xml.dist
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit colors="true">
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" bootstrap="./tests/Unit/bootstrap.php" colors="true" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd">
<testsuites>
<testsuite>
<directory>tests/Conformance</directory>
Expand Down
35 changes: 30 additions & 5 deletions src/ResumableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class ResumableStream implements \IteratorAggregate
*/
private $callOptions;

/**
* @var callable
*/
private $delayFunction;

/**
* Constructs a resumable stream.
*
Expand Down Expand Up @@ -116,6 +121,22 @@ public function __construct(
$this->callOptions['retrySettings'] = [
'retriesEnabled' => false
];

$this->delayFunction = function (int $attempt) {
// Values here are taken from the Java Bigtable client, and are
// different than those set by default in the readRows configuration
// @see https://github.com/googleapis/java-bigtable/blob/c618969216c90c42dee6ee48db81e90af4fb102b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java#L162-L164
$initialDelayMillis = 10;
$initialDelayMultiplier = 2;
$maxDelayMillis = 60000;

$delayMultiplier = $initialDelayMultiplier ** $attempt;
$delayMs = min($initialDelayMillis * $delayMultiplier, $maxDelayMillis);
$actualDelayMs = mt_rand(0, $delayMs); // add jitter
$delay = 1000 * $actualDelayMs; // convert ms to µs

usleep((int) $delay);
};
}

/**
Expand All @@ -126,7 +147,8 @@ public function __construct(
*/
public function readAll()
{
$attempt = 0;
// Reset $currentAttempts on successful row read, but keep total attempts for the header.
$currentAttempt = $totalAttempt = 0;
do {
$ex = null;
list($this->request, $this->callOptions) =
Expand All @@ -137,10 +159,10 @@ public function readAll()
if ($completed !== true) {
// Send in "bigtable-attempt" header on retry request
$headers = $this->callOptions['headers'] ?? [];
if ($attempt > 0) {
$headers['bigtable-attempt'] = [(string) $attempt];
if ($totalAttempt > 0) {
$headers['bigtable-attempt'] = [(string) $totalAttempt];
($this->delayFunction)($currentAttempt);
}
$attempt++;

$stream = call_user_func_array(
[$this->gapicClient, $this->method],
Expand All @@ -150,11 +172,14 @@ public function readAll()
try {
foreach ($stream->readAll() as $item) {
yield $item;
$currentAttempt = 0; // reset delay and attempt on successful read.
}
} catch (\Exception $ex) {
$totalAttempt++;
$currentAttempt++;
}
}
} while ((!$this->retryFunction || ($this->retryFunction)($ex)) && $attempt <= $this->retries);
} while ((!$this->retryFunction || ($this->retryFunction)($ex)) && $currentAttempt <= $this->retries);
if ($ex !== null) {
throw $ex;
}
Expand Down
14 changes: 11 additions & 3 deletions tests/Conformance/ReadRowsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
use Google\ApiCore\ServerStream;
use Google\Cloud\Bigtable\ChunkFormatter;
use Google\Cloud\Bigtable\Exception\BigtableDataOperationException;
use Google\Cloud\Bigtable\V2\Client\BigtableClient;
use Google\Cloud\Bigtable\V2\ReadRowsRequest;
use Google\Cloud\Bigtable\V2\ReadRowsResponse;
use Google\Cloud\Bigtable\V2\ReadRowsResponse_CellChunk;
use Google\Cloud\Bigtable\V2\ReadRowsResponse\CellChunk;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;
use Prophecy\Argument;

/**
* @group bigtable
Expand All @@ -48,8 +51,13 @@ public function testReadRows($readRowsResponses, $expectedRows, $expectedErrorCo
$this->serverStream->readAll()->shouldBeCalled()->willReturn(
$this->arrayAsGenerator($readRowsResponses)
);
$request = new ReadRowsRequest();
$client = $this->prophesize(BigtableClient::class);
$client->readRows($request, Argument::type('array'))
->willReturn($this->serverStream->reveal());
$chunkFormatter = new ChunkFormatter(
$this->serverStream->reveal()
$client->reveal(),
$request
);
$rows = [];
$errorCount = 0;
Expand All @@ -73,7 +81,7 @@ public function rowsProvider()
$responses = [];
foreach ($test['chunks_base64'] as $chunk) {
$chunk = base64_decode($chunk);
$cellChunk = new ReadRowsResponse_CellChunk();
$cellChunk = new CellChunk();
$cellChunk->mergeFromString($chunk);
$response = new ReadRowsResponse();
$response->setChunks([$cellChunk]);
Expand Down
134 changes: 134 additions & 0 deletions tests/Unit/ResumableStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
<?php
/**
* Copyright 2024, Google LLC All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Google\Cloud\Bigtable\Tests\Unit;

use Google\Cloud\Bigtable\ResumableStream;
use Google\Cloud\Bigtable\V2\Client\BigtableClient;
use Google\Cloud\Bigtable\V2\ReadRowsRequest;
use Google\Cloud\Bigtable\V2\ReadRowsResponse;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;
use Prophecy\Argument;
use Google\ApiCore\ServerStream;

/**
* @group bigtable
* @group bigtabledata
*/
class ResumableStreamTest extends TestCase
{
use ProphecyTrait;

const RETRYABLE_CODE = 1234;

public function testRetryDelayAndAttemptAreResetWhenRowReceived()
{
if (extension_loaded('protobuf')) {
$this->markTestSkipped('This test is not compatible with the protobuf extension');
}

$count = 0;
$stream = $this->prophesize(ServerStream::class);
$generator1 = function () {
yield new ReadRowsResponse();
throw new \Exception('This is the 3rd retryable', self::RETRYABLE_CODE);
};
$generator2 = fn () => yield new ReadRowsResponse();
$stream->readAll()
->will(function () use (&$count, $generator1, $generator2) {
// Simlate a call to readRows where the server throws 2 exceptions, reads a row
// successfuly, throws another exception, and reads one more row successfully.
return match ($count++) {
0 => throw new \Exception('This is the 1st retryable', self::RETRYABLE_CODE),
1 => throw new \Exception('This is the 2nd retryable', self::RETRYABLE_CODE),
2 => $generator1(),
3 => throw new \Exception(
'The 4th exception should retry because attempts reset',
self::RETRYABLE_CODE
),
4 => $generator2(),
};
});
$bigtable = $this->prophesize(BigtableClient::class);
$bigtable->readRows(Argument::type(ReadRowsRequest::class), Argument::type('array'))
->shouldBeCalledTimes(5)
->willReturn($stream->reveal());
$resumableStream = new ResumableStream(
$bigtable->reveal(),
'readRows',
$this->prophesize(ReadRowsRequest::class)->reveal(),
fn ($request, $callOptions) => [$request, $callOptions],
fn ($exception) => $exception && $exception->getCode() === self::RETRYABLE_CODE
);

$retries = 0;
$delayFunction = function ($delayFactor) use (&$retries) {
$this->assertEquals(match (++$retries) {
1 => 1, // initial delay
2 => 2, // increment by 1
3 => 1, // the delay is reset by the successful call
4 => 2, // increment by 1
}, $delayFactor);
};
$prop = (new \ReflectionObject($resumableStream))->getProperty('delayFunction');
$prop->setAccessible(true);
$prop->setValue($resumableStream, $delayFunction);

$rows = iterator_to_array($resumableStream->readAll());
$this->assertEquals(2, count($rows));
$this->assertEquals(4, $retries);
}

public function testThreeConsecutiveFailuresIsNotReset()
{
if (extension_loaded('protobuf')) {
$this->markTestSkipped('This test is not compatible with the protobuf extension');
}

$this->expectException(\Exception::class);
$this->expectExceptionMessage('This is retryable, but we are at our limit!');

$count = 0;
$stream = $this->prophesize(ServerStream::class);
$stream->readAll()
->will(function () use (&$count) {
// Simlate a call to readRows where the server throws 2 exceptions, reads a row
// successfuly, throws another exception, and reads one more row successfully.
return match ($count++) {
0 => throw new \Exception('This is retryable!', self::RETRYABLE_CODE),
1 => throw new \Exception('This is also retryable!', self::RETRYABLE_CODE),
2 => throw new \Exception('This is too is retryable!', self::RETRYABLE_CODE),
3 => throw new \Exception('This is retryable, but we are at our limit!', self::RETRYABLE_CODE),
4 => throw new \Exception('This is not retryable and should not be thrown'),
};
});
$bigtable = $this->prophesize(BigtableClient::class);
$bigtable->readRows(Argument::type(ReadRowsRequest::class), Argument::type('array'))
->shouldBeCalledTimes(4)
->willReturn($stream->reveal());
$resumableStream = new ResumableStream(
$bigtable->reveal(),
'readRows',
$this->prophesize(ReadRowsRequest::class)->reveal(),
fn ($request, $callOptions) => [$request, $callOptions],
fn ($exception) => $exception && $exception->getCode() === self::RETRYABLE_CODE
);

iterator_to_array($resumableStream->readAll());
}
}

0 comments on commit 89f982a

Please sign in to comment.