Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move is_monotonic check to the SDK #544

Merged
merged 7 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions apps/opentelemetry/test/opentelemetry_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -868,9 +868,9 @@ record_exception_works(Config) ->
otel_span:end_span(SpanCtx),
[Span] = assert_exported(Tid, SpanCtx),
[Event] = otel_events:list(Span#span.events),
?assertEqual(<<"exception">>, Event#event.name),
?assertEqual(#{<<"exception.type">> => <<"throw:my_error">>,
<<"exception.stacktrace">> => list_to_binary(io_lib:format("~p", [Stacktrace], [{chars_limit, 50}])),
?assertEqual(exception, Event#event.name),
?assertEqual(#{'exception.type' => <<"throw:my_error">>,
'exception.stacktrace' => list_to_binary(io_lib:format("~p", [Stacktrace], [{chars_limit, 50}])),
<<"some-attribute">> => <<"value">>},
otel_attributes:map(Event#event.attributes)),
ok
Expand All @@ -888,10 +888,10 @@ record_exception_with_message_works(Config) ->
otel_span:end_span(SpanCtx),
[Span] = assert_exported(Tid, SpanCtx),
[Event] = otel_events:list(Span#span.events),
?assertEqual(<<"exception">>, Event#event.name),
?assertEqual(#{<<"exception.type">> => <<"throw:my_error">>,
<<"exception.stacktrace">> => list_to_binary(io_lib:format("~p", [Stacktrace], [{chars_limit, 50}])),
<<"exception.message">> => <<"My message">>,
?assertEqual(exception, Event#event.name),
?assertEqual(#{'exception.type' => <<"throw:my_error">>,
'exception.stacktrace' => list_to_binary(io_lib:format("~p", [Stacktrace], [{chars_limit, 50}])),
'exception.message' => <<"My message">>,
<<"some-attribute">> => <<"value">>},
otel_attributes:map(Event#event.attributes)
),
Expand All @@ -916,7 +916,7 @@ dropped_attributes(Config) ->

truncated_binary_attributes(_Config) ->
InfinityLengthAttributes = otel_attributes:new(#{<<"attr-1">> => <<"abcde">>,
<<"attr-2">> => [<<"a">>, <<"abcde">>, <<"abcde">>]},
<<"attr-2">> => [<<"a">>, <<"abcde">>, <<"abcde">>]},
128, infinity),

%% when length limit is inifinity
Expand Down
4 changes: 2 additions & 2 deletions apps/opentelemetry_api/src/otel_span.erl
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ record_exception(SpanCtx, Class, Term, Stacktrace, Attributes) when is_map(Attri
{ok, StacktraceString} = otel_utils:format_binary_string("~0tP", [Stacktrace, 10], [{chars_limit, 50}]),
ExceptionAttributes = #{?EXCEPTION_TYPE => ExceptionType,
?EXCEPTION_STACKTRACE => StacktraceString},
add_event(SpanCtx, <<"exception">>, maps:merge(ExceptionAttributes, Attributes));
add_event(SpanCtx, 'exception', maps:merge(ExceptionAttributes, Attributes));
record_exception(_, _, _, _, _) ->
false.

Expand All @@ -285,7 +285,7 @@ record_exception(SpanCtx, Class, Term, Message, Stacktrace, Attributes) when is_
ExceptionAttributes = #{?EXCEPTION_TYPE => ExceptionType,
?EXCEPTION_STACKTRACE => StacktraceString,
?EXCEPTION_MESSAGE => Message},
add_event(SpanCtx, <<"exception">>, maps:merge(ExceptionAttributes, Attributes));
add_event(SpanCtx, 'exception', maps:merge(ExceptionAttributes, Attributes));
record_exception(_, _, _, _, _, _) ->
false.

Expand Down
16 changes: 8 additions & 8 deletions apps/opentelemetry_api_experimental/include/otel_metrics.hrl
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
-record(instrument, {module :: module() | '_',
meter :: otel_meter:t() | '_',
name :: otel_instrument:name() | '_',
description :: otel_instrument:description() | undefined | '_',
kind :: otel_instrument:kind() | '_',
unit :: otel_instrument:unit() | undefined | '_',
callback :: otel_instrument:callback() | undefined | '_',
callback_args :: term() | undefined | '_'}).
-record(instrument, {module :: module(),
meter :: otel_meter:t(),
name :: otel_instrument:name(),
description :: otel_instrument:description(),
kind :: otel_instrument:kind(),
unit :: otel_instrument:unit() | undefined,
callback :: otel_instrument:callback() | undefined,
callback_args :: term() | undefined}).

-define(KIND_COUNTER, counter).
-define(KIND_OBSERVABLE_COUNTER, observable_counter).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ get_meter_(MeterProvider) ->
Name :: atom() | {atom(), Vsn, SchemaUrl},
Vsn :: unicode:chardata() | undefined,
SchemaUrl :: uri_string:uri_string() | undefined,
Meter:: opentelemetry:meter().
Meter:: meter().
get_meter('$__default_meter') ->
get_meter();
get_meter({Name, Vsn, SchemaUrl}) ->
Expand All @@ -73,7 +73,7 @@ get_meter(Name) ->
Name :: atom(),
Vsn :: unicode:chardata() | undefined,
SchemaUrl :: uri_string:uri_string() | undefined,
Meter:: opentelemetry:meter().
Meter:: meter().
get_meter(Name, Vsn, SchemaUrl) ->
get_meter(?GLOBAL_METER_PROVIDER_NAME, Name, Vsn, SchemaUrl).

Expand All @@ -82,7 +82,7 @@ get_meter(Name, Vsn, SchemaUrl) ->
Name :: atom(),
Vsn :: unicode:chardata() | undefined,
SchemaUrl :: uri_string:uri_string() | undefined,
Meter:: opentelemetry:meter().
Meter:: meter().
get_meter(MeterProvider, Name, Vsn, SchemaUrl) ->
%% check cache and then use provider to get the meter if it isn't cached yet
case persistent_term:get(?METER_KEY(MeterProvider, {Name, Vsn, SchemaUrl}), undefined) of
Expand All @@ -106,7 +106,7 @@ set_meter(Name, Meter) ->
Name :: atom(),
Vsn :: unicode:chardata() | undefined,
SchemaUrl :: uri_string:uri_string() | undefined,
Meter:: opentelemetry:meter().
Meter:: meter().
set_meter(Name, Vsn, SchemaUrl, Meter) ->
set_meter(?GLOBAL_METER_PROVIDER_NAME, Name, Vsn, SchemaUrl, Meter).

Expand All @@ -115,6 +115,6 @@ set_meter(Name, Vsn, SchemaUrl, Meter) ->
Name :: atom(),
Vsn :: unicode:chardata() | undefined,
SchemaUrl :: uri_string:uri_string() | undefined,
Meter:: opentelemetry:meter().
Meter:: meter().
set_meter(MeterProvider, Name, Vsn, SchemaUrl, Meter) ->
opentelemetry:verify_and_set_term(Meter, ?METER_KEY(MeterProvider, {Name, Vsn, SchemaUrl}), otel_meter).
15 changes: 4 additions & 11 deletions apps/opentelemetry_api_experimental/src/otel_counter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,9 @@ create(Meter, Name, Opts) ->
otel_meter:create_counter(Meter, Name, Opts).

-spec add(otel_meter:t(), otel_instrument:name(), number(), opentelemetry:attributes_map()) -> ok.
add(Meter, Name, Number, Attributes) when Number >= 0 ->
otel_meter:record(Meter, Name, Number, Attributes);
add(_, Name, _, _) ->
?LOG_INFO("Counter instrument ~p does not support negative values.", [Name]),
ok.
add(Meter, Name, Number, Attributes) ->
otel_meter:record(Meter, Name, Number, Attributes).

-spec add(otel_instrument:t(), number(), opentelemetry:attributes_map()) -> ok.
add(Instrument=#instrument{module=Module}, Number, Attributes)
when Number >= 0 ->
Module:record(Instrument, Number, Attributes);
add(#instrument{name=Name}, _, _) ->
?LOG_INFO("Counter instrument ~p does not support negative values.", [Name]),
ok.
add(Instrument=#instrument{module=Module}, Number, Attributes) ->
Module:record(Instrument, Number, Attributes).
2 changes: 2 additions & 0 deletions apps/opentelemetry_api_experimental/src/otel_instrument.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,7 @@ is_monotonic(#instrument{kind=?KIND_COUNTER}) ->
true;
is_monotonic(#instrument{kind=?KIND_OBSERVABLE_COUNTER}) ->
true;
is_monotonic(#instrument{kind=?KIND_HISTOGRAM}) ->
true;
is_monotonic(_) ->
false.
4 changes: 2 additions & 2 deletions apps/opentelemetry_api_experimental/src/otel_meter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@
Meter :: t(),
Name :: otel_instrument:name(),
Kind :: otel_instrument:kind(),
Opts :: otel_meter:opts().
Opts :: opts().

-callback create_instrument(Meter, Name, Kind, Callback, CallbackArgs, Opts) -> otel_instrument:t() when
Meter :: t(),
Name :: otel_instrument:name(),
Kind :: otel_instrument:kind(),
Callback :: otel_instrument:callback(),
CallbackArgs :: term(),
Opts :: otel_meter:opts().
Opts :: opts().

-callback register_callback(Meter, Instruments, Callback, CallbackArgs) -> ok when
Meter :: t(),
Expand Down
2 changes: 1 addition & 1 deletion apps/opentelemetry_experimental/include/otel_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
time_unix_nano :: integer(),
count :: number(),
sum :: float(),
bucket_counts :: tuple(),
bucket_counts :: list(),
explicit_bounds :: [float()],
exemplars :: list(),
flags :: integer(),
Expand Down
40 changes: 20 additions & 20 deletions apps/opentelemetry_experimental/src/otel_aggregation.erl
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
-module(otel_aggregation).

-export([maybe_init_aggregate/5,
-export([maybe_init_aggregate/4,
default_mapping/0,
temporality_mapping/0,
instrument_temporality/1]).

-include_lib("opentelemetry_api_experimental/include/otel_metrics.hrl").
-include("otel_metrics.hrl").
-include("otel_view.hrl").

-type temporality() :: ?AGGREGATION_TEMPORALITY_UNSPECIFIED |
?AGGREGATION_TEMPORALITY_DELTA |
Expand All @@ -16,7 +17,7 @@
-type t() :: otel_aggregation_drop:t() | otel_aggregation_sum:t() |
otel_aggregation_last_value:t() | otel_aggregation_histogram_explicit:t().

-type key() :: {atom(), opentelemetry:attributes_maps(), reference()}.
-type key() :: {atom(), opentelemetry:attributes_map(), reference()}.

-type options() :: map().

Expand All @@ -25,41 +26,40 @@
options/0,
temporality/0]).

-callback init(Key, Options) -> Aggregation when
Key :: key(),
Options :: options(),
%% Returns the aggregation's record as it is seen and updated by
%% the aggregation module in the metrics table.
-callback init(ViewAggregation, Attributes) -> Aggregation when
ViewAggregation :: #view_aggregation{},
Attributes :: opentelemetry:attributes_map(),
Aggregation :: t().

-callback aggregate(Table, Key, Value, Options) -> boolean() when
-callback aggregate(Table, ViewAggregation, Value, Attributes) -> boolean() when
Table :: ets:table(),
Key :: key(),
ViewAggregation :: #view_aggregation{},
Value :: number(),
Options :: options().
Attributes :: opentelemetry:attributes_map().

-callback checkpoint(Table, Name, ReaderId, Temporality, CollectionStartTime) -> ok when
-callback checkpoint(Table, ViewAggregation, CollectionStartTime) -> ok when
Table :: ets:table(),
Name :: atom(),
ReaderId :: reference(),
Temporality :: temporality(),
ViewAggregation :: #view_aggregation{},
CollectionStartTime :: integer().

-callback collect(Table, Name, ReaderId, Temporality, CollectionStartTime) -> [tuple()] when
-callback collect(Table, ViewAggregation, CollectionStartTime) -> tuple() when
Table :: ets:table(),
Name :: atom(),
ReaderId :: reference(),
Temporality :: temporality(),
ViewAggregation :: #view_aggregation{},
CollectionStartTime :: integer().

maybe_init_aggregate(MetricsTab, AggregationModule, Key, Value, Options) ->
case AggregationModule:aggregate(MetricsTab, Key, Value, Options) of
maybe_init_aggregate(MetricsTab, ViewAggregation=#view_aggregation{aggregation_module=AggregationModule},
Value, Attributes) ->
case AggregationModule:aggregate(MetricsTab, ViewAggregation, Value, Attributes) of
true ->
ok;
false ->
%% entry doesn't exist, create it and rerun the aggregate function
Metric = AggregationModule:init(Key, Options),
Metric = AggregationModule:init(ViewAggregation, Attributes),
%% don't overwrite a possible concurrent measurement doing the same
_ = ets:insert_new(MetricsTab, Metric),
AggregationModule:aggregate(MetricsTab, Key, Value, Options)
AggregationModule:aggregate(MetricsTab, ViewAggregation, Value, Attributes)
end.

-spec default_mapping() -> #{otel_instrument:kind() => module()}.
Expand Down
10 changes: 5 additions & 5 deletions apps/opentelemetry_experimental/src/otel_aggregation_drop.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

-export([init/2,
aggregate/4,
checkpoint/5,
collect/5]).
checkpoint/3,
collect/3]).

-include("otel_metrics.hrl").

Expand All @@ -19,8 +19,8 @@ init(_, _) ->
aggregate(_, _, _, _) ->
true.

checkpoint(_, _, _, _, _) ->
checkpoint(_, _, _) ->
ok.

collect(_, _, _, _, _) ->
[].
collect(_, _, _) ->
{}.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@

-export([init/2,
aggregate/4,
checkpoint/5,
collect/5]).
checkpoint/3,
collect/3]).

-include("otel_metrics.hrl").
-include_lib("opentelemetry_api_experimental/include/otel_metrics.hrl").
-include("otel_view.hrl").

-type t() :: #explicit_histogram_aggregation{}.

Expand All @@ -37,7 +38,10 @@

-define(MIN_DOUBLE, -9223372036854775807.0). %% the proto representation of size `fixed64'

init(Key, Options) ->
init(#view_aggregation{name=Name,
reader=ReaderId,
aggregation_options=Options}, Attributes) ->
Key = {Name, Attributes, ReaderId},
Boundaries = maps:get(boundaries, Options, ?DEFAULT_BOUNDARIES),
RecordMinMax = maps:get(record_min_max, Options, true),
#explicit_histogram_aggregation{key=Key,
Expand All @@ -51,7 +55,10 @@ init(Key, Options) ->
sum=0
}.

aggregate(Table, Key, Value, Options) ->
aggregate(Table, #view_aggregation{name=Name,
reader=ReaderId,
aggregation_options=Options}, Value, Attributes) ->
Key = {Name, Attributes, ReaderId},
Boundaries = maps:get(boundaries, Options, ?DEFAULT_BOUNDARIES),
try ets:lookup_element(Table, Key, #explicit_histogram_aggregation.bucket_counts) of
BucketCounts0 ->
Expand Down Expand Up @@ -123,8 +130,10 @@ aggregate(Table, Key, Value, Options) ->
false
end.

-dialyzer({nowarn_function, checkpoint/5}).
checkpoint(Tab, Name, ReaderPid, ?AGGREGATION_TEMPORALITY_DELTA, CollectionStartNano) ->
-dialyzer({nowarn_function, checkpoint/3}).
checkpoint(Tab, #view_aggregation{name=Name,
reader=ReaderId,
temporality=?AGGREGATION_TEMPORALITY_DELTA}, CollectionStartNano) ->
MS = [{#explicit_histogram_aggregation{key='$1',
start_time_unix_nano='_',
boundaries='$2',
Expand All @@ -136,7 +145,7 @@ checkpoint(Tab, Name, ReaderPid, ?AGGREGATION_TEMPORALITY_DELTA, CollectionStart
sum='$8'
},
[{'=:=', {element, 1, '$1'}, {const, Name}},
{'=:=', {element, 3, '$1'}, {const, ReaderPid}}],
{'=:=', {element, 3, '$1'}, {const, ReaderId}}],
[{#explicit_histogram_aggregation{key='$1',
start_time_unix_nano={const, CollectionStartNano},
boundaries='$2',
Expand All @@ -152,20 +161,23 @@ checkpoint(Tab, Name, ReaderPid, ?AGGREGATION_TEMPORALITY_DELTA, CollectionStart
_ = ets:select_replace(Tab, MS),

ok;
checkpoint(_Tab, _Name, _ReaderPid, _, _CollectionStartNano) ->
checkpoint(_Tab, _, _CollectionStartNano) ->
%% no good way to checkpoint the `counters' without being out of sync with
%% min/max/sum, so may as well just collect them in `collect', which will
%% also be out of sync, but best we can do right now

ok.

collect(Tab, Name, ReaderPid, _, CollectionStartTime) ->
collect(Tab, #view_aggregation{name=Name,
reader=ReaderPid,
temporality=Temporality}, CollectionStartTime) ->
Select = [{'$1',
[{'==', Name, {element, 1, {element, 2, '$1'}}},
{'==', ReaderPid, {element, 3, {element, 2, '$1'}}}],
['$1']}],
AttributesAggregation = ets:select(Tab, Select),
[datapoint(CollectionStartTime, SumAgg) || SumAgg <- AttributesAggregation].
#histogram{datapoints=[datapoint(CollectionStartTime, SumAgg) || SumAgg <- AttributesAggregation],
aggregation_temporality=Temporality}.

%%

Expand Down
Loading