Skip to content

Commit

Permalink
feat: set more consumer metadata on message processing (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
btkostner authored Nov 21, 2024
1 parent 37bd7ff commit 9f42529
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
2 changes: 1 addition & 1 deletion lib/kafee/consumer/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ defmodule Kafee.Consumer.Adapter do
"""
@spec push_message(atom(), Kafee.Consumer.options(), Message.t()) :: :ok
def push_message(consumer, options, %Message{} = message) do
Message.set_logger_request_id(message)
Message.set_logger_metadata(message)

span_name = Message.get_otel_span_name(message)
span_attributes = Message.get_otel_span_attributes(message)
Expand Down
18 changes: 11 additions & 7 deletions lib/kafee/consumer/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,16 @@ defmodule Kafee.Consumer.Message do
end

@doc false
@spec set_logger_request_id(t()) :: no_return()
def set_logger_request_id(%Message{} = message) do
if request_id = get_request_id(message) do
Logger.metadata(request_id: request_id)
else
Logger.metadata(request_id: nil)
end
@spec set_logger_metadata(t()) :: no_return()
def set_logger_metadata(%Message{} = message) do
Logger.metadata([
{:"messaging.system", "kafka"},
{:"messaging.source.name", message.topic},
{:"messaging.kafka.message.key", message.key},
{:"messaging.kafka.consumer.group", message.consumer_group},
{:"messaging.kafka.partition", message.partition},
{:"messaging.kafka.message.offset", message.offset},
{:request_id, get_request_id(message)}
])
end
end

0 comments on commit 9f42529

Please sign in to comment.