Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[InfluxDB][Monitoring] Allow passing Client as configuration option. #809

Merged
merged 5 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 30 additions & 22 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ Enqueue is an MIT-licensed open source project with its ongoing development made

# Monitoring.

Enqueue provides a tool for monitoring message queues.
Enqueue provides a tool for monitoring message queues.
With it, you can control how many messages were sent, how many processed successfuly or failed.
How many consumers are working, their up time, processed messages stats, memory usage and system load.
How many consumers are working, their up time, processed messages stats, memory usage and system load.
The tool could be integrated with virtually any analytics and monitoring platform.
There are several integration:
There are several integration:
* [Datadog StatsD](https://datadoghq.com)
* [InfluxDB](https://www.influxdata.com/) and [Grafana](https://grafana.com/)
* [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/)
* [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/)
We are working on a JS\WAMP based real-time UI tool, for more information please [contact us]([email protected]).

![Grafana Monitoring](images/grafana_monitoring.jpg)

[contact us]([email protected]) if need a Grafana template such as on the picture.
[contact us]([email protected]) if need a Grafana template such as on the picture.

* [Installation](#installation)
* [Track sent messages](#track-sent-messages)
Expand All @@ -40,7 +40,7 @@ We are working on a JS\WAMP based real-time UI tool, for more information please
composer req enqueue/monitoring:0.9.x-dev
```

## Track sent messages
## Track sent messages

```php
<?php
Expand All @@ -50,7 +50,7 @@ use Enqueue\Monitoring\GenericStatsStorageFactory;
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushSentMessageStats(new SentMessageStats(
(int) (microtime(true) * 1000), // timestamp
'queue_name', // queue
'queue_name', // queue
'aMessageId',
'aCorrelationId',
[], // headers
Expand All @@ -76,7 +76,7 @@ $context->createProducer()->send($queue, $message);
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushSentMessageStats(new SentMessageStats(
(int) (microtime(true) * 1000),
$queue->getQueueName(),
$queue->getQueueName(),
$message->getMessageId(),
$message->getCorrelationId(),
$message->getHeaders()[],
Expand All @@ -99,7 +99,7 @@ $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
'consumerId',
(int) (microtime(true) * 1000), // now
$receivedAt,
$receivedAt,
'aQueue',
'aMessageId',
'aCorrelationId',
Expand Down Expand Up @@ -127,16 +127,16 @@ $consumer = $context->createConsumer($queue);
$consumerId = uniqid('consumer-id', true); // we suggest using UUID here
if ($message = $consumer->receiveNoWait()) {
$receivedAt = (int) (microtime(true) * 1000);

// heavy processing here.

$consumer->acknowledge($message);

$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
$consumerId,
(int) (microtime(true) * 1000), // now
$receivedAt,
$receivedAt,
$queue->getQueueName(),
$message->getMessageId(),
$message->getCorrelationId(),
Expand All @@ -151,7 +151,7 @@ if ($message = $consumer->receiveNoWait()) {
## Track consumer metrics

Consumers are long running processes. It vital to know how many of them are running right now, how they perform, how much memory do they use and so.
This example shows how you can send such metrics.
This example shows how you can send such metrics.
Call this code from time to time between processing messages.

```php
Expand All @@ -165,13 +165,13 @@ $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1
$statsStorage->pushConsumerStats(new ConsumerStats(
'consumerId',
(int) (microtime(true) * 1000), // now
$startedAt,
$startedAt,
null, // finished at
true, // is started?
true, // is started?
false, // is finished?
false, // is failed
['foo'], // consume from queues
123, // received messages
123, // received messages
120, // acknowledged messages
1, // rejected messages
1, // requeued messages
Expand All @@ -182,7 +182,7 @@ $statsStorage->pushConsumerStats(new ConsumerStats(

## Consumption extension

There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption).
There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption).
It could collect consumed messages and consumer stats for you.

```php
Expand Down Expand Up @@ -236,8 +236,16 @@ There are available options:
* 'measurementSentMessages' => 'sent-messages',
* 'measurementConsumedMessages' => 'consumed-messages',
* 'measurementConsumers' => 'consumers',
* 'client' => null,
* 'retentionPolicy' => null,
```

You can pass InfluxDB\Client instance in `client` option. Otherwise, it will be created on first use according to other
options.

If your InfluxDB\Client uses driver that implements InfluxDB\Driver\QueryDriverInterface, then database will be
automatically created for you if it doesn't exist. Default InfluxDB\Client will also do that.

## Datadog storage

Install additional packages:
Expand All @@ -256,7 +264,7 @@ $statsStorage = (new GenericStatsStorageFactory())->create('datadog://127.0.0.1:
For best experience please adjust units and types in metric summary.

Example dashboard:

![Datadog monitoring](images/datadog_monitoring.png)


Expand Down Expand Up @@ -311,7 +319,7 @@ There are available options:

## Symfony App

You have to register some services in order to incorporate monitoring facilities into your Symfony application.
You have to register some services in order to incorporate monitoring facilities into your Symfony application.

```yaml
# config/packages/enqueue.yaml
Expand All @@ -325,11 +333,11 @@ enqueue:
transport: 'amqp://guest:guest@foo:5672/%2f'
monitoring: 'wamp://127.0.0.1:9090?topic=stats'
client: ~

datadog:
transport: 'amqp://guest:guest@foo:5672/%2f'
monitoring: 'datadog://127.0.0.1:8125?batched=false'
client: ~
```

[back to index](index.md)
[back to index](index.md)
98 changes: 79 additions & 19 deletions pkg/monitoring/InfluxDbStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use Enqueue\Dsn\Dsn;
use InfluxDB\Client;
use InfluxDB\Database;
use InfluxDB\Driver\QueryDriverInterface;
use InfluxDB\Exception as InfluxDBException;
use InfluxDB\Point;

class InfluxDbStorage implements StatsStorage
Expand Down Expand Up @@ -38,6 +40,8 @@ class InfluxDbStorage implements StatsStorage
* 'measurementSentMessages' => 'sent-messages',
* 'measurementConsumedMessages' => 'consumed-messages',
* 'measurementConsumers' => 'consumers',
* 'client' => null, # Client instance. Null by default.
* 'retentionPolicy' => null,
* ]
*
* or
Expand All @@ -55,10 +59,17 @@ public function __construct($config = 'influxdb:')
if (empty($config)) {
$config = [];
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
$config = self::parseDsn($config);
} elseif (is_array($config)) {
$config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']);
$config = empty($config['dsn']) ? $config : self::parseDsn($config['dsn']);
} elseif ($config instanceof Client) {
// Passing Client instead of array config is deprecated because it prevents setting any configuration values
// and causes library to use defaults.
@trigger_error(
sprintf('Passing %s as %s argument is deprecated. Pass it as "client" array property or use createWithClient instead',
Client::class,
__METHOD__
), E_USER_DEPRECATED);
$this->client = $config;
$config = [];
} else {
Expand All @@ -74,11 +85,41 @@ public function __construct($config = 'influxdb:')
'measurementSentMessages' => 'sent-messages',
'measurementConsumedMessages' => 'consumed-messages',
'measurementConsumers' => 'consumers',
'client' => null,
'retentionPolicy' => null,
], $config);

if (null !== $config['client']) {
if (!$config['client'] instanceof Client) {
throw new \InvalidArgumentException(sprintf(
'%s configuration property is expected to be an instance of %s class. %s was passed instead.',
'client',
Client::class,
gettype($config['client'])
));
}
$this->client = $config['client'];
}

$this->config = $config;
}

/**
* @param Client $client
* @param string $config
*
* @return InfluxDbStorage
*/
public static function createWithClient(Client $client, $config = 'influxdb:'): self
{
if (is_string($config)) {
$config = self::parseDsn($config);
}
$config['client'] = $client;

return new static($config);
}

public function pushConsumerStats(ConsumerStats $stats): void
{
$points = [];
Expand Down Expand Up @@ -109,7 +150,7 @@ public function pushConsumerStats(ConsumerStats $stats): void
$points[] = new Point($this->config['measurementConsumers'], null, $tags, $values, $stats->getTimestampMs());
}

$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
$this->doWrite($points);
}

public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
Expand All @@ -135,7 +176,7 @@ public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
new Point($this->config['measurementConsumedMessages'], $runtime, $tags, $values, $stats->getTimestampMs()),
];

$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
$this->doWrite($points);
}

public function pushSentMessageStats(SentMessageStats $stats): void
Expand All @@ -158,29 +199,47 @@ public function pushSentMessageStats(SentMessageStats $stats): void
new Point($this->config['measurementSentMessages'], 1, $tags, [], $stats->getTimestampMs()),
];

$this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS);
$this->doWrite($points);
}

private function getDb(): Database
private function doWrite(array $points): void
{
if (null === $this->database) {
if (null === $this->client) {
$this->client = new Client(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['password']
);
if (null === $this->client) {
$this->client = new Client(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['password']
);
}

if ($this->client->getDriver() instanceof QueryDriverInterface) {
if (null === $this->database) {
$this->database = $this->client->selectDB($this->config['db']);
$this->database->create();
}

$this->database = $this->client->selectDB($this->config['db']);
$this->database->create();
$this->database->writePoints($points, Database::PRECISION_MILLISECONDS, $this->config['retentionPolicy']);
} else {
// Code below mirrors what `writePoints` method of Database does.
Steveb-p marked this conversation as resolved.
Show resolved Hide resolved
try {
$parameters = [
'url' => sprintf('write?db=%s&precision=%s', $this->config['db'], Database::PRECISION_MILLISECONDS),
'database' => $this->config['db'],
'method' => 'post',
];
if (null !== $this->config['retentionPolicy']) {
$parameters['url'] .= sprintf('&rp=%s', $this->config['retentionPolicy']);
}

$this->client->write($parameters, $points);
} catch (\Exception $e) {
throw new InfluxDBException($e->getMessage(), $e->getCode());
}
}

return $this->database;
}

private function parseDsn(string $dsn): array
private static function parseDsn(string $dsn): array
{
$dsn = Dsn::parseFirst($dsn);

Expand All @@ -200,6 +259,7 @@ private function parseDsn(string $dsn): array
'measurementSentMessages' => $dsn->getString('measurementSentMessages'),
'measurementConsumedMessages' => $dsn->getString('measurementConsumedMessages'),
'measurementConsumers' => $dsn->getString('measurementConsumers'),
'retentionPolicy' => $dsn->getString('retentionPolicy'),
]), function ($value) { return null !== $value; });
}
}