Skip to content

Commit

Permalink
add: convert options to charlist
Browse files Browse the repository at this point in the history
  • Loading branch information
rhblind committed Oct 29, 2024
1 parent cc3c92d commit b63948f
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
1 change: 1 addition & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ _Released unreleased_
- Emitting `off_broadway_emqtt.receive_message.ack` reads message topic from message receipt instead of from the message body.
This ensures that topic is included in telemetry events even if the message has been altered during dispatch.
- Move `emqtt.start_link/1` and `emqtt.connect/1` to a `handle_continue/2` callback to prevent blocking `GenServer.init/1`.
- Convert `host` and `server_name_indication` to charlist when validating options.

## v0.1.0 - Initial release

Expand Down
17 changes: 9 additions & 8 deletions lib/off_broadway/emqqt/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule OffBroadway.EMQTT.Options do
keys: [
host: [
doc: "The host of the MQTT broker",
type: :string,
type: {:custom, __MODULE__, :type_charlist, [[{:name, :name}]]},
required: true
],
port: [
Expand Down Expand Up @@ -73,7 +73,7 @@ defmodule OffBroadway.EMQTT.Options do
],
server_name_indication: [
doc: "Server name indication",
type: :string
type: {:custom, __MODULE__, :type_charlist, [[{:name, :name}]]}
],
verify: [
doc: "Verify mode",
Expand Down Expand Up @@ -201,14 +201,15 @@ defmodule OffBroadway.EMQTT.Options do
def type_subopt(value, [{:name, _}]) when value in @qos, do: {:ok, value}
def type_subopt({:rh, qos} = value, [{:name, _}]) when qos in [0, 1, 2], do: {:ok, value}
def type_subopt({:qos, qos} = value, [{:name, _}]) when qos in [0, 1, 2], do: {:ok, value}

def type_subopt({:rap, boolean} = value, [{:name, _}]) when is_boolean(boolean),
do: {:ok, value}

def type_subopt({:nl, boolean} = value, [{:name, _}]) when is_boolean(boolean),
do: {:ok, value}
def type_subopt({:rap, boolean} = value, [{:name, _}]) when is_boolean(boolean), do: {:ok, value}
def type_subopt({:nl, boolean} = value, [{:name, _}]) when is_boolean(boolean), do: {:ok, value}

def type_subopt(value, [{:name, name}]) do
{:error, "#{inspect(value)} is not a valid subopt value for #{name}"}
end

def type_charlist(value, [{:name, _}]) when is_binary(value), do: {:ok, to_charlist(value)}

def type_charlist(value, [{:name, name}]),
do: {:error, "#{inspect(value)} is not a valid value for #{name}, expected a binary"}
end
4 changes: 1 addition & 3 deletions lib/off_broadway/emqqt/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ defmodule OffBroadway.EMQTT.Producer do
with {:ok, broadway} <- Keyword.fetch(broadway_opts, :name),
{:ok, config} <- Keyword.fetch(opts, :config),
{:ok, client_id} <- Keyword.fetch(config, :clientid),
{host, config} <- Keyword.pop(config, :host),
config <- Keyword.put(config, :name, emqtt_process_name(client_id)),
config <- Keyword.put(config, :host, to_charlist(host)) do
config <- Keyword.put(config, :name, emqtt_process_name(client_id)) do
:persistent_term.put(broadway, %{
config: config,
# FIXME Acking should be configurable based on if the :emqtt process is started
Expand Down

0 comments on commit b63948f

Please sign in to comment.