Skip to content

Commit

Permalink
feat: add delay to ResumableStream
Browse files Browse the repository at this point in the history
  • Loading branch information
bshaffer committed Sep 10, 2024
1 parent 94e6e78 commit e62b520
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
20 changes: 20 additions & 0 deletions Bigtable/src/ResumableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class ResumableStream implements \IteratorAggregate
*/
private $callOptions;

/**
* @var callable
*/
private $delayFunction;

/**
* Constructs a resumable stream.
*
Expand Down Expand Up @@ -116,6 +121,17 @@ public function __construct(
$this->callOptions['retrySettings'] = [
'retriesEnabled' => false
];

$this->delayFunction = function (int $delayFactor) {
$initialDelayMillis = 100;
$initialDelayMultiplier = 1.3;
$maxDelayMillis = 600000;

$delayMultiplier = $initialDelayMultiplier ** $delayFactor;
$delayMs = min($initialDelayMillis * $delayMultiplier, $maxDelayMillis);
$delay = 1000 * $delayMs; // convert ms to µs
usleep((int) $delay);
};
}

/**
Expand All @@ -127,6 +143,7 @@ public function __construct(
public function readAll()
{
$attempt = 0;
$delayFactor = 0;
do {
$ex = null;
list($this->request, $this->callOptions) =
Expand All @@ -139,6 +156,7 @@ public function readAll()
$headers = $this->callOptions['headers'] ?? [];
if ($attempt > 0) {
$headers['bigtable-attempt'] = [(string) $attempt];
($this->delayFunction)($delayFactor);
}
$attempt++;

Expand All @@ -150,8 +168,10 @@ public function readAll()
try {
foreach ($stream->readAll() as $item) {
yield $item;
$delayFactor = 0; // reset delay factor on successful read.
}
} catch (\Exception $ex) {
$delayFactor++;
}
}
} while ((!$this->retryFunction || ($this->retryFunction)($ex)) && $attempt <= $this->retries);
Expand Down
90 changes: 90 additions & 0 deletions Bigtable/tests/Unit/ResumableStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php
/**
* Copyright 2024, Google LLC All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Google\Cloud\Bigtable\Tests\Unit;

use Google\Cloud\Bigtable\ResumableStream;
use Google\Cloud\Bigtable\V2\Client\BigtableClient;
use Google\Cloud\Bigtable\V2\ReadRowsRequest;
use Google\Cloud\Bigtable\V2\ReadRowsResponse;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;
use Prophecy\Argument;
use Google\ApiCore\ServerStream;

/**
* @group bigtable
* @group bigtabledata
*/
class ResumableStreamTest extends TestCase
{
use ProphecyTrait;

const RETRYABLE_CODE = 1234;

public function testRetryDelayIsResetWhenRowReceived()
{
$stream = $this->prophesize(ServerStream::class);
$generator1 = function () {
yield new ReadRowsResponse();
throw new \Exception('This too is retryable!', self::RETRYABLE_CODE);
};
$generator2 = fn () => yield new ReadRowsResponse();
$count = 0;
$stream->readAll()
->will(function () use (&$count, $generator1, $generator2) {
return match ($count++) {
0 => throw new \Exception('This is retryable!', self::RETRYABLE_CODE),
1 => throw new \Exception('This is also retryable!', self::RETRYABLE_CODE),
2 => $generator1(),
3 => $generator2(),
};
});
$delayFunction = function ($delayFactor) use (&$count) {
$this->assertEquals(match ($count) {
1 => 1, // initial delay
2 => 2, // increment by 1
3 => 1, // the delay is reset by the successful call
}, $delayFactor);
};
$bigtable = $this->prophesize(BigtableClient::class);
$bigtable->readRows(Argument::type(ReadRowsRequest::class), Argument::type('array'))
->shouldBeCalledTimes(4)
->willReturn($stream->reveal());
$resumableStream = new ResumableStream(
$bigtable->reveal(),
'readRows',
$this->prophesize(ReadRowsRequest::class)->reveal(),
function ($request, $callOptions) {
return [$request, $callOptions];
},
function ($exception) {
return $exception && $exception->getCode() === self::RETRYABLE_CODE;
}
);

$refl = new \ReflectionObject($resumableStream);
$refl->getProperty('delayFunction')->setValue($resumableStream, $delayFunction);

$rowCount = 0;
foreach ($resumableStream->readAll() as $item) {
$rowCount++;
$this->assertInstanceOf(ReadRowsResponse::class, $item);
}
$this->assertEquals(2, $rowCount);
}
}

0 comments on commit e62b520

Please sign in to comment.