Skip to content

Commit

Permalink
Merge pull request #172 from criticalbh/fix_open_port_dangling
Browse files Browse the repository at this point in the history
fix issue when port is left dangling and catch poolboy timeout errors
  • Loading branch information
Ch4s3 authored Oct 31, 2022
2 parents 1c5d30f + 725e366 commit b19485d
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 12 deletions.
23 changes: 15 additions & 8 deletions lib/faktory_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule FaktoryWorker do
For a full list of configuration options see the [Configuration](configuration.html) documentation.
"""

require Logger
alias FaktoryWorker.{ConnectionManager, Pool}

@default_timeout 5_000
Expand Down Expand Up @@ -88,14 +89,20 @@ defmodule FaktoryWorker do
- `timeout` how long to wait for a response, in ms (default: #{@default_timeout})
"""
@spec send_command(command(), [send_command_opt()]) :: FaktoryWorker.Connection.response()
@spec send_command(command(), [send_command_opt()]) :: FaktoryWorker.Connection.response() | {:error, :timeout}
def send_command(command, opts \\ []) do
opts
|> Keyword.get(:faktory_name, __MODULE__)
|> Pool.format_pool_name()
|> :poolboy.transaction(
&ConnectionManager.Server.send_command(&1, command),
Keyword.get(opts, :timeout, @default_timeout)
)
try do
opts
|> Keyword.get(:faktory_name, __MODULE__)
|> Pool.format_pool_name()
|> :poolboy.transaction(
&ConnectionManager.Server.send_command(&1, command),
Keyword.get(opts, :timeout, @default_timeout)
)
catch
:exit, error ->
Logger.error(inspect(error))
{:error, :timeout}
end
end
end
9 changes: 6 additions & 3 deletions lib/faktory_worker/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ defmodule FaktoryWorker.Connection do
end
end

@spec close(conn :: __MODULE__.t()) :: :ok
def close(%{socket_handler: socket_handler} = conn) do
socket_handler.close(conn)
end

@spec send_command(connection :: __MODULE__.t(), Protocol.protocol_command()) :: response()
def send_command(%{socket_handler: socket_handler} = connection, :end) do
with {:ok, payload} <- Protocol.encode_command(:end),
Expand Down Expand Up @@ -74,9 +79,7 @@ defmodule FaktoryWorker.Connection do

defp send_handshake({:ok, %{"v" => version}}, _, _) when version != @faktory_version do
{:error,
"Only Faktory version '#{@faktory_version}' is supported (connected to Faktory version '#{
version
}')."}
"Only Faktory version '#{@faktory_version}' is supported (connected to Faktory version '#{version}')."}
end

defp send_handshake({:ok, response}, connection, opts) do
Expand Down
6 changes: 6 additions & 0 deletions lib/faktory_worker/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ defmodule FaktoryWorker.ConnectionManager do
def send_command(state, command, allow_retry \\ true) do
case try_send_command(state, command) do
{{:error, reason}, _} when reason in @connection_errors ->
# Close dangling port
close_connection(state)
error = {:error, "Failed to connect to Faktory"}
state = %{state | conn: nil}

Expand Down Expand Up @@ -78,6 +80,10 @@ defmodule FaktoryWorker.ConnectionManager do
end
end

defp close_connection(%{conn: conn}) do
Connection.close(conn)
end

defp log_error(reason, {_, %{jid: jid}}) do
Logger.warn("[#{jid}] #{reason}")
end
Expand Down
2 changes: 2 additions & 0 deletions lib/faktory_worker/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ defmodule FaktoryWorker.Socket do

@callback recv(connection :: Connection.t(), length :: pos_integer()) ::
{:ok, String.t()} | {:error, term()}

@callback close(connection :: Connection.t()) :: :ok
end
5 changes: 5 additions & 0 deletions lib/faktory_worker/socket/ssl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ defmodule FaktoryWorker.Socket.Ssl do
result
end

@impl true
def close(%{socket: socket}) do
:ssl.close(socket)
end

defp try_connect(host, port, opts) do
host = String.to_charlist(host)
tls_verify = Keyword.get(opts, :tls_verify, true)
Expand Down
5 changes: 5 additions & 0 deletions lib/faktory_worker/socket/tcp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ defmodule FaktoryWorker.Socket.Tcp do
result
end

@impl true
def close(%{socket: socket}) do
:gen_tcp.close(socket)
end

defp try_connect(host, port) do
host = String.to_charlist(host)

Expand Down
1 change: 0 additions & 1 deletion lib/faktory_worker/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,4 @@ defmodule FaktoryWorker.Telemetry do
defp log_error(outcome, jid, args, worker_module) do
log_error("#{outcome} (#{worker_module}) jid-#{jid} #{inspect(args)}")
end

end

0 comments on commit b19485d

Please sign in to comment.