Skip to content

Commit

Permalink
fix: parsing response with new lines (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Nov 11, 2021
1 parent 50738a7 commit e489e4f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 34 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 2.4.0 [unreleased]

### Bug Fixes
1. [#101](https://github.com/influxdata/influxdb-client-php/pull/101): Fix parsing Query response with contains new lines in field values

## 2.3.0 [2021-10-22]

### Features
Expand Down
48 changes: 14 additions & 34 deletions src/InfluxDB2/FluxCsvParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

namespace InfluxDB2;

use GuzzleHttp\Psr7\Stream;
use Psr\Http\Message\StreamInterface;
use RuntimeException;

/**
* Class FluxCsvParser us used to construct FluxResult from CSV.
* @package InfluxDB2
Expand All @@ -22,6 +18,7 @@ class FluxCsvParser

private $response;
private $stream;
private $resource;

/* @var $variable int */
private $tableIndex = 0;
Expand All @@ -47,7 +44,8 @@ class FluxCsvParser
*/
public function __construct($response, $stream = false)
{
$this->response = is_string($response) ? new Stream($this->stringToStream($response)) : $response;
$this->response = is_string($response) ? null : $response;
$this->resource = is_string($response) ? $this->stringToStream($response) : $response->detach();
$this->stream = $stream;
$this->tableIndex = 0;
if (!$stream) {
Expand Down Expand Up @@ -75,13 +73,11 @@ public function parse()
public function each()
{
try {
while ($row = $this->readline($this->response)) {
if (!isset($row) || trim($row) === '') {
while (($csv = fgetcsv($this->resource)) !== false) {
if (!isset($csv) || (count($csv) == 1 && $csv[0] == null)) {
continue;
}

$csv = str_getcsv($row);

//skip empty csv row
if ($csv[1] == 'error' && $csv[2] == 'reference') {
$this->parsingStateError = true;
Expand Down Expand Up @@ -278,34 +274,18 @@ private function toValue($strVal, FluxColumn $column)
return $strVal;
}

private function readline(StreamInterface $stream)
{
$buffer = null;

while (null !== ($byte = $stream->read(1))) {
if ($byte === "") {
break;
}

if ($buffer == null) {
$buffer .= '';
}

$buffer .= $byte;

// Break when a new line is found
if ($byte === "\n") {
break;
}
}

return $buffer;
}

private function closeConnection()
{
# Close CSV Parser
$this->closed = true;
$this->response->close();
if (isset($this->response)) {
$this->response->close();
}
if (is_resource($this->resource)) {
fclose($this->resource);
}

unset($this->response);
unset($this->resource);
}
}
20 changes: 20 additions & 0 deletions tests/QueryApiIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,26 @@ public function testQuery()
$this->assertEquals('level', $record->getField());
}

public function testWriteQueryNewLine()
{
$measurement = 'h2o_QueryNewLine_' . (new DateTime())->getTimestamp();

$this->writeApi->write(Point::measurement($measurement)
->addTag('location', 'europe')
->addField('value', "some \n value"));

$result = $this->queryApi->query('from(bucket: "my-bucket") |> range(start: 0)
|> filter(fn: (r) => r._measurement == "' . $measurement . '")');

$this->assertNotNull($result);
$this->assertEquals(1, sizeof($result));
$records = $result[0]->records;
$this->assertEquals(1, sizeof($records));
$record = $records[0];

$this->assertEquals("some \r\n value", $record->getValue());
}

/**
* @param string $measurement
* @param DateTime $now
Expand Down

0 comments on commit e489e4f

Please sign in to comment.