Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - connect & pass traces to the jaeger agent using UDP and thrft. #246

Closed
wants to merge 9 commits into from
Closed
2 changes: 1 addition & 1 deletion .github/workflows/php.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
matrix:
operating-system: [ubuntu-latest]
php-versions: ['7.3', '7.4', '8.0']
php-versions: ['7.3', '7.4']

steps:
- uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
composer.phar
composer.lock
var
Jaeger/Thrift/*
vendor
.idea/
coverage.clover
Expand Down
62 changes: 62 additions & 0 deletions Jaeger/Codec/CodecUtility.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

declare(strict_types=1);

namespace Jaeger\Codec;

use Throwable;

class CodecUtility
{

/**
* Incoming trace/span IDs are hex representations of 64-bit values. PHP
* represents ints internally as signed 32- or 64-bit values, but base_convert
* converts to string representations of arbitrarily large positive numbers.
* This means at least half the incoming IDs will be larger than PHP_INT_MAX.
*
* Thrift, while building a binary representation of the IDs, performs bitwise
* operations on the string values, implicitly casting to int and capping them
* at PHP_INT_MAX. So, incoming IDs larger than PHP_INT_MAX will be serialized
* and sent to the agent as PHP_INT_MAX, breaking trace/span correlation.
*
* This method therefore, on 64-bit architectures, splits the hex string into
* high and low values, converts them separately to ints, and manually combines
* them into a proper signed int. This int is then handled properly by the
* Thrift package.
*
* On 32-bit architectures, it falls back to base_convert.
*
* @return int
*/
public static function getValidI64(int $length) : int
{
$hex = bin2hex(random_bytes($length));
// If we're on a 32-bit architecture, fall back to base_convert.
// if (PHP_INT_SIZE === 4) {
// return base_convert($hex, 16, 10);
// }

$hi = intval(substr($hex, -16, -8), 16);
$lo = intval(substr($hex, -8, 8), 16);

return $hi << 32 | $lo;
}

/**
* Generates a random hex string
*
* In case where there is not enough entropy for random_bytes() the generation will use a simpler method.
*
* @param int $length of bytes
* @return string
*/
public static function randomHex(int $length): string
{
try {
return bin2hex(random_bytes($length));
} catch (Throwable $ex) {
return substr(str_shuffle(str_repeat('0123456789abcdef', $length)), 1, $length);
}
}
}
108 changes: 108 additions & 0 deletions Jaeger/JaegerTransport.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<?php

declare(strict_types=1);

namespace Jaeger;

use Jaeger\Thrift\Agent\AgentClient;
use Jaeger\Thrift\Batch;
use Jaeger\Thrift\Process;
use Jaeger\Thrift\Span;
use Thrift\Exception\TTransportException;
use Thrift\Protocol\TCompactProtocol;

final class JaegerTransport implements Transport
{

// DEFAULT_BUFFER_SIZE indicates the default maximum buffer size, or the size threshold
// at which the buffer will be flushed to the agent.
const DEFAULT_BUFFER_SIZE = 1;

private $transport;
private $client;

private $buffer = [];
private $process = null;
private $maxBufferSize = 0;

public function __construct($address = '127.0.0.1', $port = 6831, $maxBufferSize = 0)
{
$this->transport = new ThriftUdpTransport($address, $port);
$p = new TCompactProtocol($this->transport);
$this->client = new AgentClient($p, $p);

$this->maxBufferSize = ($maxBufferSize > 0 ? $maxBufferSize : self::DEFAULT_BUFFER_SIZE);
}

/**
* Submits a new span to collectors, possibly delayed and/or with buffering.
*
* @param Span $span
*/
public function append(Span $span, $serviceName)
{
// Grab a copy of the process data, if we didn't already.
if ($this->process == null) {
$this->process = new Process([
'serviceName' => $serviceName,
'tags' => $span->tags,
]);
}

$this->buffer[] = $span;

// TODO(tylerc): Buffer spans and send them in as few UDP packets as possible.
return $this->flush();
}

/**
* Flush submits the internal buffer to the remote server. It returns the
* number of spans flushed.
*
* @param $force bool - force a flush, even on a partial buffer
*/
public function flush($force = false)
{
$spans = count($this->buffer);

// buffer not full yet
if (!$force && $spans < $this->maxBufferSize) {
return 0;
}

// no spans to flush
if ($spans <= 0) {
return 0;
}

try {
// emit a batch
$this->client->emitBatch(new Batch([
'process' => $this->process,
'spans' => $this->buffer,
]));

// flush the UDP data
$this->transport->flush();

// reset the internal buffer
$this->buffer = [];
} catch (TTransportException $e) {
error_log('jaeger: transport failure: ' . $e->getMessage());

return 0;
}

return $spans;
}

/**
* Does a clean shutdown of the reporter, flushing any traces that may be
* buffered in memory.
*/
public function close()
{
$this->flush(true); // flush all remaining data
$this->transport->close();
}
}
82 changes: 82 additions & 0 deletions Jaeger/Thrift/Agent/AgentClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace Jaeger\Thrift\Agent;

/**
* Autogenerated by Thrift Compiler (0.13.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
use Thrift\Protocol\TBinaryProtocolAccelerated;
use Thrift\Type\TMessageType;

class AgentClient implements \Jaeger\Thrift\Agent\AgentIf
{
protected $input_ = null;
protected $output_ = null;

protected $seqid_ = 0;

public function __construct($input, $output = null)
{
$this->input_ = $input;
$this->output_ = $output ? $output : $input;
}

public function emitZipkinBatch(array $spans)
{
$this->send_emitZipkinBatch($spans);
}

public function send_emitZipkinBatch(array $spans)
{
$args = new \Jaeger\Thrift\Agent\Agent_emitZipkinBatch_args();
$args->spans = $spans;
$bin_accel = ($this->output_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary');
if ($bin_accel) {
thrift_protocol_write_binary(
$this->output_,
'emitZipkinBatch',
TMessageType::ONEWAY,
$args,
$this->seqid_,
$this->output_->isStrictWrite()
);
} else {
$this->output_->writeMessageBegin('emitZipkinBatch', TMessageType::ONEWAY, $this->seqid_);
$args->write($this->output_);
$this->output_->writeMessageEnd();
$this->output_->getTransport()->flush();
}
}

public function emitBatch(\Jaeger\Thrift\Batch $batch)
{
$this->send_emitBatch($batch);
}

public function send_emitBatch(\Jaeger\Thrift\Batch $batch)
{
$args = new \Jaeger\Thrift\Agent\Agent_emitBatch_args();
$args->batch = $batch;
$bin_accel = ($this->output_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary');
if ($bin_accel) {
thrift_protocol_write_binary(
$this->output_,
'emitBatch',
TMessageType::ONEWAY,
$args,
$this->seqid_,
$this->output_->isStrictWrite()
);
} else {
$this->output_->writeMessageBegin('emitBatch', TMessageType::ONEWAY, $this->seqid_);
$args->write($this->output_);
$this->output_->writeMessageEnd();
$this->output_->getTransport()->flush();
}
}
}
24 changes: 24 additions & 0 deletions Jaeger/Thrift/Agent/AgentIf.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);

namespace Jaeger\Thrift\Agent;

/**
* Autogenerated by Thrift Compiler (0.13.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/

interface AgentIf
{
/**
* @param \Jaeger\Thrift\Agent\Zipkin\Span[] $spans
*/
public function emitZipkinBatch(array $spans);
/**
* @param \Jaeger\Thrift\Batch $batch
*/
public function emitBatch(\Jaeger\Thrift\Batch $batch);
}
99 changes: 99 additions & 0 deletions Jaeger/Thrift/Agent/Agent_emitBatch_args.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?php

declare(strict_types=1);

namespace Jaeger\Thrift\Agent;

/**
* Autogenerated by Thrift Compiler (0.13.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
use Thrift\Exception\TProtocolException;
use Thrift\Type\TType;

class Agent_emitBatch_args
{
public static $isValidate = false;

public static $_TSPEC = [
1 => [
'var' => 'batch',
'isRequired' => false,
'type' => TType::STRUCT,
'class' => '\Jaeger\Thrift\Batch',
],
];

/**
* @var \Jaeger\Thrift\Batch
*/
public $batch = null;

public function __construct($vals = null)
{
if (is_array($vals)) {
if (isset($vals['batch'])) {
$this->batch = $vals['batch'];
}
}
}

public function getName()
{
return 'Agent_emitBatch_args';
}

public function read($input)
{
$xfer = 0;
$fname = null;
$ftype = 0;
$fid = 0;
$xfer += $input->readStructBegin($fname);
while (true) {
$xfer += $input->readFieldBegin($fname, $ftype, $fid);
if ($ftype == TType::STOP) {
break;
}
switch ($fid) {
case 1:
if ($ftype == TType::STRUCT) {
$this->batch = new \Jaeger\Thrift\Batch();
$xfer += $this->batch->read($input);
} else {
$xfer += $input->skip($ftype);
}

break;
default:
$xfer += $input->skip($ftype);

break;
}
$xfer += $input->readFieldEnd();
}
$xfer += $input->readStructEnd();

return $xfer;
}

public function write($output)
{
$xfer = 0;
$xfer += $output->writeStructBegin('Agent_emitBatch_args');
if ($this->batch !== null) {
if (!is_object($this->batch)) {
throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
}
$xfer += $output->writeFieldBegin('batch', TType::STRUCT, 1);
$xfer += $this->batch->write($output);
$xfer += $output->writeFieldEnd();
}
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();

return $xfer;
}
}
Loading