Skip to content

Commit

Permalink
Request/Response streaming for Finch adapter, SSE middleware (#540)
Browse files Browse the repository at this point in the history
* Request/Response streaming for Finch adapter
* [Finch] Change error handling, fix for response stream (#573)

---------

Co-authored-by: Adam Hodowany <[email protected]>
  • Loading branch information
teamon and hodak authored Apr 11, 2024
1 parent 6954617 commit d488bb2
Show file tree
Hide file tree
Showing 10 changed files with 430 additions and 19 deletions.
51 changes: 45 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Tesla is an HTTP client loosely based on [Faraday](https://github.com/lostisland
It embraces the concept of middleware when processing the request/response cycle.

> Note that this README refers to the `master` branch of Tesla, not the latest
released version on Hex. See [the documentation](https://hexdocs.pm/tesla) for
the documentation of the version you're using.
> released version on Hex. See [the documentation](https://hexdocs.pm/tesla) for
> the documentation of the version you're using.
For the list of changes, checkout the latest [release notes](https://github.com/teamon/tesla/releases).

Expand Down Expand Up @@ -83,8 +83,8 @@ config :tesla, adapter: Tesla.Adapter.Hackney
```

> The default adapter is erlang's built-in `httpc`, but it is not recommended
to use it in production environment as it does not validate SSL certificates
[among other issues](https://github.com/teamon/tesla/issues?utf8=%E2%9C%93&q=is%3Aissue+label%3Ahttpc+).
> to use it in production environment as it does not validate SSL certificates
> [among other issues](https://github.com/teamon/tesla/issues?utf8=%E2%9C%93&q=is%3Aissue+label%3Ahttpc+).
## Documentation

Expand Down Expand Up @@ -243,7 +243,11 @@ Tesla.get(client, "/", opts: [adapter: [recv_timeout: 30_000]])

## Streaming

If adapter supports it, you can pass a [Stream](https://hexdocs.pm/elixir/main/Stream.html) as body, e.g.:
### Streaming Request Body

If the adapter supports it, you can pass a
[Stream](https://hexdocs.pm/elixir/main/Stream.html) as the request
body, e.g.:

```elixir
defmodule ElasticSearch do
Expand All @@ -259,7 +263,41 @@ defmodule ElasticSearch do
end
```

Each piece of stream will be encoded as JSON and sent as a new line (conforming to JSON stream format).
Each piece of stream will be encoded as JSON and sent as a new line (conforming
to JSON stream format).

### Streaming Response Body

If the adapter supports it, you can pass the `response: :stream` adapter option
to receive the response body as a
[Stream](https://hexdocs.pm/elixir/main/Stream.html), e.g.:

```elixir
defmodule OpenAI do
  # Builds a client authenticated with `token` that talks to the OpenAI API
  # through the Finch adapter.
  def new(token) do
    Tesla.client(
      [
        {Tesla.Middleware.BaseUrl, "https://api.openai.com/v1"},
        {Tesla.Middleware.BearerAuth, token: token},
        {Tesla.Middleware.JSON, decode_content_types: ["text/event-stream"]},
        {Tesla.Middleware.SSE, only: :data}
      ],
      {Tesla.Adapter.Finch, name: MyFinch}
    )
  end

  # Posts a chat completion request for `prompt`, asking the adapter to hand
  # back the response body as a stream (`response: :stream`).
  def completion(client, prompt) do
    payload = %{
      model: "gpt-3.5-turbo",
      messages: [%{role: "user", content: prompt}],
      stream: true
    }

    Tesla.post(client, "/chat/completions", payload, opts: [adapter: [response: :stream]])
  end
end
client = OpenAI.new("<token>")
{:ok, env} = OpenAI.completion(client, "What is the meaning of life?")

# `Stream.each/2` is lazy: nothing is read or printed until the stream is run.
env.body
|> Stream.each(fn chunk -> IO.inspect(chunk) end)
|> Stream.run()
```

## Multipart

Expand Down Expand Up @@ -476,6 +514,7 @@ use Tesla, except: [:delete, :options]
```elixir
use Tesla, docs: false
```

### Encode only JSON request (do not decode response)

```elixir
Expand Down
82 changes: 72 additions & 10 deletions lib/tesla/adapter/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,37 +52,99 @@ if Code.ensure_loaded?(Finch) do
@behaviour Tesla.Adapter
alias Tesla.Multipart

@defaults [
receive_timeout: 15_000
]

# Performs the request described by `env` through Finch.
#
# Adapter options are merged on top of `@defaults`; `:name` (the Finch pool
# name) is mandatory, `:pool_timeout` / `:receive_timeout` are forwarded to
# Finch, and `:response` selects buffered (default) or streamed responses.
#
# Returns `{:ok, %Tesla.Env{}}` or `{:error, reason}`. Transport errors are
# unwrapped to their bare `reason`.
@impl Tesla.Adapter
def call(%Tesla.Env{} = env, opts) do
  opts = Tesla.Adapter.opts(@defaults, env, opts)

  name = Keyword.fetch!(opts, :name)
  url = Tesla.build_url(env.url, env.query)
  req_opts = Keyword.take(opts, [:pool_timeout, :receive_timeout])
  req = build(env.method, url, env.headers, env.body)

  case request(req, name, req_opts, opts) do
    {:ok, %Finch.Response{status: status, headers: headers, body: body}} ->
      {:ok, %Tesla.Env{env | status: status, headers: headers, body: body}}

    {:error, %Mint.TransportError{reason: reason}} ->
      {:error, reason}

    {:error, reason} ->
      {:error, reason}
  end
end

# Builds the `Finch.Request` for the given method, URL, headers and body.
#
# Multipart bodies contribute their own headers and are materialised into a
# list; `Stream` structs and stream functions are wrapped as `{:stream, body}`
# before being handed to `Finch.build/4`; anything else is passed through.
defp build(method, url, headers, %Multipart{} = mp) do
  headers = headers ++ Multipart.headers(mp)
  body = Multipart.body(mp) |> Enum.to_list()

  build(method, url, headers, body)
end

defp build(method, url, headers, %Stream{} = body_stream) do
  build(method, url, headers, {:stream, body_stream})
end

defp build(method, url, headers, body_stream_fun) when is_function(body_stream_fun) do
  build(method, url, headers, {:stream, body_stream_fun})
end

defp build(method, url, headers, body) do
  Finch.build(method, url, headers, body)
end

# Dispatches on the `:response` adapter option: `:stream` yields a streamed
# body, a missing option performs a regular buffered request, and any other
# value is rejected.
defp request(req, name, req_opts, opts) do
  mode = opts[:response]

  cond do
    mode == :stream -> stream(req, name, req_opts)
    is_nil(mode) -> Finch.request(req, name, req_opts)
    true -> raise "Unknown response option: #{inspect(mode)}"
  end
end

# Runs the request in a task and exposes the response body as a lazy stream.
#
# The task forwards status/headers, data chunks, end-of-stream and errors to
# the calling process as `{ref, ...}` messages. Because those messages are
# addressed to the caller, the returned body stream has to be consumed by the
# same process that invoked this function.
#
# Returns `{:ok, %Finch.Response{}}` once status and headers have arrived,
# `{:error, error}` if Finch fails before that, or `{:error, :timeout}` if
# nothing arrives within `opts[:receive_timeout]`.
defp stream(req, name, opts) do
  owner = self()
  ref = make_ref()

  # The accumulator carries the status until the headers arrive, so both can
  # be delivered to the owner in a single message.
  fun = fn
    {:status, status}, _acc -> status
    {:headers, headers}, status -> send(owner, {ref, {:status, status, headers}})
    {:data, data}, _acc -> send(owner, {ref, {:data, data}})
  end

  task =
    Task.async(fn ->
      case Finch.stream(req, name, nil, fun, opts) do
        {:ok, _acc} -> send(owner, {ref, :eof})
        {:error, error} -> send(owner, {ref, {:error, error}})
      end
    end)

  receive do
    {^ref, {:status, status, headers}} ->
      body =
        Stream.unfold(nil, fn _ ->
          receive do
            {^ref, {:data, data}} ->
              {data, nil}

            {^ref, :eof} ->
              Task.await(task)
              nil

            {^ref, {:error, _error}} ->
              # The request failed mid-body: end the stream right away instead
              # of blocking until the receive timeout expires.
              Task.await(task)
              nil
          after
            opts[:receive_timeout] ->
              Task.shutdown(task, :brutal_kill)
              nil
          end
        end)

      {:ok, %Finch.Response{status: status, headers: headers, body: body}}

    {^ref, {:error, error}} ->
      # Failure before any headers were received (e.g. connection error):
      # report the real reason rather than waiting and returning :timeout.
      Task.await(task)
      {:error, error}
  after
    opts[:receive_timeout] ->
      # Do not leave the request running (and its reply queued) after giving up.
      Task.shutdown(task, :brutal_kill)
      {:error, :timeout}
  end
end
end
end
17 changes: 16 additions & 1 deletion lib/tesla/middleware/json.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,18 @@ defmodule Tesla.Middleware.JSON do
end
end

# Streamed bodies (a `Stream` struct or a function) are decoded lazily, chunk
# by chunk; every other body goes through the regular decoder.
defp decode_body(body, opts) when is_function(body) or is_struct(body, Stream) do
  {:ok, decode_stream(body, opts)}
end

defp decode_body(body, opts) do
  process(body, :decode, opts)
end

# A response is decoded only when both its body and its content type qualify.
defp decodable?(env, opts) do
  decodable_body?(env) && decodable_content_type?(env, opts)
end

# True when the body is a non-empty binary, a non-empty list, a function or a
# `Stream` struct (the latter two being streamed response bodies).
defp decodable_body?(env) do
  (is_binary(env.body) && env.body != "") ||
    (is_list(env.body) && env.body != []) ||
    is_function(env.body) ||
    is_struct(env.body, Stream)
end

defp decodable_content_type?(env, opts) do
Expand All @@ -128,6 +134,15 @@ defmodule Tesla.Middleware.JSON do
end
end

# Lazily decodes every chunk of a streamed body. A chunk that fails to decode
# is passed through unchanged instead of aborting the whole stream.
defp decode_stream(body, opts) do
  Stream.map(body, fn chunk ->
    with {:ok, decoded} <- decode_body(chunk, opts) do
      decoded
    else
      _ -> chunk
    end
  end)
end

# Default decodable content types followed by any given via `:decode_content_types`.
defp content_types(opts) do
  extra = Keyword.get(opts, :decode_content_types, [])
  @default_content_types ++ extra
end

Expand Down
102 changes: 102 additions & 0 deletions lib/tesla/middleware/sse.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
defmodule Tesla.Middleware.SSE do
  @moduledoc """
  Decode Server Sent Events.

  This middleware is mostly useful when streaming response body.

  Messages are separated by a blank line (`"\\n\\n"`) and fields by `"\\n"`;
  only LF line endings are recognised. Repeated `data` fields of one message
  are joined with `"\\n"`. Lines that are not a comment or one of the `data`,
  `event`, `id`, `retry` fields are ignored.

  ## Examples

  ```
  plug Tesla.Middleware.SSE, only: :data
  ```

  ## Options

  - `:only` - keep only the value stored under the given key of each event;
    events without that key are dropped (necessary for using with `JSON` middleware)
  - `:decode_content_types` - list of additional decodable content-types
  """

  @behaviour Tesla.Middleware

  @default_content_types ["text/event-stream"]

  @impl Tesla.Middleware
  def call(env, next, opts) do
    opts = opts || []

    with {:ok, env} <- Tesla.run(env, next) do
      decode(env, opts)
    end
  end

  @doc """
  Decodes the body of `env` when its content type is an event stream.

  Always returns `{:ok, env}`; the env is returned untouched when the
  content type does not match.
  """
  def decode(env, opts) do
    if decodable_content_type?(env, opts) do
      {:ok, %{env | body: decode_body(env.body, opts)}}
    else
      {:ok, env}
    end
  end

  # Streamed body: chunks may split a message anywhere, so buffer the
  # incomplete tail and only emit messages terminated by a blank line.
  defp decode_body(body, opts) when is_struct(body, Stream) or is_function(body) do
    body
    |> Stream.chunk_while(
      "",
      fn chunk, buffer ->
        {messages, [rest]} = (buffer <> chunk) |> String.split("\n\n") |> Enum.split(-1)
        {:cont, messages, rest}
      end,
      fn
        "" ->
          {:cont, ""}

        rest ->
          # Emit the unterminated tail as a one-element list: the next stage
          # flattens every emitted element, and a bare binary is not enumerable.
          {:cont, [rest], ""}
      end
    )
    |> Stream.flat_map(& &1)
    |> Stream.map(&decode_message/1)
    |> Stream.flat_map(&only(&1, opts[:only]))
  end

  # Fully buffered body.
  defp decode_body(binary, opts) when is_binary(binary) do
    binary
    |> String.split("\n\n")
    |> Enum.map(&decode_message/1)
    |> Enum.flat_map(&only(&1, opts[:only]))
  end

  # Turns one message (a block of lines) into a map of its fields.
  defp decode_message(message) do
    message
    |> String.split("\n")
    |> Enum.map(&decode_line/1)
    |> Enum.reduce(%{}, fn
      :empty, acc -> acc
      {:data, data}, acc -> Map.update(acc, :data, data, &(&1 <> "\n" <> data))
      {key, value}, acc -> Map.put_new(acc, key, value)
    end)
  end

  # Parses a single line. The space after the colon is optional, and anything
  # that is not a known field is skipped rather than raising.
  defp decode_line(""), do: :empty
  defp decode_line(":" <> comment), do: {:comment, strip_leading_space(comment)}
  defp decode_line("data:" <> data), do: {:data, strip_leading_space(data)}
  defp decode_line("event:" <> event), do: {:event, strip_leading_space(event)}
  defp decode_line("id:" <> id), do: {:id, strip_leading_space(id)}
  defp decode_line("retry:" <> retry), do: {:retry, strip_leading_space(retry)}
  defp decode_line(_unknown), do: :empty

  # Removes at most one leading space from a field value.
  defp strip_leading_space(" " <> rest), do: rest
  defp strip_leading_space(value), do: value

  defp decodable_content_type?(env, opts) do
    case Tesla.get_header(env, "content-type") do
      nil -> false
      content_type -> Enum.any?(content_types(opts), &String.starts_with?(content_type, &1))
    end
  end

  defp content_types(opts),
    do: @default_content_types ++ Keyword.get(opts, :decode_content_types, [])

  # Without `:only` the whole event map is kept; with it, only the value under
  # `key` is kept and events lacking that key are dropped.
  defp only(message, nil), do: [message]

  defp only(message, key) do
    case Map.get(message, key) do
      nil -> []
      val -> [val]
    end
  end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ defmodule Tesla.Mixfile do
Tesla.Middleware.PathParams,
Tesla.Middleware.Query,
Tesla.Middleware.Retry,
Tesla.Middleware.SSE,
Tesla.Middleware.Telemetry,
Tesla.Middleware.Timeout
]
Expand Down
2 changes: 1 addition & 1 deletion test/support/adapter_case/stream_request_body.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Tesla.AdapterCase.StreamRequestBody do
quote do
alias Tesla.Env

describe "Stream" do
describe "Stream Request" do
test "stream request body: Stream.map" do
request = %Env{
method: :post,
Expand Down
23 changes: 23 additions & 0 deletions test/support/adapter_case/stream_response_body.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Tesla.AdapterCase.StreamResponseBody do
  @moduledoc false

  # Injects a shared test for adapters that support `response: :stream`.
  # The using module is expected to provide `@http` and `call/2`.
  defmacro __using__(_) do
    quote do
      alias Tesla.Env

      describe "Stream Response" do
        test "stream response body" do
          request = %Env{
            method: :get,
            url: "#{@http}/stream/20"
          }

          assert {:ok, %Env{} = response} = call(request, response: :stream)
          assert response.status == 200
          # `is_struct/2` fails the assertion cleanly for non-struct bodies,
          # whereas reading `.__struct__` would raise on e.g. a binary.
          assert is_function(response.body) || is_struct(response.body, Stream)

          body = Enum.to_list(response.body)
          assert Enum.count(body) == 20
        end
      end
    end
  end
end
Loading

0 comments on commit d488bb2

Please sign in to comment.