Skip to content

Commit

Permalink
Use streaming JSON for HTTP bodies
Browse files Browse the repository at this point in the history
  • Loading branch information
zargony committed Oct 5, 2024
1 parent 3856d8e commit 2ac7751
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 36 deletions.
2 changes: 1 addition & 1 deletion firmware/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ embassy-sync = "0.6"
embassy-time = { version = "0.3", features = ["generic-queue"] }
embedded-graphics = "0.8"
embedded-hal-async = "1.0"
embedded-io-async = "0.6"
embedded-io-async = { version = "0.6", features = ["alloc"] }
embedded-storage = "0.3"
esp-alloc = "0.4"
esp-backtrace = { version = "0.14", features = ["esp32c3", "custom-halt", "panic-handler", "exception-handler", "println"] }
Expand Down
66 changes: 33 additions & 33 deletions firmware/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::json::{self, FromJson, Reader as JsonReader, ToJson, Writer as JsonWriter};
use crate::wifi::{DnsSocket, TcpClient, TcpConnection, Wifi};
use alloc::vec::Vec;
use core::convert::Infallible;
use core::fmt;
use embedded_io_async::{Read, Write};
use log::debug;
Expand All @@ -8,7 +11,6 @@ use reqwless::client::{
use reqwless::headers::ContentType;
use reqwless::request::{RequestBody, RequestBuilder};
use reqwless::response::StatusCode;
use serde::{de::DeserializeOwned, Serialize};

/// Maximum size of response from server
const MAX_RESPONSE_SIZE: usize = 4096;
Expand All @@ -25,7 +27,7 @@ pub enum Error {
/// Network / http client error
Network(reqwless::Error),
/// Request could not be built
MalformedRequest(serde_json::Error),
MalformedRequest(json::Error<Infallible>),
/// Authorization required (HTTP status 401)
Unauthorized,
/// Server returned an error (HTTP status 4xx)
Expand All @@ -34,7 +36,7 @@ pub enum Error {
#[allow(clippy::enum_variant_names)]
ServerError(StatusCode),
/// Response could not be parsed
MalformedResponse(serde_json::Error),
MalformedResponse(json::Error<reqwless::Error>),
}

impl From<reqwless::Error> for Error {
Expand Down Expand Up @@ -107,40 +109,40 @@ impl<'a> Http<'a> {

/// Send GET request, deserialize JSON response
#[allow(dead_code)]
pub async fn get<T: DeserializeOwned>(&mut self, path: &str) -> Result<T, Error> {
pub async fn get<T: FromJson>(&mut self, path: &str) -> Result<T, Error> {
let base_url = self.base_url;
let mut resource = self.resource().await?;

let mut resource = self.resource().await?;
debug!("HTTP: GET {}/{}", base_url, path);
Self::send_request_parse_response(
resource
.get(path)
.headers(&[("Accept", "application/json")]),
)
.await
let request = resource
.get(path)
.headers(&[("Accept", "application/json")]);

Self::send_request_parse_response(request).await
}

/// Serialize data to JSON, send POST request, deserialize JSON response
#[allow(dead_code)]
pub async fn post<T: Serialize, U: DeserializeOwned>(
&mut self,
path: &str,
data: &T,
) -> Result<U, Error> {
let body = serde_json::to_vec(&data).map_err(Error::MalformedRequest)?;

pub async fn post<T: ToJson, U: FromJson>(&mut self, path: &str, data: &T) -> Result<U, Error> {
let base_url = self.base_url;
let mut resource = self.resource().await?;

// OPTIMIZE: Don't buffer but stream request body. Only needed if we start sending much data
let mut json_writer = JsonWriter::new(Vec::new());
json_writer
.write(data)
.await
.map_err(Error::MalformedRequest)?;
let body = json_writer.into_inner();

let mut resource = self.resource().await?;
debug!("HTTP: POST {}/{} ({} bytes)", base_url, path, body.len());
Self::send_request_parse_response(
resource
.post(path)
.content_type(ContentType::ApplicationJson)
.headers(&[("Accept", "application/json")])
.body(&body[..]),
)
.await
let request = resource
.post(path)
.content_type(ContentType::ApplicationJson)
.headers(&[("Accept", "application/json")])
.body(&body[..]);

Self::send_request_parse_response(request).await
}
}

Expand All @@ -167,7 +169,7 @@ impl<'a> Http<'a> {
}

/// Send request, deserialize JSON response
async fn send_request_parse_response<C: Read + Write, B: RequestBody, T: DeserializeOwned>(
async fn send_request_parse_response<C: Read + Write, B: RequestBody, T: FromJson>(
request: HttpResourceRequestBuilder<'_, '_, C, B>,
) -> Result<T, Error> {
// rx_buf is used to buffer response headers. The response body reader uses this only for
Expand All @@ -178,17 +180,15 @@ impl<'a> Http<'a> {

let status = response.status;
Self::parse_status_code(status)?;
debug!("HTTP: Status {}", status.0);

// Reqwless' content-type parsing is unreliable, so parse the body in any case. Parsing
// will fail if it's not JSON.
// if !matches!(response.content_type, Some(ContentType::ApplicationJson)) {
// return Err(Error::InvalidResponse);
// }

// TODO: Use streaming JSON parser so that we don't need to buffer the full response body
let body = response.body().read_to_end().await?;
debug!("HTTP: Status {} ({} bytes)", status.0, body.len());

serde_json::from_slice(body).map_err(Error::MalformedResponse)
let mut json_reader = JsonReader::new(response.body().reader());
json_reader.read().await.map_err(Error::MalformedResponse)
}
}
2 changes: 0 additions & 2 deletions firmware/src/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ pub struct Reader<R> {

impl<R: BufRead> Reader<R> {
/// Create JSON reader
#[allow(dead_code)]
pub fn new(reader: R) -> Self {
Self { reader, pos: 0 }
}
Expand Down Expand Up @@ -537,7 +536,6 @@ pub struct Writer<W> {

impl<W: Write> Writer<W> {
/// Create JSON writer
#[allow(dead_code)]
pub fn new(writer: W) -> Self {
Self { writer }
}
Expand Down

0 comments on commit 2ac7751

Please sign in to comment.