From 9df86e6850b2fc1cac2383560d7b725bfe45cbe3 Mon Sep 17 00:00:00 2001 From: Bryan Paxton <39971740+starbelly@users.noreply.github.com> Date: Wed, 12 Apr 2023 15:12:12 -0500 Subject: [PATCH] Handle fragmented client responses (#63) --- VERSION | 2 +- lib/mllp/client.ex | 73 ++++++++- test/client_and_receiver_integration_test.exs | 5 +- test/client_test.exs | 138 +++++++++++++++++- 4 files changed, 213 insertions(+), 5 deletions(-) diff --git a/VERSION b/VERSION index 7ada0d3..7fc2521 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.8.5 +0.8.6 diff --git a/lib/mllp/client.ex b/lib/mllp/client.ex index a735ed5..adf3fac 100755 --- a/lib/mllp/client.ex +++ b/lib/mllp/client.ex @@ -178,6 +178,10 @@ defmodule MLLP.Client do def format_error(:timeout), do: "timed out" def format_error(:system_limit), do: "all available erlang emulator ports in use" + def format_error(:invalid_reply) do + "Invalid header received in server acknowledgment" + end + def format_error(posix) when is_atom(posix) do case :inet.format_error(posix) do 'unknown POSIX error' -> @@ -328,6 +332,9 @@ defmodule MLLP.Client do @spec stop(pid :: pid()) :: :ok def stop(pid), do: GenServer.stop(pid) + @header MLLP.Envelope.sb() + @trailer MLLP.Envelope.eb_cr() + ## GenServer callbacks @doc false @spec init(Keyword.t()) :: {:ok, MLLP.Client.t(), {:continue, :init_socket}} @@ -383,9 +390,10 @@ defmodule MLLP.Client do case state.tcp.send(state.socket, payload) do :ok -> - case state.tcp.recv(state.socket, 0, options1.reply_timeout) do + timeout = maybe_convert_time(options1.reply_timeout, :millisecond, :microsecond) + + case recv_ack(state, timeout) do {:ok, reply} -> - telemetry(:received, %{response: reply}, state) {:reply, {:ok, reply}, state} {:error, reason} -> @@ -464,6 +472,67 @@ defmodule MLLP.Client do stop_connection(state, reason, "process terminated") end + defp maybe_convert_time(:infinity, _, _), do: :infinity + + defp maybe_convert_time(t, from, to) do + System.convert_time_unit(t, from, to) + end + + defp recv_ack(state, timeout) do + recv_ack(state, {timeout, 0}, <<>>) + end + + defp recv_ack(_state, {time_left, time_owed}, _buffer) + when is_integer(time_left) and time_left <= time_owed do + {:error, :timeout} + end + + defp recv_ack(state, {time_left, time_owed}, buffer) do + {res, elapsed} = do_recv(state, 0, time_left) + + case res do + {:ok, reply} -> + new_buf = buffer <> reply + check = byte_size(new_buf) - 3 + + case new_buf do + <<@header, _ack::binary-size(check), @trailer>> -> + {:ok, new_buf} + + <<@header, _rest::binary>> -> + time_credit = update_recv_time_credit(time_left, time_owed + elapsed) + recv_ack(state, time_credit, new_buf) + + _ -> + {:error, :invalid_reply} + end + + {:error, _} = err -> + err + end + end + + defp do_recv(state, length, :infinity) do + res = state.tcp.recv(state.socket, length, :infinity) + {res, 0} + end + + defp do_recv(state, length, timeout) do + timeout_in_ms = System.convert_time_unit(timeout, :microsecond, :millisecond) + t1 = System.monotonic_time(:microsecond) + res = state.tcp.recv(state.socket, length, timeout_in_ms) + t2 = System.monotonic_time(:microsecond) + {res, t2 - t1} + end + + defp update_recv_time_credit(:infinity, _), do: {:infinity, 0} + + defp update_recv_time_credit(time_left, time_spent) do + time_charged = div(time_spent, 1000) * 1000 + time_owed = time_spent - time_charged + {time_left - time_charged, time_owed} + end + defp stop_connection(%State{} = state, error, context) do if state.socket != nil do telemetry( diff --git a/test/client_and_receiver_integration_test.exs b/test/client_and_receiver_integration_test.exs index bc4f8cb..4e0c905 100644 --- a/test/client_and_receiver_integration_test.exs +++ b/test/client_and_receiver_integration_test.exs @@ -519,7 +519,9 @@ defmodule ClientAndReceiverIntegrationTest do @tag allowed_clients: ["client-1"] test "returns client info in receiver context", ctx do ack = - "MSH|^~\\&|||||20060529090131-0500||ACK^A01^ACK|01052901|P|2.5\rMSA|AA|01052901|A real MLLP message dispatcher was not provided\r" + MLLP.Envelope.wrap_message( + "MSH|^~\\&|||||20060529090131-0500||ACK^A01^ACK|01052901|P|2.5\rMSA|AA|01052901|A real MLLP message dispatcher was not provided\r" + ) MLLP.DispatcherMock |> expect(:dispatch, fn :mllp_hl7, @@ -574,6 +576,7 @@ defmodule ClientAndReceiverIntegrationTest do :application_accept ) |> to_string() + |> MLLP.Envelope.wrap_message() {:ok, %{state | reply_buffer: reply}} end diff --git a/test/client_test.exs b/test/client_test.exs index abf7f94..2c05881 100755 --- a/test/client_test.exs +++ b/test/client_test.exs @@ -43,6 +43,9 @@ defmodule ClientTest do assert MLLP.Client.format_error(:system_limit) == "all available erlang emulator ports in use" + + assert MLLP.Client.format_error(:invalid_reply) == + "Invalid header received in server acknowledgment" end test "when given posix error" do @@ -134,7 +137,9 @@ defmodule ClientTest do packet = MLLP.Envelope.wrap_message(raw_hl7) tcp_reply = - "MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r" + MLLP.Envelope.wrap_message( + "MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r" + ) MLLP.TCPMock |> expect( @@ -156,6 +161,137 @@ defmodule ClientTest do ) end + test "when replies are fragmented" do + address = {127, 0, 0, 1} + port = 4090 + socket = make_ref() + raw_hl7 = HL7.Examples.wikipedia_sample_hl7() + message = HL7.Message.new(raw_hl7) + packet = MLLP.Envelope.wrap_message(raw_hl7) + + tcp_reply1 = + MLLP.Envelope.wrap_message( + "MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r" + ) + + {ack_frag1, ack_frag2} = String.split_at(tcp_reply1, 50) + + MLLP.TCPMock + |> expect( + :connect, + fn ^address, ^port, [:binary, {:packet, 0}, {:active, false}], 2000 -> + {:ok, socket} + end + ) + |> expect(:send, fn ^socket, ^packet -> :ok end) + |> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag1} end) + |> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag2} end) + + {:ok, client} = Client.start_link(address, port, tcp: MLLP.TCPMock, use_backoff: true) + + expected_ack = %MLLP.Ack{acknowledgement_code: "AA", text_message: "You win!"} + + assert( + {:ok, :application_accept, expected_ack} == + Client.send(client, message) + ) + + {ack_frag1, ack_frag2} = String.split_at(tcp_reply1, 50) + + {ack_frag2, ack_frag3} = String.split_at(ack_frag2, 10) + + MLLP.TCPMock + |> expect(:send, fn ^socket, ^packet -> :ok end) + |> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag1} end) + |> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag2} end) + |> expect(:recv, fn ^socket, 0, :infinity -> {:ok, ack_frag3} end) + + assert( + {:ok, :application_accept, expected_ack} == + Client.send(client, message) + ) + end + + test "when replies are fragmented and the last fragment is not received" do + address = {127, 0, 0, 1} + port = 4090 + socket = make_ref() + raw_hl7 = HL7.Examples.wikipedia_sample_hl7() + message = HL7.Message.new(raw_hl7) + packet = MLLP.Envelope.wrap_message(raw_hl7) + + tcp_reply1 = + MLLP.Envelope.wrap_message( + "MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r" + ) + + {ack_frag1, ack_frag2} = String.split_at(tcp_reply1, 50) + + {ack_frag2, _ack_frag3} = String.split_at(ack_frag2, 10) + + MLLP.TCPMock + |> expect( + :connect, + fn ^address, ^port, [:binary, {:packet, 0}, {:active, false}], 2000 -> + {:ok, socket} + end + ) + |> expect(:send, fn ^socket, ^packet -> :ok end) + |> expect(:recv, fn ^socket, 0, _ -> + Process.sleep(1) + {:ok, ack_frag1} + end) + |> expect(:recv, fn ^socket, 0, _ -> + Process.sleep(1) + {:ok, ack_frag2} + end) + + {:ok, client} = + Client.start_link(address, port, tcp: MLLP.TCPMock, use_backoff: true, reply_timeout: 3) + + expected_err = %MLLP.Client.Error{context: :recv, reason: :timeout, message: "timed out"} + + assert( + {:error, expected_err} == + Client.send(client, message) + ) + end + + test "when reply header is invalid" do + address = {127, 0, 0, 1} + port = 4090 + socket = make_ref() + raw_hl7 = HL7.Examples.wikipedia_sample_hl7() + message = HL7.Message.new(raw_hl7) + packet = MLLP.Envelope.wrap_message(raw_hl7) + + tcp_reply1 = + "MSH|^~\\&|SuperOE|XYZImgCtr|MegaReg|XYZHospC|20060529090131-0500||ACK^O01|01052901|P|2.5\rMSA|AA|01052901|You win!\r" + + MLLP.TCPMock + |> expect( + :connect, + fn ^address, ^port, [:binary, {:packet, 0}, {:active, false}], 2000 -> + {:ok, socket} + end + ) + |> expect(:send, fn ^socket, ^packet -> :ok end) + |> expect(:recv, fn ^socket, 0, :infinity -> {:ok, tcp_reply1} end) + + {:ok, client} = Client.start_link(address, port, tcp: MLLP.TCPMock, use_backoff: true) + + expected_err = %MLLP.Client.Error{ + context: :recv, + message: "Invalid header received in server acknowledgment", + reason: :invalid_reply + } + + assert( + {:error, expected_err} == + Client.send(client, message) + ) + end + test "when given non hl7" do address = {127, 0, 0, 1} port = 4090