Skip to content

Commit

Permalink
Merge branch 'streaming-json'
Browse files Browse the repository at this point in the history
  • Loading branch information
zargony committed Oct 6, 2024
2 parents e2aaf21 + a57cbca commit dec9f79
Show file tree
Hide file tree
Showing 6 changed files with 1,257 additions and 76 deletions.
26 changes: 0 additions & 26 deletions firmware/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions firmware/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ embassy-sync = "0.6"
embassy-time = { version = "0.3", features = ["generic-queue"] }
embedded-graphics = "0.8"
embedded-hal-async = "1.0"
embedded-io-async = "0.6"
embedded-io-async = { version = "0.6", features = ["alloc"] }
embedded-storage = "0.3"
esp-alloc = "0.4"
esp-backtrace = { version = "0.14", features = ["esp32c3", "custom-halt", "panic-handler", "exception-handler", "println"] }
Expand All @@ -67,7 +67,5 @@ log = { version = "0.4", features = ["release_max_level_info"] }
pn532 = "0.4"
rand_core = "0.6"
reqwless = { version = "0.12", default-features = false, features = ["alloc", "embedded-tls"] }
serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
ssd1306 = { version = "0.9", features = ["async"] }
u8g2-fonts = "0.4"
43 changes: 30 additions & 13 deletions firmware/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
use crate::json::{self, FromJson};
use alloc::string::String;
use core::fmt;
use core::ops::Deref;
use embedded_io_async::BufRead;
use embedded_storage::ReadStorage;
use esp_partition_table::{DataPartitionType, PartitionTable, PartitionType};
use esp_storage::FlashStorage;
use log::{debug, info, warn};
use serde::Deserialize;

/// String with sensitive content (debug and display output redacted)
#[derive(Default, Deserialize)]
#[derive(Default)]
pub struct SensitiveString(String);

impl TryFrom<json::Value> for SensitiveString {
type Error = json::Error<()>;

fn try_from(value: json::Value) -> Result<Self, Self::Error> {
Ok(Self(value.try_into()?))
}
}

impl fmt::Debug for SensitiveString {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.0.is_empty() {
Expand Down Expand Up @@ -50,18 +59,33 @@ impl Deref for SensitiveString {
///
/// If there is no valid JSON or no valid `nvs` data partition, a default configuration is provided
/// (which isn't very useful, but at least doesn't prevent the device from starting).
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "kebab-case")]
#[derive(Debug, Default)]
pub struct Config {
/// Wifi SSID to connect to
pub wifi_ssid: String,
/// Wifi password
pub wifi_password: SensitiveString,
}

impl FromJson for Config {
async fn from_json<R: BufRead>(
reader: &mut json::Reader<R>,
) -> Result<Self, json::Error<R::Error>> {
let mut this = Self::default();
reader
.read_object(|k, v| match k.as_str() {
"wifi-ssid" => this.wifi_ssid = v.try_into().unwrap(),
"wifi-password" => this.wifi_password = v.try_into().unwrap(),
_ => (),
})
.await?;
Ok(this)
}
}

impl Config {
/// Read configuration from nvs flash partition
pub fn read() -> Self {
pub async fn read() -> Self {
let mut storage = FlashStorage::new();

// Read partition table (at 0x8000 by default)
Expand All @@ -88,16 +112,9 @@ impl Config {
warn!("Config: Unable to read nvs partition");
return Self::default();
}
// Find first non-ascii character and trim to the end. This removes trailing 0xff bytes
// (unused flash bytes), which would otherwise lead to 'trailing characters' serde error
// nightly: let (json, _rest) = bytes.split_once(|b| !b.is_ascii());
let json = bytes
.split(|b| !b.is_ascii())
.next()
.unwrap_or(bytes.as_ref());

// Parse JSON config
let config = match serde_json::from_slice::<Self>(json) {
let config = match json::Reader::new(&bytes[..]).read().await {
Ok(config) => config,
Err(err) => {
warn!(
Expand Down
69 changes: 36 additions & 33 deletions firmware/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::json::{self, FromJson, Reader as JsonReader, ToJson, Writer as JsonWriter};
use crate::wifi::{DnsSocket, TcpClient, TcpConnection, Wifi};
use alloc::vec::Vec;
use core::convert::Infallible;
use core::fmt;
use embedded_io_async::{Read, Write};
use log::debug;
Expand All @@ -8,7 +11,6 @@ use reqwless::client::{
use reqwless::headers::ContentType;
use reqwless::request::{RequestBody, RequestBuilder};
use reqwless::response::StatusCode;
use serde::{de::DeserializeOwned, Serialize};

/// Maximum size of response from server
const MAX_RESPONSE_SIZE: usize = 4096;
Expand All @@ -25,7 +27,7 @@ pub enum Error {
/// Network / http client error
Network(reqwless::Error),
/// Request could not be built
MalformedRequest(serde_json::Error),
MalformedRequest(json::Error<Infallible>),
/// Authorization required (HTTP status 401)
Unauthorized,
/// Server returned an error (HTTP status 4xx)
Expand All @@ -34,7 +36,7 @@ pub enum Error {
#[allow(clippy::enum_variant_names)]
ServerError(StatusCode),
/// Response could not be parsed
MalformedResponse(serde_json::Error),
MalformedResponse(json::Error<reqwless::Error>),
}

impl From<reqwless::Error> for Error {
Expand Down Expand Up @@ -107,40 +109,40 @@ impl<'a> Http<'a> {

/// Send GET request, deserialize JSON response
#[allow(dead_code)]
pub async fn get<T: DeserializeOwned>(&mut self, path: &str) -> Result<T, Error> {
pub async fn get<T: FromJson>(&mut self, path: &str) -> Result<T, Error> {
let base_url = self.base_url;
let mut resource = self.resource().await?;

let mut resource = self.resource().await?;
debug!("HTTP: GET {}/{}", base_url, path);
Self::send_request_parse_response(
resource
.get(path)
.headers(&[("Accept", "application/json")]),
)
.await
let request = resource
.get(path)
.headers(&[("Accept", "application/json")]);

Self::send_request_parse_response(request).await
}

/// Serialize data to JSON, send POST request, deserialize JSON response
#[allow(dead_code)]
pub async fn post<T: Serialize, U: DeserializeOwned>(
&mut self,
path: &str,
data: &T,
) -> Result<U, Error> {
let body = serde_json::to_vec(&data).map_err(Error::MalformedRequest)?;

pub async fn post<T: ToJson, U: FromJson>(&mut self, path: &str, data: &T) -> Result<U, Error> {
let base_url = self.base_url;
let mut resource = self.resource().await?;

// OPTIMIZE: Don't buffer but stream request body. Only needed if we start sending much data
let mut json_writer = JsonWriter::new(Vec::new());
json_writer
.write(data)
.await
.map_err(Error::MalformedRequest)?;
let body = json_writer.into_inner();

let mut resource = self.resource().await?;
debug!("HTTP: POST {}/{} ({} bytes)", base_url, path, body.len());
Self::send_request_parse_response(
resource
.post(path)
.content_type(ContentType::ApplicationJson)
.headers(&[("Accept", "application/json")])
.body(&body[..]),
)
.await
let request = resource
.post(path)
.content_type(ContentType::ApplicationJson)
.headers(&[("Accept", "application/json")])
.body(&body[..]);

Self::send_request_parse_response(request).await
}
}

Expand All @@ -167,25 +169,26 @@ impl<'a> Http<'a> {
}

/// Send request, deserialize JSON response
async fn send_request_parse_response<C: Read + Write, B: RequestBody, T: DeserializeOwned>(
async fn send_request_parse_response<C: Read + Write, B: RequestBody, T: FromJson>(
request: HttpResourceRequestBuilder<'_, '_, C, B>,
) -> Result<T, Error> {
// rx_buf is used to buffer response headers. The response body reader uses this only for
// non-TLS connections. Body reader of TLS connections will use the TLS read_buffer for
// buffering parts of the body. However, read_to_end will again always use this buffer.
let mut rx_buf = [0; MAX_RESPONSE_SIZE];
let response = request.send(&mut rx_buf).await?;

let status = response.status;
Self::parse_status_code(status)?;
debug!("HTTP: Status {}", status.0);

// Reqwless' content-type parsing is unreliable, so parse the body in any case. Parsing
// will fail if it's not JSON.
// if !matches!(response.content_type, Some(ContentType::ApplicationJson)) {
// return Err(Error::InvalidResponse);
// }

// TODO: Use streaming JSON parser so that we don't need to buffer the full response body
let body = response.body().read_to_end().await?;
debug!("HTTP: Status {} ({} bytes)", status.0, body.len());

serde_json::from_slice(body).map_err(Error::MalformedResponse)
let mut json_reader = JsonReader::new(response.body().reader());
json_reader.read().await.map_err(Error::MalformedResponse)
}
}
Loading

0 comments on commit dec9f79

Please sign in to comment.