Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging Handler #468

Merged
merged 8 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
name: Erlang

on:
Expand Down Expand Up @@ -44,18 +45,20 @@ jobs:
- name: Common Test tests
run: rebar3 ct --cover

- name: Publish Test Report
uses: mikepenz/action-junit-report@v2
- name: Upload Test Report
uses: actions/upload-artifact@v3
if: always() # always run even if the previous step fails
with:
report_paths: '_build/test/logs/*/junit_report.xml'
name: junit-test-results
path: '**/_build/test/logs/*/junit_report.xml'
retention-days: 1

- name: XRef
run: rebar3 xref
- name: Covertool
if: ${{ always() }}
run: rebar3 covertool generate
- uses: codecov/codecov-action@v2
- uses: codecov/codecov-action@v3
if: ${{ always() }}
with:
file: _build/test/covertool/opentelemetry.covertool.xml
Expand Down
25 changes: 25 additions & 0 deletions .github/workflows/report.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
name: report
on:
workflow_run:
workflows: [Erlang]
types: [completed]

permissions:
checks: write

jobs:
checks:
runs-on: ubuntu-latest
steps:
- name: Download Test Report
uses: dawidd6/action-download-artifact@v2
with:
name: junit-test-results
workflow: ${{ github.event.workflow.id }}
run_id: ${{ github.event.workflow_run.id }}
- name: Publish Test Report
uses: mikepenz/action-junit-report@v3
with:
commit: ${{github.event.workflow_run.head_sha}}
report_paths: '**/_build/test/logs/*/junit_report.xml'
3 changes: 2 additions & 1 deletion apps/opentelemetry/src/otel_batch_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ exporting(enter, _OldState, Data=#data{exporting_timeout_ms=ExportingTimeout,
{{timeout, export_spans}, SendInterval, export_spans}]}
end;

%% TODO: we need to just check if `exporter=undefined' right?
%% two hacks since we can't transition to a new state or send an action from `enter'
exporting(state_timeout, no_exporter, Data) ->
{next_state, idle, Data};
Expand Down Expand Up @@ -385,5 +386,5 @@ report_cb(#{source := exporter,
reason := Reason,
exporter := ExporterModule,
stacktrace := StackTrace}) ->
{"exporter threw exception: exporter=~p ~ts",
{"span exporter threw exception: exporter=~p ~ts",
[ExporterModule, otel_utils:format_exception(Kind, Reason, StackTrace)]}.
4 changes: 4 additions & 0 deletions apps/opentelemetry/src/otel_exporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
-export([init/1,
export_traces/4,
export_metrics/4,
export_logs/4,
shutdown/1,
report_cb/1]).

Expand Down Expand Up @@ -121,6 +122,9 @@ export_traces(ExporterModule, SpansTid, Resource, Config) ->
export_metrics(ExporterModule, MetricsTid, Resource, Config) ->
ExporterModule:export(metrics, MetricsTid, Resource, Config).

export_logs(ExporterModule, Batch, Resource, Config) ->
ExporterModule:export(logs, Batch, Resource, Config).

shutdown(undefined) ->
ok;
shutdown({ExporterModule, Config}) ->
Expand Down
10 changes: 10 additions & 0 deletions apps/opentelemetry_api/src/opentelemetry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
set_tracer/2,
get_application/1,
get_application_tracer/1,
get_application_scope/1,
set_text_map_propagator/1,
set_text_map_extractor/1,
get_text_map_extractor/0,
Expand Down Expand Up @@ -224,6 +225,15 @@ get_tracer(Name, Vsn, SchemaUrl) ->
get_application_tracer(ModuleName) ->
get_tracer(get_application(ModuleName)).

-spec get_application_scope(module()) -> instrumentation_scope() | undefined.
get_application_scope(ModuleName) ->
case get_application(ModuleName) of
{Name, Vsn, SchemaUrl} ->
instrumentation_scope(Name, Vsn, SchemaUrl);
_ ->
instrumentation_scope(<<>>, <<>>, <<>>)
end.

%% looks up the name, version and schema_url used to create a Trace for the OTP
%% Application a module is in. This name is used to look up a Tracer to use so
%% if none is found for the ModuleName the key used for the default tracer.
Expand Down
250 changes: 250 additions & 0 deletions apps/opentelemetry_experimental/src/otel_log_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
%%%------------------------------------------------------------------------
%% Copyright 2022, OpenTelemetry Authors
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% @doc
%% @end
%%%-------------------------------------------------------------------------
-module(otel_log_handler).

-behaviour(gen_statem).

-include_lib("kernel/include/logger.hrl").
-include_lib("opentelemetry_api/include/opentelemetry.hrl").

-export([start_link/2]).

-export([log/2,
adding_handler/1,
removing_handler/1,
changing_config/3,
filter_config/1,
report_cb/1]).
ferd marked this conversation as resolved.
Show resolved Hide resolved

-export([init/1,
callback_mode/0,
idle/3,
exporting/3,
handle_event/3]).

-type config() :: #{id => logger:handler_id(),
regname := atom(),
config => term(),
level => logger:level() | all | none,
module => module(),
filter_default => log | stop,
filters => [{logger:filter_id(), logger:filter()}],
formatter => {module(), logger:formatter_config()}}.

-define(DEFAULT_CALL_TIMEOUT, 5000).
-define(DEFAULT_MAX_QUEUE_SIZE, 2048).
-define(DEFAULT_SCHEDULED_DELAY_MS, timer:seconds(5)).
-define(DEFAULT_EXPORTER_TIMEOUT_MS, timer:minutes(5)).

-define(name_to_reg_name(Module, Id),
list_to_atom(lists:concat([Module, "_", Id]))).

-record(data, {exporter :: {module(), term()} | undefined,
exporter_config :: {module(), term()} | undefined,
resource :: otel_resource:t(),

runner_pid :: pid() | undefined,
max_queue_size :: integer() | infinity,
exporting_timeout_ms :: integer(),
scheduled_delay_ms :: integer(),

config :: #{},
batch :: #{opentelemetry:instrumentation_scope() => [logger:log_event()]}}).

start_link(RegName, Config) ->
gen_statem:start_link({local, RegName}, ?MODULE, [RegName, Config], []).

-spec adding_handler(Config) -> {ok, Config} | {error, Reason} when
Config :: config(),
Reason :: term().
adding_handler(#{id := Id,
module := Module}=Config) ->
RegName = ?name_to_reg_name(Module, Id),
ChildSpec =
#{id => Id,
start => {?MODULE, start_link, [RegName, Config]},
restart => temporary,
shutdown => 2000,
type => worker,
modules => [?MODULE]},
case supervisor:start_child(opentelemetry_experimental_sup, ChildSpec) of
{ok, _Pid} ->
%% ok = logger_handler_watcher:register_handler(Name,Pid),
%% OlpOpts = logger_olp:get_opts(Olp),
{ok, Config#{regname => RegName}};
{error, {Reason, Ch}} when is_tuple(Ch), element(1, Ch) == child ->
{error, Reason};
Error ->
Error
end.

%%%-----------------------------------------------------------------
%%% Updating handler config
-spec changing_config(SetOrUpdate, OldConfig, NewConfig) ->
{ok,Config} | {error,Reason} when
SetOrUpdate :: set | update,
OldConfig :: config(),
NewConfig :: config(),
Config :: config(),
Reason :: term().
changing_config(SetOrUpdate, OldConfig, NewConfig=#{regname := Id}) ->
gen_statem:call(Id, {changing_config, SetOrUpdate, OldConfig, NewConfig}).

%%%-----------------------------------------------------------------
%%% Handler being removed
-spec removing_handler(Config) -> ok when
Config :: config().
removing_handler(Config=#{regname := Id}) ->
gen_statem:call(Id, {removing_handler, Config}).

%%%-----------------------------------------------------------------
%%% Log a string or report
-spec log(LogEvent, Config) -> ok when
LogEvent :: logger:log_event(),
Config :: config().
log(LogEvent, _Config=#{regname := Id}) ->
Scope = case LogEvent of
#{meta := #{otel_scope := Scope0=#instrumentation_scope{}}} ->
Scope0;
#{meta := #{mfa := {Module, _, _}}} ->
opentelemetry:get_application_scope(Module);
_ ->
opentelemetry:instrumentation_scope(<<>>, <<>>, <<>>)
end,

gen_statem:cast(Id, {log, Scope, LogEvent}).

%%%-----------------------------------------------------------------
%%% Remove internal fields from configuration
-spec filter_config(Config) -> Config when
Config :: config().
filter_config(Config=#{regname := Id}) ->
gen_statem:call(Id, {filter_config, Config}).

init([_RegName, Config]) ->
process_flag(trap_exit, true),

Resource = otel_resource_detector:get_resource(),

SizeLimit = maps:get(max_queue_size, Config, ?DEFAULT_MAX_QUEUE_SIZE),
ExportingTimeout = maps:get(exporting_timeout_ms, Config, ?DEFAULT_EXPORTER_TIMEOUT_MS),
ScheduledDelay = maps:get(scheduled_delay_ms, Config, ?DEFAULT_SCHEDULED_DELAY_MS),

ExporterConfig = maps:get(exporter, Config, {opentelemetry_exporter, #{protocol => grpc}}),

{ok, idle, #data{exporter=undefined,
exporter_config=ExporterConfig,
resource=Resource,
config=Config,
max_queue_size=case SizeLimit of
infinity -> infinity;
_ -> SizeLimit div erlang:system_info(wordsize)
end,
exporting_timeout_ms=ExportingTimeout,
scheduled_delay_ms=ScheduledDelay,
batch=#{}}}.

callback_mode() ->
[state_functions, state_enter].

idle(enter, _OldState, Data=#data{exporter=undefined,
exporter_config=ExporterConfig,
scheduled_delay_ms=SendInterval}) ->
Exporter = init_exporter(ExporterConfig),
{keep_state, Data#data{exporter=Exporter},
[{{timeout, export_logs}, SendInterval, export_logs}]};
idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval}) ->
{keep_state_and_data, [{{timeout, export_logs}, SendInterval, export_logs}]};
idle(_, export_logs, Data=#data{exporter=undefined,
exporter_config=ExporterConfig}) ->
Exporter = init_exporter(ExporterConfig),
{next_state, exporting, Data#data{exporter=Exporter}, [{next_event, internal, export}]};
idle(_, export_logs, Data) ->
{next_state, exporting, Data, [{next_event, internal, export}]};
idle(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, Data).

exporting({timeout, export_logs}, export_logs, _) ->
{keep_state_and_data, [postpone]};
exporting(enter, _OldState, _Data) ->
keep_state_and_data;
exporting(internal, export, Data=#data{exporter=Exporter,
resource=Resource,
config=Config,
batch=Batch}) when map_size(Batch) =/= 0 ->
_ = export(Exporter, Resource, Batch, Config),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm noticing that this call here does not let the exporter update its state. If it could, we'd also be able to drastically simplify the FSM by letting export/4 do the optional call to init_exporter/1 and return the new config as well as letting the exporter update its state on return.

I however don't know what the future plans are for this module, so this drastic simplification possibly clashes with said plans.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting idea... I planned to simplify it by breaking out the shared logic from the batch_processor module. If that works there too then it makes sense to do.

{next_state, idle, Data#data{batch=#{}}};
exporting(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, Data).

handle_event({call, From}, {changing_config, _SetOrUpdate, _OldConfig, NewConfig}, Data) ->
{keep_state, Data#data{config=NewConfig}, [{reply, From, NewConfig}]};
handle_event({call, From}, {removing_handler, Config}, _Data) ->
%% TODO: flush
{keep_state_and_data, [{reply, From, Config}]};
handle_event({call, From}, {filter_handler, Config}, Data) ->
{keep_state, Data, [{reply, From, Config}]};
handle_event({call, From}, {filter_config, Config}, Data) ->
{keep_state, Data, [{reply, From, Config}]};
ferd marked this conversation as resolved.
Show resolved Hide resolved
handle_event({call, _From}, _Msg, _Data) ->
keep_state_and_data;
handle_event(cast, {log, Scope, LogEvent}, Data=#data{batch=Logs}) ->
{keep_state, Data#data{batch=maps:update_with(Scope, fun(V) ->
[LogEvent | V]
end, [LogEvent], Logs)}};
handle_event(_, _, _) ->
keep_state_and_data.

%%

init_exporter(ExporterConfig) ->
case otel_exporter:init(ExporterConfig) of
Exporter when Exporter =/= undefined andalso Exporter =/= none ->
Exporter;
_ ->
undefined
end.

export(undefined, _, _, _) ->
true;
export({ExporterModule, ExporterConfig}, Resource, Batch, Config) ->
%% don't let a exporter exception crash us
%% and return true if exporter failed
try
otel_exporter:export_logs(ExporterModule, {Batch, Config}, Resource, ExporterConfig)
=:= failed_not_retryable
catch
Kind:Reason:StackTrace ->
?LOG_WARNING(#{source => exporter,
during => export,
kind => Kind,
reason => Reason,
exporter => ExporterModule,
stacktrace => StackTrace}, #{report_cb => fun ?MODULE:report_cb/1}),
true
ferd marked this conversation as resolved.
Show resolved Hide resolved
end.

%% logger format functions
report_cb(#{source := exporter,
during := export,
kind := Kind,
reason := Reason,
exporter := ExporterModule,
stacktrace := StackTrace}) ->
{"log exporter threw exception: exporter=~p ~ts",
[ExporterModule, otel_utils:format_exception(Kind, Reason, StackTrace)]}.
3 changes: 2 additions & 1 deletion apps/opentelemetry_exporter/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
{opentelemetry_api, "~> 1.1"}]}.

{grpc, [{protos, ["opentelemetry-proto/opentelemetry/proto/collector/trace/v1",
"opentelemetry-proto/opentelemetry/proto/collector/metrics/v1"]},
"opentelemetry-proto/opentelemetry/proto/collector/metrics/v1",
"opentelemetry-proto/opentelemetry/proto/collector/logs/v1"]},
{gpb_opts, [{module_name_prefix, "opentelemetry_exporter_"},
{module_name_suffix, "_pb"},
{i, "apps/opentelemetry_exporter/opentelemetry-proto/"}]}]}.
Expand Down
18 changes: 18 additions & 0 deletions apps/opentelemetry_exporter/src/opentelemetry_exporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,24 @@ export(metrics, Tab, Resource, #state{protocol=grpc,
?LOG_INFO("OTLP grpc export failed with error: ~p", [Reason]),
error
end;
export(logs, {Logs, Config}, Resource, #state{protocol=grpc,
grpc_metadata=Metadata,
channel_pid=_ChannelPid}) ->
ExportRequest = otel_otlp_logs:to_proto(Logs, Resource, Config),
Ctx = grpcbox_metadata:append_to_outgoing_ctx(ctx:new(), Metadata),
case opentelemetry_logs_service:export(Ctx, ExportRequest, #{channel => ?MODULE}) of
{ok, _Response, _ResponseMetadata} ->
ok;
{error, {Status, Message}, _} ->
?LOG_INFO("OTLP grpc export failed with GRPC status ~s : ~s", [Status, Message]),
error;
{http_error, {Status, _}, _} ->
?LOG_INFO("OTLP grpc export failed with HTTP status code ~s", [Status]),
error;
{error, Reason} ->
?LOG_INFO("OTLP grpc export failed with error: ~p", [Reason]),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any chance these logs messages are gonna be fed back into the OTel exporter and cause issues with write amplification?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh they definitely will

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a domain can be used here but then by default all these logs would be lost.. ugh

error
end;
export(_, _Tab, _Resource, _State) ->
{error, unimplemented}.

Expand Down
Loading