Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[InfluxDB][Monitoring] Allow passing Client as configuration option. #809

Merged
merged 5 commits into from
Apr 1, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 61 additions & 12 deletions pkg/monitoring/InfluxDbStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use Enqueue\Dsn\Dsn;
use InfluxDB\Client;
use InfluxDB\Database;
use InfluxDB\Driver\QueryDriverInterface;
use InfluxDB\Exception as InfluxDBException;
use InfluxDB\Point;

class InfluxDbStorage implements StatsStorage
Expand Down Expand Up @@ -38,6 +40,8 @@ class InfluxDbStorage implements StatsStorage
* 'measurementSentMessages' => 'sent-messages',
* 'measurementConsumedMessages' => 'consumed-messages',
* 'measurementConsumers' => 'consumers',
* 'client' => null, # Client instance. Null by default.
* 'retentionPolicy' => null,
* ]
*
* or
Expand All @@ -59,6 +63,13 @@ public function __construct($config = 'influxdb:')
} elseif (is_array($config)) {
$config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']);
} elseif ($config instanceof Client) {
// Passing Client instead of array config is deprecated because it prevents setting any configuration values
// and causes library to use defaults.
@trigger_error(
sprintf('Passing %s as %s argument is deprecated. Pass it as "client" array property instead',
Client::class,
__METHOD__
), E_USER_DEPRECATED);
$this->client = $config;
$config = [];
} else {
Expand All @@ -74,8 +85,22 @@ public function __construct($config = 'influxdb:')
'measurementSentMessages' => 'sent-messages',
'measurementConsumedMessages' => 'consumed-messages',
'measurementConsumers' => 'consumers',
'client' => null,
'retentionPolicy' => null,
], $config);

if (null !== $config['client']) {
if (!$config['client'] instanceof Client) {
throw new \InvalidArgumentException(sprintf(
'%s configuration property is expected to be an instance of %s class. %s was passed instead.',
'client',
Client::class,
gettype($config['client'])
));
}
$this->client = $config['client'];
}

$this->config = $config;
}

Expand Down Expand Up @@ -109,7 +134,7 @@ public function pushConsumerStats(ConsumerStats $stats): void
$points[] = new Point($this->config['measurementConsumers'], null, $tags, $values, $stats->getTimestampMs());
}

$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
$this->doWrite($points);
}

public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
Expand All @@ -135,7 +160,7 @@ public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
new Point($this->config['measurementConsumedMessages'], $runtime, $tags, $values, $stats->getTimestampMs()),
];

$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
$this->doWrite($points);
}

public function pushSentMessageStats(SentMessageStats $stats): void
Expand All @@ -158,21 +183,44 @@ public function pushSentMessageStats(SentMessageStats $stats): void
new Point($this->config['measurementSentMessages'], 1, $tags, [], $stats->getTimestampMs()),
];

$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
$this->doWrite($points);
}

private function getDb(): Database
private function doWrite(array $points): void
{
if (null === $this->database) {
if (null === $this->client) {
$this->client = new Client(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['password']
);
if (!$this->client || $this->client->getDriver() instanceof QueryDriverInterface) {
$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS, $this->config['retentionPolicy']);
} else {
// Code below mirrors what `writePoints` method of Database does.
Steveb-p marked this conversation as resolved.
Show resolved Hide resolved
try {
$parameters = [
'url' => sprintf('write?db=%s&precision=%s', $this->config['db'], Database::PRECISION_MILLISECONDS),
'database' => $this->config['db'],
'method' => 'post',
];
if (null !== $this->config['retentionPolicy']) {
$parameters['url'] .= sprintf('&rp=%s', $this->config['retentionPolicy']);
}

$this->client->write($parameters, $points);
} catch (\Exception $e) {
throw new InfluxDBException($e->getMessage(), $e->getCode());
}
}
}

private function getDb(): Database
{
if (null === $this->client) {
$this->client = new Client(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['password']
);
}

if (null === $this->database) {
$this->database = $this->client->selectDB($this->config['db']);
$this->database->create();
}
Expand Down Expand Up @@ -200,6 +248,7 @@ private function parseDsn(string $dsn): array
'measurementSentMessages' => $dsn->getString('measurementSentMessages'),
'measurementConsumedMessages' => $dsn->getString('measurementConsumedMessages'),
'measurementConsumers' => $dsn->getString('measurementConsumers'),
'retentionPolicy' => $dsn->getString('retentionPolicy'),
]), function ($value) { return null !== $value; });
}
}