Skip to content

Commit

Permalink
fix(broker): handle subscription errors and adjust logging level for …
Browse files Browse the repository at this point in the history
…buffer threshold
  • Loading branch information
rhblind committed Jan 30, 2025
1 parent 6fcde67 commit 38cbb7f
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions lib/off_broadway/emqqt/broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ defmodule OffBroadway.EMQTT.Broker do
def handle_continue(:subscribe_to_topics, state) do
subscriptions =
Enum.map(state.topics, &subscribe(state.emqtt, &1))
|> Enum.map(fn {:ok, %{via: port}, qos} -> {port, qos} end)
|> Enum.map(fn
{:ok, %{via: port}, qos} -> {port, qos}
{:error, reason} -> Logger.error("Subscribing to topic failed with reason #{inspect(reason)}")
end)

# Start a timer to check the buffer fill percentage and pause/resume the EMQTT client
ref =
Expand Down Expand Up @@ -143,7 +146,7 @@ defmodule OffBroadway.EMQTT.Broker do
def check_buffer_threshold(buffer_size, {min_threshold, max_threshold}, ets_table, emqtt) do
case buffer_fill_percentage(buffer_size, :ets.info(ets_table, :size)) do
fill_percentage when fill_percentage >= max_threshold ->
Logger.warning(
Logger.debug(
"Buffer fill percentage for client id #{:emqtt.info(emqtt)[:clientid]} is " <>
"#{:erlang.float_to_binary(fill_percentage, decimals: 2)}%, pausing EMQTT client"
)
Expand Down

0 comments on commit 38cbb7f

Please sign in to comment.