From 5a67f73436ddbc16d4e6781893428ddba4e71ae3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Niemier?= Date: Mon, 11 Feb 2019 00:47:24 +0100 Subject: [PATCH] feat: base for migrating oc_reporter to gen_event --- src/oc_internal.hrl | 19 +++ src/oc_internal_timer.erl | 38 ++++++ src/oc_reporter_stdout.erl | 13 +- src/oc_trace.erl | 29 ++++- ...{oc_reporter.erl => oc_trace_reporter.erl} | 119 ++++-------------- src/oc_trace_sup.erl | 52 ++++++++ 6 files changed, 174 insertions(+), 96 deletions(-) create mode 100644 src/oc_internal.hrl create mode 100644 src/oc_internal_timer.erl rename src/{oc_reporter.erl => oc_trace_reporter.erl} (52%) create mode 100644 src/oc_trace_sup.erl diff --git a/src/oc_internal.hrl b/src/oc_internal.hrl new file mode 100644 index 0000000..4003aed --- /dev/null +++ b/src/oc_internal.hrl @@ -0,0 +1,19 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2017, OpenCensus Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%%------------------------------------------------------------------------ + +-define(SPAN_TAB, oc_span_tab). + +-define(SPAN_CTX, oc_span_ctx_key). +-define(TAG_CTX, oc_tag_ctx_key). diff --git a/src/oc_internal_timer.erl b/src/oc_internal_timer.erl new file mode 100644 index 0000000..27a401a --- /dev/null +++ b/src/oc_internal_timer.erl @@ -0,0 +1,38 @@ +-module(oc_internal_timer). + +-callback ping() -> ok. + +-export([start_link/1, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2]). + +-record(state, {timer :: reference(), + interval :: pos_integer(), + module :: module()}). + +start_link(Opts) -> + gen_server:start_link(?MODULE, Opts, []). + +init(Opts) -> + Interval = proplists:get_value(interval, Opts), + Module = proplists:get_value(module, Opts), + Ref = erlang:send_after(Interval, self(), ping), + + {ok, #state{timer = Ref, + interval = Interval, + module = Module}}. + +handle_call(_Msg, _From, State) -> {reply, ok, State}. + +handle_cast(_Msg, State) -> {noreply, State}. + +handle_info(ping, #state{timer = Ref, interval = Interval, module = Mod}) -> + _ = erlang:cancel_timer(Ref), + ok = Mod:ping(), + NewRef = erlang:send_after(Interval, self(), ping), + + {noreply, #state{timer = NewRef, + interval = Interval, + module = Mod}}. diff --git a/src/oc_reporter_stdout.erl b/src/oc_reporter_stdout.erl index 583568e..3df4078 100644 --- a/src/oc_reporter_stdout.erl +++ b/src/oc_reporter_stdout.erl @@ -1,10 +1,17 @@ -module(oc_reporter_stdout). +-behaviour(gen_event). + -export([init/1, - report/2]). + handle_call/2, + handle_event/2]). init(_) -> ok. -report(Spans, _) -> - [io:format("~p~n", [Span]) || Span <- Spans]. +handle_call(_Msg, State) -> {ok, ok, State}. + +handle_event({spans, Spans}, State) -> + [io:format("~p~n", [Span]) || Span <- Spans], + + {ok, State}. diff --git a/src/oc_trace.erl b/src/oc_trace.erl index 9a3a653..832ea47 100644 --- a/src/oc_trace.erl +++ b/src/oc_trace.erl @@ -49,7 +49,11 @@ message_event/4, - set_status/3]). + set_status/3, + + add_handler/1, + add_handler/2, + delete_handler/1]). -dialyzer({nowarn_function, update_trace_options/2}). @@ -327,6 +331,29 @@ link(LinkType, TraceId, SpanId, Attributes) -> span_id=SpanId, attributes=Attributes}. +%%-------------------------------------------------------------------- +%% @doc +%% @equiv add_handler(Handler, []). +%% @end +%%-------------------------------------------------------------------- +add_handler(Handler) -> add_handler(Handler, []). + +%%-------------------------------------------------------------------- +%% @doc +%% Add new handler +%% @end +%%-------------------------------------------------------------------- +add_handler(Handler, Args) -> + gen_event:add_handler(oc_trace_reporter, Handler, Args). + +%%-------------------------------------------------------------------- +%% @doc +%% Delete handler +%% @end +%%-------------------------------------------------------------------- +delete_handler(Handler) -> + gen_event:delete_handler(oc_trace_reporter, Handler, []). + %% Internal functions lookup_and_replace(#span_ctx{span_id=SpanId, diff --git a/src/oc_reporter.erl b/src/oc_trace_reporter.erl similarity index 52% rename from src/oc_reporter.erl rename to src/oc_trace_reporter.erl index cb49bf8..38f988f 100644 --- a/src/oc_reporter.erl +++ b/src/oc_trace_reporter.erl @@ -16,23 +16,14 @@ %% and creates the buffer of trace spans to be reported. %% @end %%%----------------------------------------------------------------------- --module(oc_reporter). +-module(oc_trace_reporter). --behaviour(gen_server). +-behaviour(oc_internal_timer). --compile({no_auto_import, [register/2]}). +-export([start_link/1, + store_span/1]). --export([start_link/0, - store_span/1, - register/1, - register/2]). - --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - code_change/3, - terminate/2]). +-export([ping/0]). -include("opencensus.hrl"). -include("oc_logger.hrl"). @@ -50,29 +41,20 @@ %% until it returns. -callback report(nonempty_list(opencensus:span()), opts()) -> ok. --record(state, {reporters :: [{module(), term()}], - send_interval_ms :: integer(), - timer_ref :: reference()}). - -define(BUFFER_1, oc_report_buffer1). -define(BUFFER_2, oc_report_buffer2). -define(BUFFER_STATUS, oc_report_status). -start_link() -> +start_link(Handlers) -> maybe_init_ets(), - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + case gen_event:start_link({local, ?MODULE}, []) of + {ok, Pid} -> + [gen_event:add_handler(Pid, Handler, Opts) + || {Handler, Opts} <- Handlers], -%% @doc -%% @equiv register(Reporter, []). -%% @end -register(Reporter) -> register(Reporter, []). - -%% @doc -%% Register new traces reporter `Reporter' with `Config'. -%% @end --spec register(module(), term()) -> ok. -register(Reporter, Options) -> - gen_server:call(?MODULE, {register, init_reporter({Reporter, Options})}). + {ok, Pid}; + Other -> Other + end. -spec store_span(opencensus:span()) -> true | {error, invalid_span} | {error, no_report_buffer}. store_span(Span=#span{}) -> @@ -86,55 +68,7 @@ store_span(Span=#span{}) -> store_span(_) -> {error, invalid_span}. -init(_Args) -> - SendInterval = application:get_env(opencensus, send_interval_ms, 500), - Reporters = [init_reporter(Config) || Config <- application:get_env(opencensus, reporters, [])], - Ref = erlang:send_after(SendInterval, self(), report_spans), - {ok, #state{reporters=Reporters, - send_interval_ms=SendInterval, - timer_ref=Ref}}. - -handle_call({register, Reporter}, _From, #state{reporters=Reporters} = State) -> - {reply, ok, State#state{reporters=[Reporter | Reporters]}}; -handle_call(_, _From, State) -> - {noreply, State}. - -handle_cast(_, State) -> - {noreply, State}. - -handle_info(report_spans, State=#state{reporters=Reporters, - send_interval_ms=SendInterval, - timer_ref=Ref}) -> - erlang:cancel_timer(Ref), - Ref1 = erlang:send_after(SendInterval, self(), report_spans), - send_spans(Reporters), - {noreply, State#state{timer_ref=Ref1}}. - -code_change(_, State, _) -> - {ok, State}. - -terminate(_, #state{timer_ref=Ref}) -> - erlang:cancel_timer(Ref), - ok. - -init_reporter({Reporter, Config}) -> - {Reporter, Reporter:init(Config)}; -init_reporter(Reporter) when is_atom(Reporter) -> - {Reporter, Reporter:init([])}. - -maybe_init_ets() -> - case ets:info(?BUFFER_STATUS, name) of - undefined -> - [ets:new(Tab, [named_table, public | TableProps ]) || - {Tab, TableProps} <- [{?BUFFER_1, [{write_concurrency, true}, {keypos, #span.span_id}]}, - {?BUFFER_2, [{write_concurrency, true}, {keypos, #span.span_id}]}, - {?BUFFER_STATUS, [{read_concurrency, true}]}]], - ets:insert(?BUFFER_STATUS, {current_buffer, ?BUFFER_1}); - _ -> - ok - end. - -send_spans(Reporters) -> +ping() -> [{_, Buffer}] = ets:lookup(?BUFFER_STATUS, current_buffer), NewBuffer = case Buffer of ?BUFFER_1 -> @@ -148,19 +82,20 @@ send_spans(Reporters) -> ok; Spans -> ets:delete_all_objects(Buffer), - [report(Reporter, Spans, Config) - || {Reporter, Config} <- Reporters], + gen_event:sync_notify(?MODULE, {spans, Spans}), ok end. -report(undefined, _, _) -> - ok; -report(Reporter, Spans, Config) -> - %% don't let a reporter exception crash us - try - Reporter:report(Spans, Config) - catch - ?WITH_STACKTRACE(Class, Exception, StackTrace) - ?LOG_INFO("reporter threw exception: reporter=~p ~p:~p stacktrace=~p", - [Reporter, Class, Exception, StackTrace]) +maybe_init_ets() -> + case ets:info(?BUFFER_STATUS, name) of + undefined -> + [ets:new(Tab, [named_table, public | TableProps]) || + {Tab, TableProps} <- [{?BUFFER_1, [{write_concurrency, true}, {keypos, #span.span_id}]}, + {?BUFFER_2, [{write_concurrency, true}, {keypos, #span.span_id}]}, + {?BUFFER_STATUS, [{read_concurrency, true}]}]], + ets:insert(?BUFFER_STATUS, {current_buffer, ?BUFFER_1}), + + ok; + _ -> + ok end. diff --git a/src/oc_trace_sup.erl b/src/oc_trace_sup.erl new file mode 100644 index 0000000..4f02764 --- /dev/null +++ b/src/oc_trace_sup.erl @@ -0,0 +1,52 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2017, OpenCensus Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%%------------------------------------------------------------------------ + +-module(oc_trace_sup). + +-behaviour(supervisor). + +-export([start_link/1, init/1]). + +-include("opencensus.hrl"). + +start_link(Opts) -> + supervisor:start_link(?MODULE, Opts). + +init(Opts) -> + Interval = proplists:get_value(Opts, interval, 500), + Handlers = proplists:get_value(Opts, handlers, []), + + Exporter = #{id => exporter, + start => {oc_trace_reporter, start_link, [Handlers]}}, + % TODO: Rename oc_span_sweeper to oc_trace_sweeper + Sweeper = #{id => sweeper, + start => {oc_span_sweeper, start_link, []}}, + Timer = #{id => timer, + start => {oc_internal_timer, start_link, [{interval, Interval}, + {module, oc_trace_reporter}]} + }, + + ok = maybe_init_span_tab(), + + {ok, {#{strategy => one_for_one}, [Exporter, Timer, Sweeper]}}. + +maybe_init_span_tab() -> + case ets:info(?SPAN_TAB, name) of + undefined -> + ets:new(?SPAN_TAB, [named_table, public, {write_concurrency, true}, {keypos, #span.span_id}]), + ok; + _ -> + ok + end.