Skip to content

Commit

Permalink
WIP: Add support for logs
Browse files Browse the repository at this point in the history
Note that this branch will be rebased on a periodic basis to stay
on top of changes in main. If you are checking this out early, do
remember to do `git pull -r` to avoid conflicts.
  • Loading branch information
jjatria committed May 3, 2024
1 parent 781feba commit e50dc71
Show file tree
Hide file tree
Showing 15 changed files with 974 additions and 245 deletions.
2 changes: 2 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ Revision history for OpenTelemetry-Exporter-OTLP

{{$NEXT}}

* EXPERIMENTAL: Add support for logs

0.016 2024-05-02 23:01:09+01:00 Europe/London

* Bump OTLP Protobuf files to v1.2.0
Expand Down
16 changes: 12 additions & 4 deletions META.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"requires" : {
"ExtUtils::MakeMaker" : "0",
"File::ShareDir::Install" : "0.06",
"perl" : "v5.20.0"
"perl" : "v5.32.0"
}
},
"develop" : {
Expand Down Expand Up @@ -58,7 +58,7 @@
"Syntax::Keyword::Dynamically" : "0",
"Syntax::Keyword::Match" : "0",
"Time::Piece" : "0",
"perl" : "v5.20.0"
"perl" : "v5.32.0"
}
},
"test" : {
Expand All @@ -70,7 +70,7 @@
"File::Spec" : "0",
"Test2::V0" : "0",
"Test::More" : "0",
"perl" : "v5.20.0"
"perl" : "v5.32.0"
}
}
},
Expand All @@ -87,6 +87,14 @@
"file" : "lib/OpenTelemetry/Exporter/OTLP/Encoder/Protobuf.pm",
"version" : "0.016"
},
"OpenTelemetry::Exporter::OTLP::Logs" : {
"file" : "lib/OpenTelemetry/Exporter/OTLP/Logs.pm",
"version" : "0.015"
},
"OpenTelemetry::Exporter::OTLP::Traces" : {
"file" : "lib/OpenTelemetry/Exporter/OTLP/Traces.pm",
"version" : "0.015"
},
"OpenTelemetry::Proto" : {
"file" : "lib/OpenTelemetry/Proto.pm",
"version" : "0.016"
Expand All @@ -99,7 +107,7 @@
},
"repository" : {
"type" : "git",
"url" : "git://github.com/jjatria/perl-opentelemetry-exporter-otlp",
"url" : "git://github.com/jjatria/perl-opentelemetry-exporter-otlp.git",
"web" : "https://github.com/jjatria/perl-opentelemetry-exporter-otlp"
}
},
Expand Down
6 changes: 3 additions & 3 deletions Makefile.PL
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use strict;
use warnings;

use 5.020000;
use 5.032000;

use ExtUtils::MakeMaker;

Expand All @@ -21,7 +21,7 @@ my %WriteMakefileArgs = (
},
"DISTNAME" => "OpenTelemetry-Exporter-OTLP",
"LICENSE" => "perl",
"MIN_PERL_VERSION" => "5.020000",
"MIN_PERL_VERSION" => "5.032000",
"NAME" => "OpenTelemetry::Exporter::OTLP",
"PREREQ_PM" => {
"Feature::Compat::Try" => 0,
Expand All @@ -45,7 +45,7 @@ my %WriteMakefileArgs = (
},
"VERSION" => "0.017",
"test" => {
"TESTS" => "t/*.t t/OpenTelemetry/*.t t/OpenTelemetry/Exporter/*.t t/OpenTelemetry/Exporter/OTLP/Encoder/*.t"
"TESTS" => "t/*.t t/OpenTelemetry/*.t t/OpenTelemetry/Exporter/OTLP/*.t t/OpenTelemetry/Exporter/OTLP/Encoder/*.t"
}
);

Expand Down
67 changes: 39 additions & 28 deletions lib/OpenTelemetry/Exporter/OTLP.pm
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,30 @@ class OpenTelemetry::Exporter::OTLP :does(OpenTelemetry::Exporter) {
field $stopped;
field $ua;
field $endpoint;
field $compression :param = undef;
field $compression;
field $encoder;
field $max_retries = 5;

ADJUSTPARAMS ($params) {
$endpoint
= delete $params->{traces_endpoint}
$endpoint = delete $params->{endpoint}
// config('EXPORTER_OTLP_TRACES_ENDPOINT')
// do {
my $base = delete $params->{endpoint}
// config('EXPORTER_OTLP_ENDPOINT')
my $base = config('EXPORTER_OTLP_ENDPOINT')
// 'http://localhost:4318';

( $base =~ s|/+$||r ) . '/v1/traces';
};

$compression
//= config(qw( EXPORTER_OTLP_TRACES_COMPRESSION EXPORTER_OTLP_COMPRESSION ))
$compression = delete $params->{compression}
// config(<EXPORTER_OTLP_{TRACES_,}COMPRESSION>)
// $COMPRESSION;

my $timeout = delete $params->{timeout}
// config(qw( EXPORTER_OTLP_TRACES_TIMEOUT EXPORTER_OTLP_TIMEOUT ))
// config(<EXPORTER_OTLP_{TRACES_,}TIMEOUT>)
// 10;

my $headers = delete $params->{headers}
// config(qw( EXPORTER_OTLP_TRACES_HEADERS EXPORTER_OTLP_HEADERS ))
// config(<EXPORTER_OTLP_{TRACES_,}HEADERS>)
// {};

$headers = {
Expand Down Expand Up @@ -274,7 +272,8 @@ class OpenTelemetry::Exporter::OTLP :does(OpenTelemetry::Exporter) {
);

OpenTelemetry->handle_error(
message => "Unhandled error sending OTLP request: $res->{content}",
exception => $res->{content},
message => 'Unhandled error sending OTLP request',
);

return TRACE_EXPORT_FAILURE;
Expand All @@ -285,18 +284,20 @@ class OpenTelemetry::Exporter::OTLP :does(OpenTelemetry::Exporter) {
redo if $self->$maybe_backoff( ++$retries, $reason );
}
case( m/^(?: 4 | 5 ) \d{2} $/ax ) {
$metrics->inc_counter(
failure => [ reason => $res->{status} ],
);
my $code = $res->{status};

$metrics->inc_counter( failure => [ reason => $code ] );

if ( $CAN_USE_PROTOBUF ) {
require OpenTelemetry::Proto;
try {
require OpenTelemetry::Proto;

my $status = OTel::Google::RPC::Status
->decode($res->{content});

OpenTelemetry->handle_error(
exception => 'OTLP exporter received an RPC error status',
message => $status->encode_json,
exception => $status->encode_json,
message => 'OTLP exporter received an RPC error status',
);
}
catch($e) {
Expand All @@ -307,32 +308,42 @@ class OpenTelemetry::Exporter::OTLP :does(OpenTelemetry::Exporter) {
}
}

my $after = $res->{status} =~ /^(?: 429 | 503 )$/x
my $after = ( $code == 429 || $code == 503 )
? $res->{headers}{'retry-after'}
: undef;

# As-per https://opentelemetry.io/docs/specs/otlp/#failures-1
redo if $res->{status} =~ /^(?: 429 | 502 | 503 | 504 )$/x
&& $self->$maybe_backoff(
++$retries,
$res->{status},
$after,
);
redo if ( $code == 429
|| $code == 502
|| $code == 503
|| $code == 504
) && $self->$maybe_backoff( ++$retries, $code, $after );
}
}

return TRACE_EXPORT_FAILURE;
}
}

method export ( $spans, $timeout = undef ) {
method export ( $data, $timeout = undef ) {
return TRACE_EXPORT_FAILURE if $stopped;
return unless @$data;

try {
dynamically OpenTelemetry::Context->current
= OpenTelemetry::Trace->untraced_context;

dynamically OpenTelemetry::Context->current
= OpenTelemetry::Trace->untraced_context;
my $request = $encoder->encode($data);
my $result = $self->$send_request( $request, $timeout );

my $request = $encoder->encode($spans);
$self->$send_request( $request, $timeout );
$metrics->inc_counter('success');

return $result;
}
catch($e) {
warn "Could not export data: $e";
return TRACE_EXPORT_FAILURE;
}
}

async method shutdown ( $timeout = undef ) {
Expand Down
2 changes: 1 addition & 1 deletion lib/OpenTelemetry/Exporter/OTLP.pod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

=head1 NAME

OpenTelemetry::Exporter::OTLP - An OpenTelemetry Protocol span exporter
OpenTelemetry::Exporter::OTLP - An OpenTelemetry Protocol exporter

=head1 SYNOPSIS

Expand Down
77 changes: 63 additions & 14 deletions lib/OpenTelemetry/Exporter/OTLP/Encoder/JSON.pm
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class OpenTelemetry::Exporter::OTLP::Encoder::JSON {
use Ref::Util qw( is_hashref is_arrayref );
use Scalar::Util 'refaddr';

use experimental 'isa';

method content_type () { 'application/json' }

method serialise ($data) { encode_json $data }
Expand Down Expand Up @@ -53,7 +55,7 @@ class OpenTelemetry::Exporter::OTLP::Encoder::JSON {
};
}

method encode_event ( $event ) {
method encode_span_event ( $event ) {
{
attributes => $self->encode_kvlist($event->attributes),
droppedAttributesCount => $event->dropped_attributes,
Expand All @@ -62,7 +64,7 @@ class OpenTelemetry::Exporter::OTLP::Encoder::JSON {
};
}

method encode_link ( $link ) {
method encode_span_link ( $link ) {
{
attributes => $self->encode_kvlist($link->attributes),
droppedAttributesCount => $link->dropped_attributes,
Expand All @@ -71,7 +73,7 @@ class OpenTelemetry::Exporter::OTLP::Encoder::JSON {
};
}

method encode_status ( $status ) {
method encode_span_status ( $status ) {
{
code => $status->code,
message => '' . $status->description,
Expand All @@ -85,13 +87,13 @@ class OpenTelemetry::Exporter::OTLP::Encoder::JSON {
droppedEventsCount => $span->dropped_events,
droppedLinksCount => $span->dropped_links,
endTimeUnixNano => int $span->end_timestamp * 1_000_000_000,
events => [ map $self->encode_event($_), $span->events ],
events => [ map $self->encode_span_event($_), $span->events ],
kind => $span->kind,
links => [ map $self->encode_link($_), $span->links ],
links => [ map $self->encode_span_link($_), $span->links ],
name => $span->name,
spanId => $span->hex_span_id,
startTimeUnixNano => int $span->start_timestamp * 1_000_000_000,
status => $self->encode_status($span->status),
status => $self->encode_span_status($span->status),
traceId => $span->hex_trace_id,
traceState => $span->trace_state->to_string,
};
Expand All @@ -103,6 +105,25 @@ class OpenTelemetry::Exporter::OTLP::Encoder::JSON {
$data;
}

method encode_log ( $log ) {
my $data = {
attributes => $self->encode_kvlist($log->attributes),
body => $self->encode_anyvalue($log->body),
droppedAttributesCount => $log->dropped_attributes,
flags => $log->trace_flags->flags,
observedTimeUnixNano => int $log->observed_timestamp * 1_000_000_000,
severityNumber => $log->severity_number,
severityText => $log->severity_text,
spanId => $log->hex_span_id,
traceId => $log->hex_trace_id,
};

my $t = $log->timestamp;
$data->{timeUnixNano} = int $t * 1_000_000_000 if defined $t;

$data;
}

method encode_scope ( $scope ) {
{
attributes => $self->encode_kvlist($scope->attributes),
Expand All @@ -119,6 +140,13 @@ class OpenTelemetry::Exporter::OTLP::Encoder::JSON {
};
}

method encode_scope_logs ( $scope, $logs ) {
{
scope => $self->encode_scope($scope),
logRecords => [ map $self->encode_log($_), @$logs ],
};
}

method encode_resource_spans ( $resource, $spans ) {
my %scopes;

Expand All @@ -130,26 +158,47 @@ class OpenTelemetry::Exporter::OTLP::Encoder::JSON {
}

{
resource => $self->encode_resource($resource),
scopeSpans => [
map $self->encode_scope_spans(@$_), values %scopes,
],
resource => $self->encode_resource($resource),
scopeSpans => [ map $self->encode_scope_spans(@$_), values %scopes ],
schemaUrl => $resource->schema_url,
};
}

method encode_resource_logs ( $resource, $logs ) {
my %scopes;

for (@$logs) {
my $key = refaddr $_->instrumentation_scope;

$scopes{ $key } //= [ $_->instrumentation_scope, [] ];
push @{ $scopes{ $key }[1] }, $_;
}

{
resource => $self->encode_resource($resource),
scopeLogs => [ map $self->encode_scope_logs(@$_), values %scopes ],
schemaUrl => $resource->schema_url,
};
}

method encode ( $spans ) {
method encode ( $data ) {
my ( %request, %resources );

for (@$spans) {
my $type;
for (@$data) {
$type //= $_ isa OpenTelemetry::SDK::Logs::LogRecord
? 'logs'
: 'spans';

my $key = refaddr $_->resource;
$resources{ $key } //= [ $_->resource, [] ];
push @{ $resources{ $key }[1] }, $_;
}

my $encode = "encode_resource_$type";
$self->serialise({
resourceSpans => [
map $self->encode_resource_spans(@$_), values %resources,
'resource' . ucfirst $type => [
map $self->$encode(@$_), values %resources,
]
});
}
Expand Down
Loading

0 comments on commit e50dc71

Please sign in to comment.