Skip to content

Commit

Permalink
Handle fragmented client responses (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
starbelly authored Apr 12, 2023
1 parent 27c738f commit 9df86e6
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 5 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.8.5
0.8.6
73 changes: 71 additions & 2 deletions lib/mllp/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ defmodule MLLP.Client do
def format_error(:timeout), do: "timed out"
def format_error(:system_limit), do: "all available erlang emulator ports in use"

def format_error(:invalid_reply) do
"Invalid header received in server acknowledgment"
end

def format_error(posix) when is_atom(posix) do
case :inet.format_error(posix) do
'unknown POSIX error' ->
Expand Down Expand Up @@ -328,6 +332,9 @@ defmodule MLLP.Client do
@spec stop(pid :: pid()) :: :ok
def stop(pid), do: GenServer.stop(pid)

@header MLLP.Envelope.sb()
@trailer MLLP.Envelope.eb_cr()

## GenServer callbacks
@doc false
@spec init(Keyword.t()) :: {:ok, MLLP.Client.t(), {:continue, :init_socket}}
Expand Down Expand Up @@ -383,9 +390,10 @@ defmodule MLLP.Client do

case state.tcp.send(state.socket, payload) do
:ok ->
case state.tcp.recv(state.socket, 0, options1.reply_timeout) do
timeout = maybe_convert_time(options1.reply_timeout, :millisecond, :microsecond)

case recv_ack(state, timeout) do
{:ok, reply} ->
telemetry(:received, %{response: reply}, state)
{:reply, {:ok, reply}, state}

{:error, reason} ->
Expand Down Expand Up @@ -464,6 +472,67 @@ defmodule MLLP.Client do
stop_connection(state, reason, "process terminated")
end

defp maybe_convert_time(:infinity, _, _), do: :infinity

defp maybe_convert_time(t, from, to) do
System.convert_time_unit(t, from, to)
end

defp recv_ack(state, timeout) do
recv_ack(state, {timeout, 0}, <<>>)
end

defp recv_ack(_state, {time_left, time_owed}, _buffer)
when is_integer(time_left) and time_left <= time_owed do
{:error, :timeout}
end

defp recv_ack(state, {time_left, time_owed}, buffer) do
{res, elapsed} = do_recv(state, 0, time_left)

case res do
{:ok, reply} ->
new_buf = buffer <> reply
check = byte_size(new_buf) - 3

case new_buf do
<<@header, _ack::binary-size(check), @trailer>> ->
{:ok, new_buf}

<<@header, _rest::binary>> ->
time_credit = update_recv_time_credit(time_left, time_owed + elapsed)
recv_ack(state, time_credit, new_buf)

_ ->
{:error, :invalid_reply}
end

{:error, _} = err ->
err
end
end

defp do_recv(state, length, :infinity) do
res = state.tcp.recv(state.socket, length, :infinity)
{res, 0}
end

defp do_recv(state, length, timeout) do
timeout_in_ms = System.convert_time_unit(timeout, :microsecond, :millisecond)
t1 = System.monotonic_time(:microsecond)
res = state.tcp.recv(state.socket, length, timeout_in_ms)
t2 = System.monotonic_time(:microsecond)
{res, t2 - t1}
end

defp update_recv_time_credit(:infinity, _), do: {:infinity, 0}

defp update_recv_time_credit(time_left, time_spent) do
time_charged = div(time_spent, 1000) * 1000
time_owed = time_spent - time_charged
{time_left - time_charged, time_owed}
end

defp stop_connection(%State{} = state, error, context) do
if state.socket != nil do
telemetry(
Expand Down
5 changes: 4 additions & 1 deletion test/client_and_receiver_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,9 @@ defmodule ClientAndReceiverIntegrationTest do
@tag allowed_clients: ["client-1"]
test "returns client info in receiver context", ctx do
ack =
"MSH|^~\\&|||||20060529090131-0500||ACK^A01^ACK|01052901|P|2.5\rMSA|AA|01052901|A real MLLP message dispatcher was not provided\r"
MLLP.Envelope.wrap_message(
"MSH|^~\\&|||||20060529090131-0500||ACK^A01^ACK|01052901|P|2.5\rMSA|AA|01052901|A real MLLP message dispatcher was not provided\r"
)

MLLP.DispatcherMock
|> expect(:dispatch, fn :mllp_hl7,
Expand Down Expand Up @@ -574,6 +576,7 @@ defmodule ClientAndReceiverIntegrationTest do
:application_accept
)
|> to_string()
|> MLLP.Envelope.wrap_message()

{:ok, %{state | reply_buffer: reply}}
end
Expand Down
138 changes: 137 additions & 1 deletion test/client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ defmodule ClientTest do

assert MLLP.Client.format_error(:system_limit) ==
"all available erlang emulator ports in use"

assert MLLP.Client.format_error(:invalid_reply) ==
"Invalid header received in server acknowledgment"
end

test "when given posix error" do
Expand Down Expand Up @@ -134,7 +137,9 @@ defmodule ClientTest do
packet = MLLP.Envelope.wrap_message(raw_hl7)

tcp_reply =
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"
MLLP.Envelope.wrap_message(
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"
)

MLLP.TCPMock
|> expect(
Expand All @@ -156,6 +161,137 @@ defmodule ClientTest do
)
end

test "when replies are fragmented" do
address = {127, 0, 0, 1}
port = 4090
socket = make_ref()
raw_hl7 = HL7.Examples.wikipedia_sample_hl7()
message = HL7.Message.new(raw_hl7)
packet = MLLP.Envelope.wrap_message(raw_hl7)

tcp_reply1 =
MLLP.Envelope.wrap_message(
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"
)

{ack_frag1, ack_frag2} = String.split_at(tcp_reply1, 50)

MLLP.TCPMock
|> expect(
:connect,
fn ^address, ^port, [:binary, {:packet, 0}, {:active, false}], 2000 ->
{:ok, socket}
end
)
|> expect(:send, fn ^socket, ^packet -> :ok end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag1} end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag2} end)

{:ok, client} = Client.start_link(address, port, tcp: MLLP.TCPMock, use_backoff: true)

expected_ack = %MLLP.Ack{acknowledgement_code: "AA", text_message: "You win!"}

assert(
{:ok, :application_accept, expected_ack} ==
Client.send(client, message)
)

{ack_frag1, ack_frag2} = String.split_at(tcp_reply1, 50)

{ack_frag2, ack_frag3} = String.split_at(ack_frag2, 10)

MLLP.TCPMock
|> expect(:send, fn ^socket, ^packet -> :ok end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag1} end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag2} end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag3} end)

assert(
{:ok, :application_accept, expected_ack} ==
Client.send(client, message)
)
end

test "when replies are fragmented and the last fragment is not received" do
address = {127, 0, 0, 1}
port = 4090
socket = make_ref()
raw_hl7 = HL7.Examples.wikipedia_sample_hl7()
message = HL7.Message.new(raw_hl7)
packet = MLLP.Envelope.wrap_message(raw_hl7)

tcp_reply1 =
MLLP.Envelope.wrap_message(
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"
)

{ack_frag1, ack_frag2} = String.split_at(tcp_reply1, 50)

{ack_frag2, _ack_frag3} = String.split_at(ack_frag2, 10)

MLLP.TCPMock
|> expect(
:connect,
fn ^address, ^port, [:binary, {:packet, 0}, {:active, false}], 2000 ->
{:ok, socket}
end
)
|> expect(:send, fn ^socket, ^packet -> :ok end)
|> expect(:recv, fn ^socket, 0, _ ->
Process.sleep(1)
{:ok, ack_frag1}
end)
|> expect(:recv, fn ^socket, 0, _ ->
Process.sleep(1)
{:ok, ack_frag2}
end)

{:ok, client} =
Client.start_link(address, port, tcp: MLLP.TCPMock, use_backoff: true, reply_timeout: 3)

expected_err = %MLLP.Client.Error{context: :recv, reason: :timeout, message: "timed out"}

assert(
{:error, expected_err} ==
Client.send(client, message)
)
end

test "when reply header is invalid" do
address = {127, 0, 0, 1}
port = 4090
socket = make_ref()
raw_hl7 = HL7.Examples.wikipedia_sample_hl7()
message = HL7.Message.new(raw_hl7)
packet = MLLP.Envelope.wrap_message(raw_hl7)

tcp_reply1 =
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"

MLLP.TCPMock
|> expect(
:connect,
fn ^address, ^port, [:binary, {:packet, 0}, {:active, false}], 2000 ->
{:ok, socket}
end
)
|> expect(:send, fn ^socket, ^packet -> :ok end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, tcp_reply1} end)

{:ok, client} = Client.start_link(address, port, tcp: MLLP.TCPMock, use_backoff: true)

expected_err = %MLLP.Client.Error{
context: :recv,
message: "Invalid header received in server acknowledgment",
reason: :invalid_reply
}

assert(
{:error, expected_err} ==
Client.send(client, message)
)
end

test "when given non hl7" do
address = {127, 0, 0, 1}
port = 4090
Expand Down

0 comments on commit 9df86e6

Please sign in to comment.