Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: exponential random retry #76

Merged
merged 4 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ jobs:
- checkout
- run: |
mkdir -p tools/php-cs-fixer
composer require --working-dir=tools/php-cs-fixer friendsofphp/php-cs-fixer
composer require --working-dir=tools/php-cs-fixer friendsofphp/php-cs-fixer:2.18.7
tools/php-cs-fixer/vendor/bin/php-cs-fixer fix --dry-run --verbose --show-progress=estimating --using-cache=no --diff

workflows:
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.13.0 [unreleased]

### Features
1. [#76](https://github.com/influxdata/influxdb-client-php/pull/76): Exponential random backoff retry strategy

## 1.12.0 [2021-04-01]

### Features
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ The writes are processed in batches which are configurable by `WriteOptions`:
| **retryInterval** | the number of milliseconds to retry unsuccessful write. The retry interval is "exponentially" used when the InfluxDB server does not specify "Retry-After" header. | 5000 |
| **jitterInterval** | the number of milliseconds before the data is written increased by a random amount | 0 |
| **maxRetries** | the number of max retries when write fails | 5 |
| **maxRetryDelay** | maximum delay when retrying write in milliseconds | 180000 |
| **exponentialBase** | the base for the exponential retry delay, the next delay is computed as `retryInterval * exponentialBase^(attempts-1)` | 5 |
| **maxRetryDelay** | maximum delay when retrying write in milliseconds | 125000 |
| **maxRetryTime** | maximum total retry timeout in milliseconds | 180000 |
| **exponentialBase** | the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval ``retryInterval * exponentialBase^(attempts-1)`` and ``retryInterval * exponentialBase^(attempts)``. Example for ``retryInterval=5000, exponentialBase=2, maxRetryDelay=125000, total=5`` Retry delays are random distributed values within the ranges of ``[5000-10000, 10000-20000, 20000-40000, 40000-80000, 80000-125000]`` | 2 |
```php
use InfluxDB2\Client;
use InfluxDB2\WriteType as WriteType;
Expand Down
14 changes: 0 additions & 14 deletions src/InfluxDB2/DefaultApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,4 @@ function ($v, $k) {
throw new InvalidArgumentException("The '${key}' should be defined as argument or default option: {$options}");
}
}

/**
* Log message with specified severity to log file defined by: 'options['logFile']'.
*
* @param string $level log severity
* @param string $message log message
*/
protected function log(string $level, string $message): void
{
$logFile = isset($this->options['logFile']) ? $this->options['logFile'] : "php://output";
$logDate = date('H:i:s d-M-Y');

file_put_contents($logFile, "[{$logDate}]: [{$level}] - {$message}", FILE_APPEND);
}
}
64 changes: 13 additions & 51 deletions src/InfluxDB2/WriteApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

namespace InfluxDB2;

use GuzzleHttp\Exception\ConnectException;
use InfluxDB2\Model\WritePrecision;

/**
* Write time series data into InfluxDB.
* @package InfluxDB2
Expand All @@ -21,7 +18,7 @@ class WriteApi extends DefaultApi implements Writer
/**
* WriteApi constructor.
* @param $options
* @param array $writeOptions
* @param array|null $writeOptions
* @param array|null $pointSettings
*/
public function __construct($options, array $writeOptions = null, array $pointSettings = null)
Expand Down Expand Up @@ -134,55 +131,20 @@ public function writeRaw(string $data, string $precision = null, string $bucket

$queryParams = ["org" => $orgParam, "bucket" => $bucketParam, "precision" => $precisionParam];

$this->writeRawInternal($data, $queryParams, 1, $this->writeOptions->retryInterval);
}

private function writeRawInternal(string $data, array $queryParams, int $attempts, int $retryInterval)
{
if ($this->writeOptions->jitterInterval > 0) {
$jitterDelay = ($this->writeOptions->jitterInterval * 1000) * (rand(0, 1000) / 1000);
usleep($jitterDelay);
}

try {
$retry = new WriteRetry(
$this->writeOptions->maxRetries,
$this->writeOptions->retryInterval,
$this->writeOptions->maxRetryDelay,
$this->writeOptions->exponentialBase,
$this->writeOptions->maxRetryTime,
$this->writeOptions->jitterInterval,
$this->options['logFile'] ?? "php://output"
);

$retry->retry(function () use ($data, $queryParams) {
$this->post($data, "/api/v2/write", $queryParams);
} catch (ApiException $e) {
$code = $e->getCode();

if ($attempts > $this->writeOptions->maxRetries) {
throw $e;
}

if (($code == null || $code < 429) && !($e->getPrevious() instanceof ConnectException)) {
throw $e;
}

$headers = $e->getResponseHeaders();

if ($headers != null && array_key_exists('Retry-After', $headers)) {
$timeout = (int)$headers['Retry-After'][0] * 1000000.0;
} else {
$timeout = min($retryInterval, $this->writeOptions->maxRetryDelay) * 1000.0;
}

$timeoutInSec = $timeout / 1000000.0;
$error = $e->getResponseBody();
$error = isset($error) ? $error : $e->getMessage();

$message = "The retriable error occurred during writing of data. Reason: '{$error}'. Retry in: {$timeoutInSec}s.";
$this->log("WARNING", $message);

usleep($timeout);

$this->writeRawInternal(
$data,
$queryParams,
$attempts + 1,
$retryInterval * $this->writeOptions->exponentialBase
);
}
});
}

public function close()
{
$this->closed = true;
Expand Down
21 changes: 15 additions & 6 deletions src/InfluxDB2/WriteOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ class WriteOptions
const DEFAULT_BATCH_SIZE = 10;
const DEFAULT_RETRY_INTERVAL = 5000;
const DEFAULT_MAX_RETRIES = 5;
const DEFAULT_MAX_RETRY_DELAY = 180000;
const DEFAULT_EXPONENTIAL_BASE = 5;
const DEFAULT_MAX_RETRY_DELAY = 125000;
const DEFAULT_MAX_RETRY_TIME = 180000;
const DEFAULT_EXPONENTIAL_BASE = 2;
const DEFAULT_JITTER_INTERVAL = 0;

public $writeType;
Expand All @@ -18,6 +19,7 @@ class WriteOptions
public $maxRetryDelay;
public $exponentialBase;
public $jitterInterval;
public $maxRetryTime;

/**
* WriteOptions constructor.
Expand All @@ -27,12 +29,18 @@ class WriteOptions
* 'retryInterval' => number of milliseconds to retry unsuccessful write
* 'maxRetries' => max number of retries when write fails
* The retry interval is used when the InfluxDB server does not specify "Retry-After" header.
* 'maxRetryDelay' => maximum delay when retrying write
* 'exponentialBase' => the base for the exponential retry delay, the next delay is computed as
* `retry_interval * exponentialBase^(attempts - 1)`
* 'maxRetryDelay' => maximum delay when retrying write in milliseconds
* 'maxRetryTime' => maximum total time when retrying write in milliseconds
* 'exponentialBase' => the base for the exponential retry delay, the next delay is computed using
* random exponential backoff as a random value within the interval
* ``retryInterval * exponentialBase^(attempts-1)`` and
* ``retryInterval * exponentialBase^(attempts)``.
* Example for ``retryInterval=5000, exponentialBase=2, maxRetryDelay=125000, total=5``
* Retry delays are random distributed values within the ranges of
* ``[5000-10000, 10000-20000, 20000-40000, 40000-80000, 80000-125000]``
* 'jitterInterval' => the number of milliseconds before the data is written increased by a random amount
* ]
* @param array $writeOptions Array containing the write parameters (See above)
* @param array|null $writeOptions Array containing the write parameters (See above)
*/
public function __construct(array $writeOptions = null)
{
Expand All @@ -42,6 +50,7 @@ public function __construct(array $writeOptions = null)
$this->retryInterval = $writeOptions["retryInterval"] ?? self::DEFAULT_RETRY_INTERVAL;
$this->maxRetries = $writeOptions["maxRetries"] ?? self::DEFAULT_MAX_RETRIES;
$this->maxRetryDelay = $writeOptions["maxRetryDelay"] ?? self::DEFAULT_MAX_RETRY_DELAY;
$this->maxRetryTime = $writeOptions["maxRetryTime"] ?? self::DEFAULT_MAX_RETRY_TIME;
$this->exponentialBase = $writeOptions["exponentialBase"] ?? self::DEFAULT_EXPONENTIAL_BASE;
$this->jitterInterval = $writeOptions["jitterInterval"] ?? self::DEFAULT_JITTER_INTERVAL;
}
Expand Down
143 changes: 143 additions & 0 deletions src/InfluxDB2/WriteRetry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
<?php

namespace InfluxDB2;

use GuzzleHttp\Exception\ConnectException;

/**
* Exponential random write retry.
*/
class WriteRetry
{
private $maxRetries;
private $retryInterval;
private $maxRetryDelay;
private $exponentialBase;
private $jitterInterval;
private $maxRetryTime;
private $retryTimout;
/**
* @var mixed|string
*/
private $logFile;

/**
* WriteRetry constructor.
*
* @param int $maxRetries max number of retries when write fails
* @param int $retryInterval number of milliseconds to retry unsuccessful write,
* The retry interval is used when the InfluxDB server does not specify "Retry-After" header.
* @param int $maxRetryDelay maximum delay when retrying write in milliseconds
* @param int $exponentialBase the base for the exponential retry delay, the next delay is computed using
* random exponential backoff as a random value within the interval
* ``retryInterval * exponentialBase^(attempts-1)`` and
* ``retryInterval * exponentialBase^(attempts)``.
* Example for ``retryInterval=5000, exponentialBase=2, maxRetryDelay=125000, total=5``
* Retry delays are random distributed values within the ranges of
* ``[5000-10000, 10000-20000, 20000-40000, 40000-80000, 80000-125000]``
*
* @param int $maxRetryTime maximum total time when retrying write in milliseconds
* @param int $jitterInterval the number of milliseconds before the data is written increased by a random amount
* @param string $logFile logfile
*/
public function __construct(
int $maxRetries = 5,
int $retryInterval = 5000,
int $maxRetryDelay = 125000,
int $exponentialBase = 2,
int $maxRetryTime = 180000,
int $jitterInterval = 0,
string $logFile = "php://output"
) {
$this->maxRetries = $maxRetries;
$this->retryInterval = $retryInterval;
$this->maxRetryDelay = $maxRetryDelay;
$this->maxRetryTime = $maxRetryTime;
$this->exponentialBase = $exponentialBase;
$this->jitterInterval = $jitterInterval;
$this->logFile = $logFile;

//retry timout
$this->retryTimout = microtime(true) * 1000 + $maxRetryTime;
}

/**
* @throws ApiException
*/
public function retry($callable, $attempts = 0)
{
try {
return call_user_func($callable);
} catch (ApiException $e) {
$error = $e->getResponseBody() ?? $e->getMessage();

if (!$this->isRetryable($e)) {
throw $e;
}
$attempts++;
if ($attempts > $this->maxRetries) {
$this->log("ERROR", "Maximum retry attempts reached");
throw $e;
}

// throws exception when max retry time is exceeded
if (microtime(true) * 1000 > $this->retryTimout) {
$this->log("ERROR", "Maximum retry time $this->maxRetryTime ms exceeded");
throw $e;
}

$headers = $e->getResponseHeaders();
if ($headers != null && array_key_exists('Retry-After', $headers)) {
//jitter add in microseconds
$jitterMicro = rand(0, $this->jitterInterval) * 1000;
$timeout = (int)$headers['Retry-After'][0] * 1000000.0 + $jitterMicro;
} else {
$timeout = $this->getBackoffTime($attempts) * 1000;
}

$timeoutInSec = $timeout / 1000000.0;

$message = "The retryable error occurred during writing of data. Reason: '$error'. Retry in: {$timeoutInSec}s.";
$this->log("WARNING", $message);
usleep($timeout);
$this->retry($callable, $attempts);
}
}

public function isRetryable(ApiException $e): bool
{
$code = $e->getCode();
if (($code == null || $code < 429) &&
!($e->getPrevious() instanceof ConnectException)) {
return false;
}
return true;
}

public function getBackoffTime(int $attempt)
{
$range_start = $this->retryInterval;
$range_stop = $this->retryInterval * $this->exponentialBase;

$i = 1;
while ($i < $attempt) {
$i += 1;
$range_start = $range_stop;
$range_stop = $range_stop * $this->exponentialBase;
if ($range_stop > $this->maxRetryDelay) {
break;
}
}

if ($range_stop > $this->maxRetryDelay) {
$range_stop = $this->maxRetryDelay;
}
return $range_start + ($range_stop - $range_start) * (rand(0, 1000) / 1000);
}

private function log(string $level, string $message): void
{
$logDate = date('H:i:s d-M-Y');
file_put_contents($this->logFile, "[$logDate]: [$level] - $message".PHP_EOL, FILE_APPEND);
}
}
Loading