Skip to content

Commit

Permalink
Support new integer data type for line protocol
Browse files Browse the repository at this point in the history
As influxdb now support data type specification for integers, in order
to separate those values from float64, this is a very simple solution.

Related to issue #48.
  • Loading branch information
wdalmut committed Aug 20, 2015
1 parent db61008 commit b450ce2
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/Adapter/GuzzleAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function send(array $message)
"db" => $message["database"],
"retentionPolicy" => $message["retentionPolicy"],
],
"body" => message_to_line_protocol($message)
"body" => message_to_line_protocol($message, $this->getOptions()->getForceIntegers())
];

$endpoint = $this->getHttpSeriesEndpoint();
Expand Down
2 changes: 1 addition & 1 deletion src/Adapter/UdpAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public function send(array $message)
$message["tags"] = array_replace_recursive($this->getOptions()->getTags(), $message["tags"]);
}

$message = message_to_line_protocol($message);
$message = message_to_line_protocol($message, $this->getOptions()->getForceIntegers());

$this->write($message);
}
Expand Down
19 changes: 14 additions & 5 deletions src/Adapter/helpers.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use DateTime;

function message_to_line_protocol(array $message)
function message_to_line_protocol(array $message, $force_ints = false)
{
if (!array_key_exists("points", $message)) {
return;
Expand All @@ -26,14 +26,19 @@ function message_to_line_protocol(array $message)
$lines[] = trim(
sprintf(
"%s %s %d",
$point["measurement"], list_to_string($point["fields"], true), $unixepoch
$point["measurement"],
list_to_string($point["fields"], true, $force_ints),
$unixepoch
)
);
} else {
$lines[] = trim(
sprintf(
"%s,%s %s %d",
$point["measurement"], list_to_string($tags), list_to_string($point["fields"], true), $unixepoch
$point["measurement"],
list_to_string($tags, false, $force_ints),
list_to_string($point["fields"], true, $force_ints),
$unixepoch
)
);
}
Expand All @@ -42,9 +47,9 @@ function message_to_line_protocol(array $message)
return implode("\n", $lines);
}

function list_to_string(array $elements, $escape = false)
function list_to_string(array $elements, $escape = false, $force_ints = false)
{
array_walk($elements, function(&$value, $key) use ($escape) {
array_walk($elements, function(&$value, $key) use ($escape, $force_ints) {
if ($escape && is_string($value)) {
$value = "\"{$value}\"";
}
Expand All @@ -53,6 +58,10 @@ function list_to_string(array $elements, $escape = false)
$value = ($value) ? "true" : "false";
}

if ($force_ints && is_int($value)) {
$value = "{$value}i";
}

$value = "{$key}={$value}";
});

Expand Down
21 changes: 13 additions & 8 deletions src/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,15 @@
class Options
{
private $host;

private $port;

private $username;

private $password;

private $protocol;

private $database;

private $retentionPolicy;

private $tags;

private $prefix;
private $forceIntegers;

public function __construct()
{
Expand All @@ -33,11 +26,23 @@ public function __construct()
$this->setPassword("root");
$this->setProtocol("http");
$this->setPrefix("");
$this->setForceIntegers(false);

$this->setRetentionPolicy("default");
$this->setTags([]);
}

public function getForceIntegers()
{
return $this->forceIntegers;
}

public function setForceIntegers($forceIntegers)
{
$this->forceIntegers = $forceIntegers;
return $this;
}

public function getPrefix()
{
return $this->prefix;
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/Adapter/GuzzleAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,32 @@ public function testTagsFieldIsMergedWithGlobalTags()
]
]);
}

public function testAdapterForceIntegersCorrectly()
{
$guzzleHttp = $this->prophesize("GuzzleHttp\Client");
$guzzleHttp->post("http://localhost:8086/write", [
"auth" => ["root", "root"],
"query" => [
"db" => "db",
"retentionPolicy" => "default",
],
"body" => 'tcp.test mark="element",value=12i 1257894000000000000',
])->shouldBeCalledTimes(1);
$options = (new Options())->setDatabase("db")->setForceIntegers(true);
$adapter = new InfluxHttpAdapter($guzzleHttp->reveal(), $options);

$adapter->send([
"time" => "2009-11-10T23:00:00Z",
"points" => [
[
"measurement" => "tcp.test",
"fields" => [
"mark" => "element",
"value" => 12,
]
]
]
]);
}
}
24 changes: 16 additions & 8 deletions tests/unit/Adapter/HelpersTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,28 @@ class HelpersTest extends \PHPUnit_Framework_TestCase
/**
* @dataProvider getElements
*/
public function testListToLineValues($message, $result, $escape)
public function testListToLineValues($message, $result, $escape, $force_ints)
{
$this->assertEquals($result, list_to_string($message, $escape));
$this->assertEquals($result, list_to_string($message, $escape, $force_ints));
}

public function getElements()
{
return [
[["one" => "two"], "one=two", false],
[["one" => "two"], "one=\"two\"", true],
[["one" => "two", "three" => "four"], "one=two,three=four", false],
[["one" => "two", "three" => "four"], "one=\"two\",three=\"four\"", true],
[["one" => true, "three" => false], "one=true,three=false", false],
[["one" => true, "three" => 0, "four" => 1], "one=true,three=0,four=1", false],
[["one" => "two"], "one=two", false, false],
[["one" => "two"], "one=\"two\"", true, false],
[["one" => "two", "three" => "four"], "one=two,three=four", false, false],
[["one" => "two", "three" => "four"], "one=\"two\",three=\"four\"", true, false],
[["one" => true, "three" => false], "one=true,three=false", false, false],
[["one" => true, "three" => 0, "four" => 1], "one=true,three=0,four=1", false, false],
[["one" => true, "three" => false], "one=true,three=false", false, true],
[["one" => true, "three" => 0, "four" => 1], "one=true,three=0i,four=1i", false, true],
[["one" => 12, "three" => 14], "one=12i,three=14i", true, true],
[["one" => 12.1, "three" => 14], "one=12.1,three=14i", true, true],
[["one" => 12., "three" => 14], "one=12,three=14i", true, true],
[["one" => (double)12, "three" => 14], "one=12,three=14i", true, true],
[["one" => (double)12, "three" => (double)14], "one=12,three=14", true, true],
[["one" => (double)"12", "three" => (int)"14"], "one=12,three=14i", true, true],
];
}
}
107 changes: 106 additions & 1 deletion tests/unit/Adapter/UdpAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class UdpAdapterTest extends \PHPUnit_Framework_TestCase
*/
public function testRewriteMessages($input, $response)
{
$object = new UdpAdapter(new Options());
$object = $this->getMockBuilder("InfluxDB\Adapter\UdpAdapter")
->setConstructorArgs([new Options()])
->setMethods(["write"])
Expand Down Expand Up @@ -296,4 +295,110 @@ public function testMergeFullTagsPositions()
]
]);
}

/**
* @dataProvider getMessagesWithForceIntegers
*/
public function testForceIntegers($input, $response)
{
$options = new Options();
$options->setForceIntegers(true);

$object = $this->getMockBuilder("InfluxDB\Adapter\UdpAdapter")
->setConstructorArgs([$options])
->setMethods(["write"])
->getMock();
$object->expects($this->once())
->method("write")
->with($response);

$object->send($input);
}

public function getMessagesWithForceIntegers()
{
return [
[
[
"time" => "2009-11-10T23:00:00Z",
"points" => [
[
"measurement" => "cpu",
"fields" => [
"value" => 1,
],
],
],
],
"cpu value=1i 1257894000000000000"
],
[
[
"time" => "2009-11-10T23:00:00Z",
"points" => [
[
"measurement" => "cpu",
"fields" => [
"value" => 1,
"string" => "escape",
],
],
],
],
"cpu value=1i,string=\"escape\" 1257894000000000000"
],
[
[
"tags" => [
"region" => "us-west",
"host" => "serverA",
"env" => "prod",
"target" => "servers",
"zone" => "1c",
],
"time" => "2009-11-10T23:00:00Z",
"points" => [
[
"measurement" => "cpu",
"fields" => [
"cpu" => 18.12,
"free" => 712432,
],
],
],
],
"cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c cpu=18.12,free=712432i 1257894000000000000"
],
[
[
"tags" => [
"region" => "us-west",
"host" => "serverA",
"env" => "prod",
"target" => "servers",
"zone" => "1c",
],
"time" => "2009-11-10T23:00:00Z",
"points" => [
[
"measurement" => "cpu",
"fields" => [
"cpu" => 18.12,
],
],
[
"measurement" => "mem",
"fields" => [
"free" => 712432,
],
],
],
],
<<<EOF
cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c cpu=18.12 1257894000000000000
mem,region=us-west,host=serverA,env=prod,target=servers,zone=1c free=712432i 1257894000000000000
EOF
],
];
}
}

0 comments on commit b450ce2

Please sign in to comment.