Skip to content

Commit

Permalink
Use Req for all requests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko committed Nov 25, 2024
1 parent 7c8f464 commit 7332468
Show file tree
Hide file tree
Showing 19 changed files with 171 additions and 374 deletions.
2 changes: 0 additions & 2 deletions lib/livebook/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ defmodule Livebook.Application do
require Logger

def start(_type, _args) do
Livebook.Utils.HTTP.set_proxy_options()

Livebook.ZTA.init()
create_teams_hub = parse_teams_hub()
setup_optional_dependencies()
Expand Down
76 changes: 39 additions & 37 deletions lib/livebook/file_system/file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -418,44 +418,46 @@ defmodule Livebook.FileSystem.File do
end

defimpl Collectable, for: Livebook.FileSystem.File do
def into(%Livebook.FileSystem.File{path: path} = file) do
file_system =
file
|> Livebook.FileSystem.File.fetch_file_system()
|> unwrap!()

file
|> Livebook.FileSystem.File.maybe_ensure_local()
|> unwrap!()

state =
file_system
|> Livebook.FileSystem.write_stream_init(path, [])
|> unwrap!()

collector = fn
state, {:cont, chunk} when is_binary(chunk) ->
file_system
|> Livebook.FileSystem.write_stream_chunk(state, chunk)
|> unwrap!()

state, :done ->
file_system
|> Livebook.FileSystem.write_stream_finish(state)
|> unwrap!()

file

state, :halt ->
file_system
|> Livebook.FileSystem.write_stream_halt(state)
|> unwrap!()
end
alias Livebook.FileSystem

def into(%FileSystem.File{path: path} = file) do
with :ok <- FileSystem.File.maybe_ensure_local(file),
{:ok, file_system} <- FileSystem.File.fetch_file_system(file),
{:ok, state} <- FileSystem.write_stream_init(file_system, path, []) do
collector = fn
state, {:cont, chunk} when is_binary(chunk) ->
case FileSystem.write_stream_chunk(file_system, state, chunk) do
{:ok, state} ->
state

{:error, error} ->
cancel(file_system, state)
raise error
end

state, :done ->
case FileSystem.write_stream_finish(file_system, state) do
:ok ->
file

{:error, error} ->
cancel(file_system, state)
raise error
end

state, :halt ->
cancel(file_system, state)
:ok
end

{state, collector}
{state, collector}
else
{:error, error} -> raise error
end
end

defp unwrap!(:ok), do: :ok
defp unwrap!({:ok, result}), do: result
defp unwrap!({:error, error}), do: raise(error)
defp cancel(file_system, state) do
# Try to cleanup
_ = FileSystem.write_stream_halt(file_system, state)
end
end
67 changes: 39 additions & 28 deletions lib/livebook/file_system/s3/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Livebook.FileSystem.S3.Client do
delimiter = opts[:delimiter]
query = %{"list-type" => "2", "prefix" => prefix, "delimiter" => delimiter}

case get(file_system, "/", query: query) do
case request(file_system, "/", query: query) do
{:ok, %{status: 200, body: %{"ListBucketResult" => result}}} ->
file_keys = xml_get_list(result, "Contents", "Key")
prefix_keys = xml_get_list(result, "CommonPrefixes", "Prefix")
Expand All @@ -36,7 +36,7 @@ defmodule Livebook.FileSystem.S3.Client do
# name, but the listing endpoint does, so we just list keys
# with an upper limit of 0 and retrieve the bucket name.

case get(file_system, "/", query: %{"list-type" => "2", "max-keys" => "0"}) do
case request(file_system, "/", query: %{"list-type" => "2", "max-keys" => "0"}) do
{:ok, %{status: 200, body: %{"ListBucketResult" => result}}} -> {:ok, result["Name"]}
other -> request_response_to_error(other)
end
Expand All @@ -47,7 +47,7 @@ defmodule Livebook.FileSystem.S3.Client do
"""
@spec get_object(S3.t(), String.t()) :: {:ok, map()} | {:error, String.t()}
def get_object(file_system, key) do
case get(file_system, "/" <> encode_key(key), long: true, decode: false) do
case request(file_system, "/" <> encode_key(key), long: true, decode: false) do
{:ok, %{status: 200, body: body}} -> {:ok, body}
{:ok, %{status: 404}} -> FileSystem.Utils.posix_error(:enoent)
other -> request_response_to_error(other)
Expand All @@ -72,7 +72,7 @@ defmodule Livebook.FileSystem.S3.Client do
"""
@spec put_object(S3.t(), String.t(), String.t() | nil) :: :ok | {:error, String.t()}
def put_object(file_system, key, content) do
case put(file_system, "/" <> encode_key(key), body: content, long: true) do
case request(file_system, "/" <> encode_key(key), method: :put, body: content, long: true) do
{:ok, %{status: 200}} -> :ok
other -> request_response_to_error(other)
end
Expand All @@ -83,7 +83,8 @@ defmodule Livebook.FileSystem.S3.Client do
"""
@spec head_object(S3.t(), String.t()) :: {:ok, map()} | {:error, String.t()}
def head_object(file_system, key) do
with {:ok, %{status: 200, headers: headers}} <- head(file_system, "/" <> encode_key(key)),
with {:ok, %{status: 200, headers: headers}} <-
request(file_system, "/" <> encode_key(key), method: :head),
{:ok, [etag]} <- Map.fetch(headers, "etag") do
{:ok, %{etag: etag}}
else
Expand All @@ -100,7 +101,7 @@ defmodule Livebook.FileSystem.S3.Client do
copy_source = bucket <> "/" <> encode_key(source_key)
headers = [{"x-amz-copy-source", copy_source}]

case put(file_system, "/" <> encode_key(destination_key), headers: headers) do
case request(file_system, "/" <> encode_key(destination_key), method: :put, headers: headers) do
{:ok, %{status: 200}} -> :ok
{:ok, %{status: 404}} -> FileSystem.Utils.posix_error(:enoent)
other -> request_response_to_error(other)
Expand All @@ -112,7 +113,7 @@ defmodule Livebook.FileSystem.S3.Client do
"""
@spec delete_object(S3.t(), String.t()) :: :ok | {:error, String.t()}
def delete_object(file_system, key) do
case delete(file_system, "/" <> encode_key(key)) do
case request(file_system, "/" <> encode_key(key), method: :delete) do
{:ok, %{status: 204}} -> :ok
{:ok, %{status: 404}} -> :ok
other -> request_response_to_error(other)
Expand All @@ -133,8 +134,9 @@ defmodule Livebook.FileSystem.S3.Client do

body_md5 = :crypto.hash(:md5, body)
headers = [{"Content-MD5", Base.encode64(body_md5)}]
query = %{"delete" => ""}

case post(file_system, "/", query: %{"delete" => ""}, headers: headers, body: body) do
case request(file_system, "/", query: query, method: :post, headers: headers, body: body) do
{:ok, %{status: 200, body: %{"Error" => _}}} = result -> request_response_to_error(result)
{:ok, %{status: 200}} -> :ok
other -> request_response_to_error(other)
Expand Down Expand Up @@ -167,7 +169,7 @@ defmodule Livebook.FileSystem.S3.Client do
def create_multipart_upload(file_system, key) do
query = %{"uploads" => ""}

case post(file_system, "/" <> encode_key(key), query: query, body: "") do
case request(file_system, "/" <> encode_key(key), method: :post, query: query, body: "") do
{:ok, %{status: 200, body: %{"InitiateMultipartUploadResult" => result}}} ->
{:ok, result["UploadId"]}

Expand All @@ -183,10 +185,10 @@ defmodule Livebook.FileSystem.S3.Client do
{:ok, map()} | {:error, String.t()}
def upload_part(file_system, key, upload_id, part_number, content) do
query = %{"uploadId" => upload_id, "partNumber" => part_number}
opts = [query: query, body: content, long: true]
opts = [method: :put, query: query, body: content, long: true]

with {:ok, %{status: 200, headers: headers}} <-
put(file_system, "/" <> encode_key(key), opts),
request(file_system, "/" <> encode_key(key), opts),
{:ok, [etag]} <- Map.fetch(headers, "etag") do
{:ok, %{etag: etag}}
else
Expand All @@ -208,7 +210,7 @@ defmodule Livebook.FileSystem.S3.Client do
|> S3.XML.encode_to_iodata!()
|> IO.iodata_to_binary()

case post(file_system, "/" <> encode_key(key), query: query, body: body) do
case request(file_system, "/" <> encode_key(key), method: :post, query: query, body: body) do
{:ok, %{status: 200}} -> :ok
other -> request_response_to_error(other)
end
Expand All @@ -221,30 +223,31 @@ defmodule Livebook.FileSystem.S3.Client do
def abort_multipart_upload(file_system, key, upload_id) do
query = %{"uploadId" => upload_id}

case delete(file_system, "/" <> encode_key(key), query: query) do
case request(file_system, "/" <> encode_key(key), method: :delete, query: query) do
{:ok, %{status: 204}} -> :ok
other -> request_response_to_error(other)
end
end

# Convenient API

defp get(file_system, path, opts), do: request(file_system, :get, path, opts)
defp post(file_system, path, opts), do: request(file_system, :post, path, opts)
defp put(file_system, path, opts), do: request(file_system, :put, path, opts)
defp head(file_system, path), do: request(file_system, :head, path, [])
defp delete(file_system, path, opts \\ []), do: request(file_system, :delete, path, opts)

defp download(file_system, path, collectable, opts \\ []) do
query = opts[:query] || %{}
headers = opts[:headers] || []
url = build_url(file_system, path, query)
headers = sign_headers(file_system, :get, url, headers)

Livebook.Utils.HTTP.download(url, collectable, headers: headers)
end
req = Req.new() |> Livebook.Utils.req_attach_defaults()

case Req.get(req, url: url, headers: headers, into: collectable) do
{:ok, %{status: 200, body: collected}} ->
{:ok, collected}

# Private
{:ok, %{status: status}} ->
{:error, "download failed, HTTP status #{status}", status}

{:error, exception} ->
{:error, "download failed, reason: #{Exception.message(exception)}}", nil}
end
end

defp encode_key(key) do
key
Expand Down Expand Up @@ -284,21 +287,29 @@ defmodule Livebook.FileSystem.S3.Client do
)
end

defp request(file_system, method, path, opts) do
defp request(file_system, path, opts) do
long = Keyword.get(opts, :long, false)
decode? = Keyword.get(opts, :decode, true)

method = opts[:method] || :get
query = opts[:query] || %{}
headers = opts[:headers] || []
body = opts[:body]
timeout = if long, do: 60_000, else: 30_000

url = build_url(file_system, path, query)
headers = sign_headers(file_system, method, url, headers, body)
body = body && {"application/octet-stream", body}

req = Req.new() |> Livebook.Utils.req_attach_defaults()

result =
Livebook.Utils.HTTP.request(method, url, headers: headers, body: body, timeout: timeout)
Req.request(req,
method: method,
url: url,
headers: headers,
body: body,
receive_timeout: timeout
)

if decode?, do: decode(result), else: result
end
Expand All @@ -316,7 +327,7 @@ defmodule Livebook.FileSystem.S3.Client do
defp xml?(response) do
guess_xml? = String.starts_with?(response.body, "<?xml")

case Livebook.Utils.HTTP.fetch_content_type(response.headers) do
case Livebook.Utils.fetch_content_type(response) do
{:ok, content_type} when content_type in ["text/xml", "application/xml"] -> true
# Apparently some requests return XML without content-type
:error when guess_xml? -> true
Expand Down
14 changes: 8 additions & 6 deletions lib/livebook/notebook/content_loader.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
defmodule Livebook.Notebook.ContentLoader do
alias Livebook.Utils.HTTP

@typedoc """
A location from where content gets loaded.
"""
Expand Down Expand Up @@ -53,10 +51,14 @@ defmodule Livebook.Notebook.ContentLoader do
"""
@spec fetch_content(String.t()) :: {:ok, String.t()} | {:error, String.t()}
def fetch_content(url) do
case HTTP.request(:get, url) do
{:ok, %{status: 200, headers: headers, body: body}} ->
# Given the URL has arbitrary user-specified host, we specify
# :pool_max_idle_time, so the Finch pool terminates eventually
req = Req.new(pool_max_idle_time: 60_000) |> Livebook.Utils.req_attach_defaults()

case Req.get(req, url: url) do
{:ok, %{status: 200} = res} ->
valid_content? =
case HTTP.fetch_content_type(headers) do
case Livebook.Utils.fetch_content_type(res) do
{:ok, content_type} ->
content_type in ["text/plain", "text/markdown", "application/octet-stream"]

Expand All @@ -65,7 +67,7 @@ defmodule Livebook.Notebook.ContentLoader do
end

if valid_content? do
{:ok, body}
{:ok, res.body}
else
{:error, "invalid content type, make sure the URL points to live markdown"}
end
Expand Down
36 changes: 18 additions & 18 deletions lib/livebook/runtime/dependencies.ex
Original file line number Diff line number Diff line change
Expand Up @@ -272,24 +272,24 @@ defmodule Livebook.Runtime.Dependencies do
def search_hex(search, opts) do
api_url = opts[:api_url] || "https://hex.pm/api"

params = %{"search" => "name:#{search}*", "sort" => "recent_downloads"}
url = api_url <> "/packages?" <> URI.encode_query(params)

case Livebook.Utils.HTTP.request(:get, url) do
{:ok, %{status: status, body: body}} ->
with 200 <- status, {:ok, packages} <- Jason.decode(body) do
packages =
packages
|> Enum.map(&parse_package/1)
|> reorder_packages(search)

{:ok, packages}
else
_ -> {:error, "unexpected response"}
end

{:error, reason} ->
{:error, "failed to make a request, reason: #{inspect(reason)}"}
req = Req.new(base_url: api_url) |> Livebook.Utils.req_attach_defaults()

params = [search: "name:#{search}*", sort: "recent_downloads"]

case Req.get(req, url: "/packages", params: params) do
{:ok, %{status: 200} = resp} ->
packages =
resp.body
|> Enum.map(&parse_package/1)
|> reorder_packages(search)

{:ok, packages}

{:ok, %{status: status}} ->
{:error, "unexpected response, HTTP status #{status}"}

{:error, exception} ->
{:error, "failed to make a request, reason: #{Exception.message(exception)}}"}
end
end

Expand Down
15 changes: 11 additions & 4 deletions lib/livebook/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3075,12 +3075,19 @@ defmodule Livebook.Session do
end

defp download_content(url, file) do
case Livebook.Utils.HTTP.download(url, file) do
{:ok, _file} ->
# Given the URL has arbitrary user-specified host, we specify
# :pool_max_idle_time, so the Finch pool terminates eventually
req = Req.new(pool_max_idle_time: 60_000) |> Livebook.Utils.req_attach_defaults()

case Req.get(req, url: url, into: file) do
{:ok, %{status: 200}} ->
:ok

{:error, message, status} ->
{:error, "download failed, " <> message, status}
{:ok, %{status: status}} ->
{:error, "download failed, HTTP status #{status}", status}

{:error, exception} ->
{:error, "download failed, reason: #{Exception.message(exception)}}", nil}
end
end

Expand Down
Loading

0 comments on commit 7332468

Please sign in to comment.