Skip to content

Commit

Permalink
Basic write using line protocol and Point structure #5
Browse files Browse the repository at this point in the history
* simple batch writing
  • Loading branch information
rolincova committed Feb 28, 2020
1 parent 1910267 commit 5aba92f
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 50 deletions.
11 changes: 10 additions & 1 deletion src/InfluxDB2/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
class Client
{
public $options;
public $closed = false;
private $autoCloseable = array();

/**
* Client constructor.
Expand Down Expand Up @@ -37,7 +39,9 @@ public function __construct(array $options)
*/
public function createWriteApi(array $writeOptions = null): WriteApi
{
return new WriteApi($this->options, $writeOptions);
$writeApi = new WriteApi($this->options, $writeOptions);
$this->autoCloseable[] = $writeApi;
return $writeApi;
}

/**
Expand All @@ -55,6 +59,11 @@ public function createQueryApi(): QueryApi
*/
public function close()
{
$this->closed = true;

foreach ($this->autoCloseable as $ac)
{
$ac->close();
}
}
}
101 changes: 101 additions & 0 deletions src/InfluxDB2/Worker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

namespace InfluxDB2;


use Exception;
use SplQueue;

class Worker
{
/** @var WriteApi */
private $client;

private $queue;
private $writeOptions;

public function __construct($client)
{
$this->client = $client;
$this->writeOptions = $client->writeOptions;

$this->queue = new SplQueue();
}

public function push($payload)
{
$this->queue->enqueue($payload);

if ($this->queue->count() >= $this->writeOptions->batchSize)
{
$this->checkBackgroundQueue(true);
}
}

public function flush()
{
while ($this->queue->count() != 0)
{
$this->checkBackgroundQueue(false);
}
}

private function checkBackgroundQueue(bool $size)
{
$data = array();
$points = 0;

if ($size && $this->queue->count() < $this->writeOptions->batchSize) {
return;
}

while (($points < $this->writeOptions->batchSize) && $this->queue->count() != 0) {
try {
$item = $this->queue->dequeue();

$key = $item->key;
$index = $this->existsKey($key, $data);

if ($index === null) {
$data[] = array('key' => $key, 'data' => array());
$index = array_key_last($data);
}

$data[$index]['data'][] = $item->data;
$points += 1;
} catch (Exception $e) {
return;
}
}

if (!empty($data)) {
$this->write($data);
}
}

private function existsKey($key, $data): ?int
{
foreach ($data as $item)
{
$itemKey = $item['key'];
if ($key->precision === $itemKey->precision &&
$key->bucket === $itemKey->bucket &&
$key->org === $itemKey->org)
{
return array_search($item, $data);
}
}

return null;
}

private function write($data)
{
foreach ($data as $item) {
$key = $item['key'];
$payload = $item['data'];

$this->client->writeRaw(join("\n", $payload), $key->precision, $key->bucket, $key->org);
}
}
}
75 changes: 28 additions & 47 deletions src/InfluxDB2/WriteApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class WriteApi extends DefaultApi
{
public $writeOptions;

/** @var Worker */
private $worker;
public $closed = false;

/**
* WriteApi constructor.
* @param $options
Expand Down Expand Up @@ -95,15 +99,15 @@ public function write($data, string $precision = null, string $bucket = null, st
$this->check("bucket", $bucketParam);
$this->check("org", $orgParam);

$payload = $this->generatePayload($data, $bucketParam, $orgParam, $precisionParam);
$payload = $this->generatePayload($data, $precisionParam, $bucketParam, $orgParam);

if ($payload == null) {
return;
}

if (WriteType::BATCHING == $this->writeOptions->writeType)
{
print ("BATCHING is not implemented yet\n");
$this->worker()->push($payload);
} else {
$this->writeRaw($payload, $precisionParam, $bucketParam, $orgParam);
}
Expand Down Expand Up @@ -135,6 +139,23 @@ public function writeRaw(string $data, string $precision = null, string $bucket
$this->post($data, "/api/v2/write", $queryParams);
}

public function close()
{
$this->closed = true;

$this->worker()->flush();
}

private function worker(): Worker
{
if (!isset($this->worker))
{
$this->worker = new Worker($this);
}

return $this->worker;
}

private function generatePayload($data, string $precision = null, string $bucket = null, string $org = null)
{
if ($data == null || empty($data)) {
Expand Down Expand Up @@ -184,31 +205,15 @@ private function getOption(string $optionName, string $precision = null): string
class BatchItem
{
/** @var BatchItemKey */
private $key;
public $key;
/** @var string */
private $data;
public $data;

public function __construct($key, $data)
{
$this->key = $key;
$this->data = $data;
}

/**
* @return BatchItemKey
*/
public function getKey(): BatchItemKey
{
return $this->key;
}

/**
* @return string
*/
public function getData(): string
{
return $this->data;
}
}

/**
Expand All @@ -217,40 +222,16 @@ public function getData(): string
class BatchItemKey
{
/** @var string */
private $bucket;
public $bucket;
/** @var string */
private $org;
public $org;
/** @var WritePrecision */
private $precision;
public $precision;

public function __construct($bucket, $org, $precision)
{
$this->bucket = $bucket;
$this->org = $org;
$this->precision = $precision;
}

/**
* @return string
*/
public function getBucket(): string
{
return $this->bucket;
}

/**
* @return string
*/
public function getOrg(): string
{
return $this->org;
}

/**
* @return WritePrecision
*/
public function getPrecision(): WritePrecision
{
return $this->precision;
}
}
10 changes: 8 additions & 2 deletions tests/WriteApiIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
use InfluxDB2\Client;
use InfluxDB2\Point;
use InfluxDB2\Model\WritePrecision;
use InfluxDB2\WriteApi;
use InfluxDB2\WriteOptions;
use InfluxDB2\WriteType;
use PHPUnit\Framework\TestCase;

class WriteApiIntegrationTest extends TestCase
{
/** @var Client */
private $client;
/** @var WriteApi */
private $writeApi;

/**
Expand Down Expand Up @@ -67,26 +70,29 @@ public function testBatchingWrite()
$data = ['name' => 'cpu',
'tags' => ['host' => 'server_nl', 'region' => 'us'],
'fields' => ['internal' => 5, 'external' => 6],
'time' => microtime()];
'time' => microtime(true)];

// ['name' => 'gpu', 'fields' => ['value' => 0.9999]];

$writeApi->write($data,WritePrecision::US);
$writeApi->write($data,WritePrecision::MS);

$p1 = ['name' => "h2o", 'tags' => ['host' => 'aws', 'region' => 'us'], 'fields' => ['level' => 1, 'saturation' => 99], 'time' => 1];
$p2 = ['name' => "h2o", 'tags' => ['host' => 'aws', 'region' => 'us'], 'fields' => ['level' => 2, 'saturation' => 98], 'time' => 2];
$p3 = ['name' => "h2o", 'tags' => ['host' => 'aws', 'region' => 'us'], 'fields' => ['level' => 3, 'saturation' => 97], 'time' => 3];
$p4 = ['name' => "h2o", 'tags' => ['host' => 'aws', 'region' => 'us'], 'fields' => ['level' => 4, 'saturation' => 96], 'time' => 4];
$p5 = ['name' => "h2o", 'tags' => ['host' => 'aws', 'region' => 'us'], 'fields' => ['level' => 5, 'saturation' => 95], 'time' => 5];
$p6 = ['name' => "h2o", 'tags' => ['host' => 'aws', 'region' => 'us'], 'fields' => ['level' => 6, 'saturation' => 95], 'time' => 6];

$writeApi->write($p1);
$writeApi->write($p2);
$writeApi->write($p3);
$writeApi->write($p4);
$writeApi->write($p5);
$writeApi->write($p6);

$this->assertNotNull($writeApi);

$this->client->close();
}

public function testWriteArrayOfPoint()
Expand Down

0 comments on commit 5aba92f

Please sign in to comment.