Skip to content
This repository has been archived by the owner on May 21, 2022. It is now read-only.

HTTP/2 improvements #68

Merged
merged 14 commits into from
Dec 10, 2020
74 changes: 71 additions & 3 deletions lib/mojito/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ defmodule Mojito.Conn do
"""
@spec request(t, Mojito.request()) :: {:ok, t, reference} | {:error, any}
def request(conn, request) do
max_body_size = request.opts[:max_body_size]
response = %Mojito.Response{body: [], size: max_body_size}

with {:ok, relative_url, auth_headers} <-
Utils.get_relative_url_and_auth_headers(request.url),
{:ok, mint_conn, request_ref} <-
Expand All @@ -63,9 +66,74 @@ defmodule Mojito.Conn do
method_to_string(request.method),
relative_url,
auth_headers ++ request.headers,
request.body
) do
{:ok, %{conn | conn: mint_conn}, request_ref}
:stream
),
{:ok, mint_conn, response} <-
stream_request_body(mint_conn, request_ref, response, request.body) do
{:ok, %{conn | conn: mint_conn}, request_ref, response}
end
end

defp stream_request_body(mint_conn, request_ref, response, nil) do
stream_request_body(mint_conn, request_ref, response, "")
end

defp stream_request_body(mint_conn, request_ref, response, "") do
with {:ok, mint_conn} <-
Mint.HTTP.stream_request_body(mint_conn, request_ref, :eof) do
{:ok, mint_conn, response}
end
end

defp stream_request_body(
%Mint.HTTP1{} = mint_conn,
request_ref,
response,
body
) do
{chunk, rest} = String.split_at(body, 65535)

with {:ok, mint_conn} <-
Mint.HTTP.stream_request_body(mint_conn, request_ref, chunk) do
stream_request_body(mint_conn, request_ref, response, rest)
end
end

defp stream_request_body(
%Mint.HTTP2{} = mint_conn,
request_ref,
response,
body
) do
chunk_size =
min(
Mint.HTTP2.get_window_size(mint_conn, {:request, request_ref}),
Mint.HTTP2.get_window_size(mint_conn, :connection)
)

{chunk, rest} = String.split_at(body, chunk_size)

with {:ok, mint_conn} <-
Mint.HTTP.stream_request_body(mint_conn, request_ref, chunk) do
{mint_conn, response} =
if "" != rest do
{:ok, mint_conn, resps} =
receive do
msg -> Mint.HTTP.stream(mint_conn, msg)
end

{:ok, response} = Mojito.Response.apply_resps(response, resps)

{mint_conn, response}
else
{mint_conn, response}
end

if response.complete do
{:ok, mint_conn, response}
else
stream_request_body(mint_conn, request_ref, response, rest)
end
end
end

Expand Down
61 changes: 36 additions & 25 deletions lib/mojito/conn_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,22 +110,24 @@ defmodule Mojito.ConnServer do
apply_resp(state, resp) |> apply_resps(rest)
end

defp apply_resp(state, {:status, request_ref, status}) do
response = Map.get(state.responses, request_ref)
response = response |> Map.put(:status_code, status)
defp apply_resp(state, {:status, request_ref, _status} = msg) do
{:ok, response} =
Map.get(state.responses, request_ref)
|> Response.apply_resp(msg)

%{state | responses: Map.put(state.responses, request_ref, response)}
end

defp apply_resp(state, {:headers, request_ref, headers}) do
response = Map.get(state.responses, request_ref)
response = response |> Map.put(:headers, headers)
defp apply_resp(state, {:headers, request_ref, _headers} = msg) do
{:ok, response} =
Map.get(state.responses, request_ref)
|> Response.apply_resp(msg)

%{state | responses: Map.put(state.responses, request_ref, response)}
end

defp apply_resp(state, {:data, request_ref, chunk}) do
response = Map.get(state.responses, request_ref)

case Utils.put_chunk(response, chunk) do
defp apply_resp(state, {:data, request_ref, _chunk} = msg) do
case Map.get(state.responses, request_ref) |> Response.apply_resp(msg) do
{:ok, response} ->
%{state | responses: Map.put(state.responses, request_ref, response)}

Expand Down Expand Up @@ -170,21 +172,30 @@ defmodule Mojito.ConnServer do
) :: {:ok, state, reference} | {:error, any}
defp start_request(state, request, reply_to, response_ref) do
with {:ok, state} <- ensure_connection(state, request.url, request.opts),
{:ok, conn, request_ref} <- Conn.request(state.conn, request) do
response = %Response{body: [], size: request.opts[:max_body_size]}
responses = state.responses |> Map.put(request_ref, response)
reply_tos = state.reply_tos |> Map.put(request_ref, reply_to)
response_refs = state.response_refs |> Map.put(request_ref, response_ref)

state = %{
state
| conn: conn,
responses: responses,
reply_tos: reply_tos,
response_refs: response_refs
}

{:ok, state, request_ref}
{:ok, conn, request_ref, response} <- Conn.request(state.conn, request) do
case response do
%{complete: true} ->
## Request was completed by server during stream_request_body
respond(reply_to, {:ok, response}, response_ref)
{:ok, %{state | conn: conn}, request_ref}

_ ->
responses = state.responses |> Map.put(request_ref, response)
reply_tos = state.reply_tos |> Map.put(request_ref, reply_to)

response_refs =
state.response_refs |> Map.put(request_ref, response_ref)

state = %{
state
| conn: conn,
responses: responses,
reply_tos: reply_tos,
response_refs: response_refs
}

{:ok, state, request_ref}
end
end
end

Expand Down
43 changes: 6 additions & 37 deletions lib/mojito/request/single.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Mojito.Request.Single do
@moduledoc false

alias Mojito.{Config, Conn, Error, Request, Response}
alias Mojito.Utils
require Logger

@doc ~S"""
Expand All @@ -26,10 +25,9 @@ defmodule Mojito.Request.Single do
def request(%Request{} = req) do
with {:ok, req} <- Request.validate_request(req),
{:ok, conn} <- Conn.connect(req.url, req.opts),
{:ok, conn, _ref} <- Conn.request(conn, req) do
{:ok, conn, _ref, response} <- Conn.request(conn, req) do
timeout = req.opts[:timeout] || Config.timeout()
max_body_size = req.opts[:max_body_size]
receive_response(conn, %Response{size: max_body_size}, timeout)
receive_response(conn, response, timeout)
end
end

Expand Down Expand Up @@ -75,17 +73,11 @@ defmodule Mojito.Request.Single do
case Mint.HTTP.stream(conn.conn, msg) do
{:ok, mint_conn, resps} ->
conn = %{conn | conn: mint_conn}
response = apply_resps(response, resps)

case response do
%{complete: true} ->
{:ok, response}

{:error, _} = err ->
err

_ ->
receive_response(conn, response, new_timeout.())
case Response.apply_resps(response, resps) do
{:ok, %{complete: true} = response} -> {:ok, response}
{:ok, response} -> receive_response(conn, response, new_timeout.())
err -> err
end

{:error, _, e, _} ->
Expand All @@ -95,27 +87,4 @@ defmodule Mojito.Request.Single do
receive_response(conn, response, new_timeout.())
end
end

defp apply_resps(response, []), do: response

defp apply_resps(response, [mint_resp | rest]) do
apply_resp(response, mint_resp) |> apply_resps(rest)
end

defp apply_resp(response, {:status, _request_ref, status_code}) do
%{response | status_code: status_code}
end

defp apply_resp(response, {:headers, _request_ref, headers}) do
%{response | headers: headers}
end

defp apply_resp(response, {:data, _request_ref, chunk}) do
{:ok, resp} = Utils.put_chunk(response, chunk)
resp
end

defp apply_resp(response, {:done, _request_ref}) do
%{response | complete: true, body: :erlang.iolist_to_binary(response.body)}
end
end
59 changes: 59 additions & 0 deletions lib/mojito/response.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,70 @@
defmodule Mojito.Response do
@moduledoc false

alias Mojito.{Error, Response}

defstruct status_code: nil,
headers: [],
body: "",
complete: false,
size: 0

@type t :: Mojito.response()

@doc ~S"""
Applies responses received from `Mint.HTTP.stream/2` to a `%Mojito.Response{}`.
"""
@spec apply_resps(t, [Mint.Types.response()]) :: {:ok, t} | {:error, any}
def apply_resps(response, []), do: {:ok, response}

def apply_resps(response, [mint_resp | rest]) do
with {:ok, response} <- apply_resp(response, mint_resp) do
apply_resps(response, rest)
end
end

@doc ~S"""
Applies a response received from `Mint.HTTP.stream/2` to a `%Mojito.Response{}`.
"""
@spec apply_resps(t, Mint.Types.response()) :: {:ok, t} | {:error, any}
def apply_resp(response, {:status, _request_ref, status_code}) do
{:ok, %{response | status_code: status_code}}
end

def apply_resp(response, {:headers, _request_ref, headers}) do
{:ok, %{response | headers: headers}}
end

def apply_resp(response, {:data, _request_ref, chunk}) do
with {:ok, response} <- put_chunk(response, chunk) do
{:ok, response}
end
end

def apply_resp(response, {:done, _request_ref}) do
body = :erlang.iolist_to_binary(response.body)
size = byte_size(body)
{:ok, %{response | complete: true, body: body, size: size}}
end

@doc ~S"""
Adds chunks to a response body, respecting the `response.size` field.
`response.size` should be set to the maximum number of bytes to accept
as the response body, or `nil` for no limit.
"""
@spec put_chunk(t, binary) :: {:ok, %Response{}} | {:error, any}
def put_chunk(%Response{size: nil} = response, chunk) do
{:ok, %{response | body: [response.body | [chunk]]}}
end

def put_chunk(%Response{size: remaining} = response, chunk) do
case remaining - byte_size(chunk) do
over_limit when over_limit < 0 ->
{:error, %Error{reason: :max_body_size_exceeded}}

new_remaining ->
{:ok,
%{response | body: [response.body | [chunk]], size: new_remaining}}
end
end
end
23 changes: 1 addition & 22 deletions lib/mojito/utils.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Mojito.Utils do
@moduledoc false

alias Mojito.{Error, Response}
alias Mojito.Error

@doc ~S"""
Ensures that the return value errors are of the form
Expand Down Expand Up @@ -101,25 +101,4 @@ defmodule Mojito.Utils do

def protocol_to_transport(proto),
do: {:error, "unknown protocol #{inspect(proto)}"}

@doc ~S"""
Adds chunks to a response body, respecting the `response.size` field.
`response.size` should be set to the maximum number of bytes to accept
as the response body, or `nil` for no limit.
"""
@spec put_chunk(%Response{}, binary) :: {:ok, %Response{}} | {:error, any}
def put_chunk(%Response{size: nil} = response, chunk) do
{:ok, %{response | body: [response.body | [chunk]]}}
end

def put_chunk(%Response{size: remaining} = response, chunk) do
case remaining - byte_size(chunk) do
over_limit when over_limit < 0 ->
{:error, %Error{reason: :max_body_size_exceeded}}

new_remaining ->
{:ok,
%{response | body: [response.body | [chunk]], size: new_remaining}}
end
end
end
6 changes: 3 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ defmodule Mojito.MixProject do

defp deps do
[
{:mint, "~> 1.0"},
{:mint, "~> 1.1"},
{:castore, "~> 0.1"},
{:poolboy, "~> 1.5"},
{:ex_spec, "~> 2.0", only: :test},
{:jason, "~> 1.0", only: :test},
{:cowboy, "~> 1.1", only: :test},
{:cowboy, "~> 2.0", only: :test},
{:plug, "~> 1.3", only: :test},
{:plug_cowboy, "~> 1.0", only: :test},
{:plug_cowboy, "~> 2.0", only: :test},
{:ex_doc, "~> 0.18", only: :dev, runtime: false},
{:dialyxir, "~> 0.5", only: :dev, runtime: false}
]
Expand Down
Loading