Skip to content

Commit

Permalink
Fix test issues
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyowensboggs committed Feb 23, 2024
1 parent 9809d77 commit b1f1ddf
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 40 deletions.
5 changes: 3 additions & 2 deletions lib/faktory_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ defmodule FaktoryWorker do
- `timeout` how long to wait for a response, in ms (default: #{@default_timeout})
"""
@spec send_command(command(), [send_command_opt()]) :: FaktoryWorker.Connection.response() | {:error, :timeout}
@spec send_command(command(), [send_command_opt()]) ::
FaktoryWorker.Connection.response() | {:error, :timeout}
def send_command(command, opts \\ []) do
try do
opts
Expand All @@ -101,7 +102,7 @@ defmodule FaktoryWorker do
)
catch
:exit, error ->
Logger.error(inspect(error))
Logger.error("[faktory-worker] " <> inspect(error))
{:error, :timeout}
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/faktory_worker/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ defmodule FaktoryWorker.ConnectionManager do
end
end

defp close_connection(%{conn: conn}) do
defp close_connection(%{conn: conn}) when not is_nil(conn) do
Connection.close(conn)
end

defp close_connection(%{conn: nil}), do: :ok

defp log_error(reason, {_, %{jid: jid}}) do
Logger.warn("[#{jid}] #{reason}")
end
Expand Down
40 changes: 20 additions & 20 deletions lib/faktory_worker/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ defmodule FaktoryWorker.Worker do

%{
state
| worker_state: :running_job,
job_timeout_ref: timeout_ref,
job_start: job_start,
job_ref: job_ref,
job_id: job["jid"],
job: job
| worker_state: :running_job,
job_timeout_ref: timeout_ref,
job_start: job_start,
job_ref: job_ref,
job_id: job["jid"],
job: job
}
end

Expand Down Expand Up @@ -200,13 +200,13 @@ defmodule FaktoryWorker.Worker do

schedule_fetch(%{
state
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
})
end

Expand All @@ -223,13 +223,13 @@ defmodule FaktoryWorker.Worker do

schedule_fetch(%{
state
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
})
end

Expand Down
5 changes: 4 additions & 1 deletion test/faktory_worker/connection_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ defmodule FaktoryWorker.ConnectionManagerTest do
{{:ok, result}, _} = ConnectionManager.send_command(state, {:push, payload})

assert result == "halt reason"
end) |> String.match?(~r/\[warn.*(?<!ing)\]*\[123456\]*[Halt: halt reason]/)
end)
|> String.match?(~r/\[warn.*(?<!ing)\]*\[123456\]*[Halt: halt reason]/)
end

test "should unset the connection when there is a socket failure" do
Expand All @@ -105,6 +106,8 @@ defmodule FaktoryWorker.ConnectionManagerTest do
{:error, :econnrefused}
end)

expect(FaktoryWorker.SocketMock, :close, fn _ -> :ok end)

opts = [socket_handler: FaktoryWorker.SocketMock]
state = ConnectionManager.new(opts)

Expand Down
50 changes: 34 additions & 16 deletions test/faktory_worker/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,17 @@ defmodule FaktoryWorker.WorkerTest do

worker_connection_mox()

expect(FaktoryWorker.SocketMock, :send, fn _, "FETCH test_queue default\r\n" ->
expect(FaktoryWorker.SocketMock, :send, fn _, "FETCH " <> msg ->
assert String.ends_with?(msg, "\r\n")

sorted_queues =
msg
|> String.trim("\r\n")
|> String.split(" ")
|> Enum.sort()

assert sorted_queues == ["default", "test_queue"]

:ok
end)

Expand Down Expand Up @@ -245,7 +255,7 @@ defmodule FaktoryWorker.WorkerTest do

state = Worker.send_fetch(state)

assert state.queues == ["test_queue", "default"]
assert Enum.sort(state.queues) == Enum.sort(["test_queue", "default"])
end
end

Expand All @@ -264,6 +274,7 @@ defmodule FaktoryWorker.WorkerTest do
|> Map.put(:job_id, "f47ccc395ef9d9646118434f")
|> Map.put(:job, %{"jid" => "f47ccc395ef9d9646118434f"})
|> Map.put(:job_timeout_ref, :erlang.make_ref())
|> Map.put(:job_start, System.monotonic_time(:millisecond))
end

test "should stop the job process" do
Expand Down Expand Up @@ -423,7 +434,8 @@ defmodule FaktoryWorker.WorkerTest do
job_timeout_ref: nil,
job_ref: nil,
job_id: nil,
job: nil
job: nil,
job_start: nil
}

new_state = Worker.handle_fetch_response({:ok, job}, state)
Expand Down Expand Up @@ -519,6 +531,7 @@ defmodule FaktoryWorker.WorkerTest do
|> Map.put(:job_ref, :erlang.make_ref())
|> Map.put(:job, job)
|> Map.put(:worker_state, :running_job)
|> Map.put(:job_start, System.monotonic_time(:millisecond))
end

test "should send a successful 'ACK' to faktory" do
Expand Down Expand Up @@ -598,11 +611,11 @@ defmodule FaktoryWorker.WorkerTest do
assert_receive {[:faktory_worker, :ack], outcome, metadata}
assert outcome == %{status: :ok}

assert metadata == %{
jid: job_id,
assert %{
jid: ^job_id,
args: [%{"hey" => "there!"}],
jobtype: "FaktoryWorker.TestQueueWorker"
}
} = metadata

detach_event_handler(event_handler_id)
end
Expand Down Expand Up @@ -729,11 +742,13 @@ defmodule FaktoryWorker.WorkerTest do
assert_receive {[:faktory_worker, :ack], outcome, metadata}
assert outcome == %{status: :error}

assert metadata == %{
jid: job_id,
assert %{
jid: ^job_id,
args: [%{"hey" => "there!"}],
jobtype: "FaktoryWorker.TestQueueWorker"
}
jobtype: "FaktoryWorker.TestQueueWorker",
queue: "test_queue",
duration: _
} = metadata

detach_event_handler(event_handler_id)
end
Expand Down Expand Up @@ -800,6 +815,7 @@ defmodule FaktoryWorker.WorkerTest do
{:error, :closed}
end)

expect(FaktoryWorker.SocketMock, :close, fn _ -> :ok end)
# the connection manager retries a failed request once
worker_connection_mox()

Expand All @@ -825,11 +841,11 @@ defmodule FaktoryWorker.WorkerTest do
assert_receive {[:faktory_worker, :failed_ack], outcome, metadata}
assert outcome == %{status: :ok}

assert metadata == %{
jid: job_id,
assert %{
jid: ^job_id,
args: [%{"hey" => "there!"}],
jobtype: "FaktoryWorker.TestQueueWorker"
}
} = metadata

assert_receive :fetch

Expand Down Expand Up @@ -866,6 +882,8 @@ defmodule FaktoryWorker.WorkerTest do
{:error, :closed}
end)

expect(FaktoryWorker.SocketMock, :close, fn conn -> conn end)

# the connection manager retries a failed request one more time
worker_connection_mox()

Expand All @@ -891,11 +909,11 @@ defmodule FaktoryWorker.WorkerTest do
assert_receive {[:faktory_worker, :failed_ack], outcome, metadata}
assert outcome == %{status: :error}

assert metadata == %{
jid: job_id,
assert %{
jid: ^job_id,
args: [%{"hey" => "there!"}],
jobtype: "FaktoryWorker.TestQueueWorker"
}
} = metadata

assert_receive :fetch

Expand Down

0 comments on commit b1f1ddf

Please sign in to comment.