Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle fragmented client responses #63

Merged
merged 1 commit into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.8.5
0.8.6
73 changes: 71 additions & 2 deletions lib/mllp/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ defmodule MLLP.Client do
def format_error(:timeout), do: "timed out"
def format_error(:system_limit), do: "all available erlang emulator ports in use"

def format_error(:invalid_reply) do
"Invalid header received in server acknowledgment"
end

def format_error(posix) when is_atom(posix) do
case :inet.format_error(posix) do
'unknown POSIX error' ->
Expand Down Expand Up @@ -328,6 +332,9 @@ defmodule MLLP.Client do
@spec stop(pid :: pid()) :: :ok
def stop(pid), do: GenServer.stop(pid)

@header MLLP.Envelope.sb()
@trailer MLLP.Envelope.eb_cr()

## GenServer callbacks
@doc false
@spec init(Keyword.t()) :: {:ok, MLLP.Client.t(), {:continue, :init_socket}}
Expand Down Expand Up @@ -383,9 +390,10 @@ defmodule MLLP.Client do

case state.tcp.send(state.socket, payload) do
:ok ->
case state.tcp.recv(state.socket, 0, options1.reply_timeout) do
timeout = maybe_convert_time(options1.reply_timeout, :millisecond, :microsecond)

case recv_ack(state, timeout) do
{:ok, reply} ->
telemetry(:received, %{response: reply}, state)
{:reply, {:ok, reply}, state}

{:error, reason} ->
Expand Down Expand Up @@ -464,6 +472,67 @@ defmodule MLLP.Client do
stop_connection(state, reason, "process terminated")
end

defp maybe_convert_time(:infinity, _, _), do: :infinity

defp maybe_convert_time(t, from, to) do
System.convert_time_unit(t, from, to)
end

defp recv_ack(state, timeout) do
recv_ack(state, {timeout, 0}, <<>>)
end

defp recv_ack(_state, {time_left, time_owed}, _buffer)
when is_integer(time_left) and time_left <= time_owed do
{:error, :timeout}
end

defp recv_ack(state, {time_left, time_owed}, buffer) do
{res, elapsed} = do_recv(state, 0, time_left)

case res do
{:ok, reply} ->
new_buf = buffer <> reply
check = byte_size(new_buf) - 3

case new_buf do
vikas15bhardwaj marked this conversation as resolved.
Show resolved Hide resolved
<<@header, _ack::binary-size(check), @trailer>> ->
{:ok, new_buf}

<<@header, _rest::binary>> ->
time_credit = update_recv_time_credit(time_left, time_owed + elapsed)
recv_ack(state, time_credit, new_buf)

_ ->
starbelly marked this conversation as resolved.
Show resolved Hide resolved
{:error, :invalid_reply}
end

{:error, _} = err ->
err
end
end

defp do_recv(state, length, :infinity) do
res = state.tcp.recv(state.socket, length, :infinity)
{res, 0}
end

defp do_recv(state, length, timeout) do
timeout_in_ms = System.convert_time_unit(timeout, :microsecond, :millisecond)
t1 = System.monotonic_time(:microsecond)
res = state.tcp.recv(state.socket, length, timeout_in_ms)
t2 = System.monotonic_time(:microsecond)
{res, t2 - t1}
end

defp update_recv_time_credit(:infinity, _), do: {:infinity, 0}

defp update_recv_time_credit(time_left, time_spent) do
time_charged = div(time_spent, 1000) * 1000
time_owed = time_spent - time_charged
{time_left - time_charged, time_owed}
end

defp stop_connection(%State{} = state, error, context) do
if state.socket != nil do
telemetry(
Expand Down
5 changes: 4 additions & 1 deletion test/client_and_receiver_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,9 @@ defmodule ClientAndReceiverIntegrationTest do
@tag allowed_clients: ["client-1"]
test "returns client info in receiver context", ctx do
ack =
"MSH|^~\\&|||||20060529090131-0500||ACK^A01^ACK|01052901|P|2.5\rMSA|AA|01052901|A real MLLP message dispatcher was not provided\r"
MLLP.Envelope.wrap_message(
"MSH|^~\\&|||||20060529090131-0500||ACK^A01^ACK|01052901|P|2.5\rMSA|AA|01052901|A real MLLP message dispatcher was not provided\r"
)

MLLP.DispatcherMock
|> expect(:dispatch, fn :mllp_hl7,
Expand Down Expand Up @@ -574,6 +576,7 @@ defmodule ClientAndReceiverIntegrationTest do
:application_accept
)
|> to_string()
|> MLLP.Envelope.wrap_message()

{:ok, %{state | reply_buffer: reply}}
end
Expand Down
138 changes: 137 additions & 1 deletion test/client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ defmodule ClientTest do

assert MLLP.Client.format_error(:system_limit) ==
"all available erlang emulator ports in use"

assert MLLP.Client.format_error(:invalid_reply) ==
"Invalid header received in server acknowledgment"
end

test "when given posix error" do
Expand Down Expand Up @@ -134,7 +137,9 @@ defmodule ClientTest do
packet = MLLP.Envelope.wrap_message(raw_hl7)

tcp_reply =
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"
MLLP.Envelope.wrap_message(
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"
)

MLLP.TCPMock
|> expect(
Expand All @@ -156,6 +161,137 @@ defmodule ClientTest do
)
end

test "when replies are fragmented" do
address = {127, 0, 0, 1}
port = 4090
socket = make_ref()
raw_hl7 = HL7.Examples.wikipedia_sample_hl7()
message = HL7.Message.new(raw_hl7)
packet = MLLP.Envelope.wrap_message(raw_hl7)

tcp_reply1 =
MLLP.Envelope.wrap_message(
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"
)

{ack_frag1, ack_frag2} = String.split_at(tcp_reply1, 50)

MLLP.TCPMock
|> expect(
:connect,
fn ^address, ^port, [:binary, {:packet, 0}, {:active, false}], 2000 ->
{:ok, socket}
end
)
|> expect(:send, fn ^socket, ^packet -> :ok end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag1} end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag2} end)

{:ok, client} = Client.start_link(address, port, tcp: MLLP.TCPMock, use_backoff: true)

expected_ack = %MLLP.Ack{acknowledgement_code: "AA", text_message: "You win!"}

assert(
{:ok, :application_accept, expected_ack} ==
Client.send(client, message)
)

{ack_frag1, ack_frag2} = String.split_at(tcp_reply1, 50)

{ack_frag2, ack_frag3} = String.split_at(ack_frag2, 10)

MLLP.TCPMock
|> expect(:send, fn ^socket, ^packet -> :ok end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag1} end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag2} end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag3} end)

assert(
{:ok, :application_accept, expected_ack} ==
Client.send(client, message)
)
end

test "when replies are fragmented and the last fragment is not received" do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unfortunately a brittle test. If we keep this approach, then we might want to tag it as such and not run it by default.

address = {127, 0, 0, 1}
port = 4090
socket = make_ref()
raw_hl7 = HL7.Examples.wikipedia_sample_hl7()
message = HL7.Message.new(raw_hl7)
packet = MLLP.Envelope.wrap_message(raw_hl7)

tcp_reply1 =
MLLP.Envelope.wrap_message(
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"
)

{ack_frag1, ack_frag2} = String.split_at(tcp_reply1, 50)

{ack_frag2, _ack_frag3} = String.split_at(ack_frag2, 10)

MLLP.TCPMock
|> expect(
:connect,
fn ^address, ^port, [:binary, {:packet, 0}, {:active, false}], 2000 ->
{:ok, socket}
end
)
|> expect(:send, fn ^socket, ^packet -> :ok end)
|> expect(:recv, fn ^socket, 0, _ ->
Process.sleep(1)
{:ok, ack_frag1}
end)
|> expect(:recv, fn ^socket, 0, _ ->
Process.sleep(1)
{:ok, ack_frag2}
end)

{:ok, client} =
Client.start_link(address, port, tcp: MLLP.TCPMock, use_backoff: true, reply_timeout: 3)

expected_err = %MLLP.Client.Error{context: :recv, reason: :timeout, message: "timed out"}

assert(
{:error, expected_err} ==
Client.send(client, message)
)
end

test "when reply header is invalid" do
address = {127, 0, 0, 1}
port = 4090
socket = make_ref()
raw_hl7 = HL7.Examples.wikipedia_sample_hl7()
message = HL7.Message.new(raw_hl7)
packet = MLLP.Envelope.wrap_message(raw_hl7)

tcp_reply1 =
"MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r"

MLLP.TCPMock
|> expect(
:connect,
fn ^address, ^port, [:binary, {:packet, 0}, {:active, false}], 2000 ->
{:ok, socket}
end
)
|> expect(:send, fn ^socket, ^packet -> :ok end)
|> expect(:recv, fn ^socket, 0, :infinity -> {:ok, tcp_reply1} end)

{:ok, client} = Client.start_link(address, port, tcp: MLLP.TCPMock, use_backoff: true)

expected_err = %MLLP.Client.Error{
context: :recv,
message: "Invalid header received in server acknowledgment",
reason: :invalid_reply
}

assert(
{:error, expected_err} ==
Client.send(client, message)
)
end

test "when given non hl7" do
address = {127, 0, 0, 1}
port = 4090
Expand Down