diff --git a/apps/opentelemetry/src/otel_batch_processor.erl b/apps/opentelemetry/src/otel_batch_processor.erl index 2e33074d..c187e391 100644 --- a/apps/opentelemetry/src/otel_batch_processor.erl +++ b/apps/opentelemetry/src/otel_batch_processor.erl @@ -190,11 +190,17 @@ callback_mode() -> idle(enter, _OldState, Data=#data{exporter=undefined, exporter_config=ExporterConfig, scheduled_delay_ms=SendInterval, + check_table_size_ms=CheckInterval, reg_name=RegName}) -> Exporter = init_exporter(RegName, ExporterConfig), - {keep_state, Data#data{exporter=Exporter}, [{{timeout, export_spans}, SendInterval, export_spans}]}; -idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval}) -> - {keep_state_and_data, [{{timeout, export_spans}, SendInterval, export_spans}]}; + {keep_state, Data#data{exporter=Exporter}, + [{{timeout, export_spans}, SendInterval, export_spans}, + {{timeout, check_table_size}, CheckInterval, check_table_size}]}; +idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval, + check_table_size_ms=CheckInterval}) -> + {keep_state_and_data, + [{{timeout, export_spans}, SendInterval, export_spans}, + {{timeout, check_table_size}, CheckInterval, check_table_size}]}; idle(_, export_spans, Data=#data{exporter=undefined, exporter_config=ExporterConfig, reg_name=RegName}) -> @@ -271,15 +277,15 @@ handle_event_(_State, _, force_flush, Data) -> handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=infinity}) -> keep_state_and_data; handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=MaxQueueSize, + check_table_size_ms=CheckInterval, reg_name=RegName}) -> case ets:info(?CURRENT_TABLE(RegName), size) of M when M >= MaxQueueSize -> - disable(RegName), - keep_state_and_data; + disable(RegName); _ -> - enable(RegName), - keep_state_and_data - end; + enable(RegName) + end, + {keep_state_and_data, [{{timeout, check_table_size}, CheckInterval, check_table_size}]}; handle_event_(_, {call, From}, {set_exporter, ExporterConfig}, Data=#data{exporter=OldExporter, reg_name=RegName}) -> otel_exporter:shutdown(OldExporter), diff --git a/apps/opentelemetry/test/otel_batch_processor_SUITE.erl b/apps/opentelemetry/test/otel_batch_processor_SUITE.erl index 4034b8ba..f1e48f26 100644 --- a/apps/opentelemetry/test/otel_batch_processor_SUITE.erl +++ b/apps/opentelemetry/test/otel_batch_processor_SUITE.erl @@ -5,17 +5,19 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). +-include("otel_span.hrl"). -include_lib("opentelemetry_api/include/opentelemetry.hrl"). all() -> - [exporting_timeout_test]. + [exporting_timeout_test, + check_table_size_test]. %% verifies that after the runner has to be killed for taking too long %% that everything is still functional and the exporter does not crash exporting_timeout_test(_Config) -> process_flag(trap_exit, true), - {ok, Pid, _} = otel_batch_processor:start_link(#{reg_name => test_processor, + {ok, Pid, _} = otel_batch_processor:start_link(#{name => test_processor, resource => otel_resource:create([]), exporter => ?MODULE, exporting_timeout_ms => 1, @@ -30,6 +32,34 @@ exporting_timeout_test(_Config) -> ok end. +check_table_size_test(_Config) -> + MaxQueueSize = 10, + CheckTableSizeMs = 1, + {ok, _Pid, #{reg_name := RegName}} = otel_batch_processor:start_link( + #{name => test_processor_check_size_test, + resource => otel_resource:create([]), + exporter => ?MODULE, + exporting_timeout_ms => timer:minutes(10), + %% long enough, so that it never happens during the test + scheduled_delay_ms => timer:minutes(10), + check_table_size_ms => CheckTableSizeMs, + max_queue_size => MaxQueueSize} + ), + %% max_queue_size limit is not reached + true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}), + lists:foreach(fun(_) -> + otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}) + end, + lists:seq(1, MaxQueueSize)), + %% Wait for more than CheckTablesizeMS to be sure check timeout occurred + timer:sleep(CheckTableSizeMs * 5), + dropped = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}), + + otel_batch_processor:force_flush(#{reg_name => RegName}), + %% force_flush is async, have to wait for some long enough time again, + timer:sleep(CheckTableSizeMs * 10), + true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}). + %% exporter behaviour init(_) -> @@ -40,3 +70,13 @@ export(_, _) -> shutdown(_) -> ok. + +%% helpers + +generate_span() -> + #span{trace_id = otel_id_generator:generate_trace_id(), + span_id = otel_id_generator:generate_span_id(), + name = "test_span", + trace_flags = 1, + is_recording = true, + instrumentation_scope = #instrumentation_scope{name = "test"}}.