ESP32 Web Socket Listener, TCP Server #15
Comments
If you are running on the xtensa esp32 chip, then you might be hitting this issue: smol-rs/async-lock#84. Until this is fixed, the one workaround I have found to be effective is to use an older version of If that doesn't help, you might still be having a stack overflow issue. |
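If a stack overflow is the suspect, one cheap experiment is to run the workload on a dedicated thread with an explicitly sized stack, which is what the full example later in this thread does with `std::thread::Builder`. A minimal sketch using only `std`: `run_with_stack` and `stack_heavy_sum` are hypothetical helper names, and the sizes are illustrative, not recommendations.

```rust
use std::thread;

/// Runs `work` on a freshly spawned thread whose stack is `stack_bytes` large
/// and returns its result. Illustrative helper, not part of any crate
/// mentioned in this thread.
fn run_with_stack<T, F>(stack_bytes: usize, work: F) -> T
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    thread::Builder::new()
        .stack_size(stack_bytes)
        .spawn(work)
        .expect("failed to spawn thread")
        .join()
        .expect("worker thread panicked")
}

/// Example workload: places a 16 KiB array on the worker's stack and sums it.
fn stack_heavy_sum() -> usize {
    let scratch = [1u8; 16 * 1024];
    scratch.iter().map(|&b| b as usize).sum()
}
```

Calling `run_with_stack(80_000, stack_heavy_sum)` mirrors the `.stack_size(80000)` used in the example below. If a crash disappears when the number goes up, stack usage is the likely cause.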
Unrelated, but maybe good to know: with Rust async on MCUs, you really have to avoid stuff like this: Due to inefficiencies in how Rust futures are laid out, code like the above often ends up consuming 2x what you think it consumes, thus blowing up your stack, as most (all?) of the futures in your example app do live on the stack. Prefer pre-allocating the buffer in the non-async code and then just passing a One of the more efficient ways to allocate the buffer (if you allocate it from heap, that is) is to do
let mut buffer: Box<MaybeUninit<[u8; 8192]>> = Box::new_uninit();
let buffer = unsafe { buffer.assume_init_mut() }; |
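To make the point about buffers inside futures concrete, here is a small `std`-only sketch that compares the size of a future that owns an 8 KiB array across an `.await` with one that only borrows a caller-owned buffer. The function names are made up for illustration. It shows only that the array becomes part of the future; it does not reproduce the 2x effect, which depends on how futures get nested and moved.

```rust
use std::future::ready;
use std::mem::size_of_val;

/// The 8 KiB array is used after an `.await`, so it has to be stored
/// inside the future returned by this function.
async fn buffer_inside() -> usize {
    let mut buf = [0u8; 8192];
    ready(()).await;
    buf[0] = 1;
    buf.len()
}

/// Same work, but the caller owns the buffer; the future only stores a
/// reference to it.
async fn buffer_outside(buf: &mut [u8]) -> usize {
    ready(()).await;
    buf[0] = 1;
    buf.len()
}

/// Returns the sizes in bytes of the two (not yet polled) futures.
fn future_sizes() -> (usize, usize) {
    // Allocated in non-async code; a zero-filled `vec!` is created on the
    // heap, so no 8 KiB temporary is needed on the stack.
    let mut heap_buf = vec![0u8; 8192].into_boxed_slice();
    let inside = size_of_val(&buffer_inside());
    let outside = size_of_val(&buffer_outside(&mut heap_buf));
    (inside, outside)
}
```

The first size is at least 8192 bytes and the second is a few machine words. A zero-filled boxed slice is used here as a safe alternative to the `MaybeUninit` approach, at the cost of zeroing the memory once.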
Great stuff. I will give Thank you for the hint, I will apply it in this case to reduce the stack requirements. I try to keep as much on the stack as possible in case I need to jump to |
Hi Ivan, I did eventually get this working, but only with a reduced buffer. I will add it here for reference on Monday. Just out of curiosity, do you know why the esp-idf web socket implementation is so poorly supported? For example, the need to read the socket into an empty buffer only once is a really weird choice, and it clearly breaks if you send multiple messages back and forth on the web socket, expecting a "second packet" before a certain timeout. Your version worked like a charm (after much pain and suffering) |
Yep, that would be great, as I wonder why you needed to reduce the buffer.
Not sure what you mean here. Do you mean that in order to even understand how big the payload of the incoming frame is, you need to call whatever-the-method-for-reading-a-frame-was-called with a
Yeah, just keep in mind that I pushed to I was chasing this for the last two days; it is now fixed and won't require code changes, but processing response headers might be a bit slower for now. |
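On the question of why a frame is read in two steps: in the WebSocket wire format (RFC 6455) the payload length is encoded in the frame header, so a reader has to look at the header before it knows how large a buffer the payload needs. The sketch below parses just that header with plain `std`. `WsHeader` and `parse_ws_header` are hypothetical names for illustration and are not the edge-ws or esp-idf API.

```rust
/// Header fields that determine how many payload bytes follow.
#[derive(Debug, PartialEq)]
struct WsHeader {
    fin: bool,
    opcode: u8,
    masked: bool,
    payload_len: u64,
    /// Total header size in bytes, including the mask key if present.
    header_len: usize,
}

/// Parses an RFC 6455 frame header from the start of `bytes`.
/// Returns `None` if more bytes are needed to complete the header.
fn parse_ws_header(bytes: &[u8]) -> Option<WsHeader> {
    if bytes.len() < 2 {
        return None;
    }
    let fin = bytes[0] & 0x80 != 0;
    let opcode = bytes[0] & 0x0F;
    let masked = bytes[1] & 0x80 != 0;
    let (payload_len, mut header_len) = match bytes[1] & 0x7F {
        // 126: the real length follows as a 16-bit big-endian integer.
        126 => {
            let ext = bytes.get(2..4)?;
            (u16::from_be_bytes([ext[0], ext[1]]) as u64, 4)
        }
        // 127: the real length follows as a 64-bit big-endian integer.
        127 => {
            let ext: [u8; 8] = bytes.get(2..10)?.try_into().ok()?;
            (u64::from_be_bytes(ext), 10)
        }
        // 0..=125: the 7-bit field is the length itself.
        short => (short as u64, 2),
    };
    if masked {
        header_len += 4; // client-to-server frames carry a 4-byte mask key
    }
    if bytes.len() < header_len {
        return None;
    }
    Some(WsHeader { fin, opcode, masked, payload_len, header_len })
}
```

For example, the bytes `0x81 0x05` describe a final, unmasked text frame with a 5-byte payload. The echo example later in this thread follows the same split: it receives the header first, then the payload.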
Hi
Here is my full example code for anyone who needs this in the future:
#![feature(generic_arg_infer)]
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::thread::Scope;
use channel_bridge::asynch::ws::WsError;
use edge_executor::LocalExecutor;
use edge_http::io::server::{Connection, DefaultServer, Handler, TaskHandler};
use edge_http::ws::MAX_BASE64_KEY_RESPONSE_LEN;
use edge_http::{io, Method, DEFAULT_MAX_HEADERS_COUNT};
use edge_std_nal_async::StdTcpConnection;
use edge_ws::{FrameHeader, FrameType};
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::signal::Signal;
use embassy_time::{Duration, Timer};
use embedded_io_async::{Read, Write};
use embedded_nal_async_xtra::TcpListen;
use esp_idf_svc::eventloop::EspSystemEventLoop;
use esp_idf_svc::hal::peripherals::Peripherals;
use esp_idf_svc::nvs::EspDefaultNvsPartition;
use esp_idf_svc::sys::EspError;
use esp_idf_svc::timer::EspTaskTimerService;
use esp_idf_svc::wifi::{AsyncWifi, Configuration};
use esp_idf_svc::wifi::{ClientConfiguration, EspWifi};
use log::info;
const SSID: &str = env!("WIFI_SSID");
const PASSWORD: &str = env!("WIFI_PASS");
// ws
pub static QUIT: Signal<CriticalSectionRawMutex, u32> = Signal::new();
const WS_MAX_CONNECTIONS: usize = 2;
pub const WS_MAX_FRAME_LEN: usize = 512;
// const
fn main() -> anyhow::Result<()> {
// It is necessary to call this function once. Otherwise some patches to the runtime
// implemented by esp-idf-sys might not link properly. See https://github.com/esp-rs/esp-idf-template/issues/71
esp_idf_svc::timer::embassy_time_driver::link();
esp_idf_svc::sys::link_patches();
// Bind the log crate to the ESP Logging facilities
esp_idf_svc::log::EspLogger::initialize_default();
esp_idf_svc::io::vfs::initialize_eventfd(5)?;
std::thread::scope(|scope: &Scope| run(scope))?;
Ok(())
}
fn run<'s>(scope: &'s Scope<'s, '_>) -> Result<(), EspError> {
let sys_loop = EspSystemEventLoop::take().unwrap();
let peripherals = Peripherals::take().unwrap();
let nvs = EspDefaultNvsPartition::take().unwrap();
let timer_service = EspTaskTimerService::new()?;
let task: std::thread::ScopedJoinHandle<Result<(), EspError>> = std::thread::Builder::new()
.stack_size(80000)
.spawn_scoped(scope, move || {
let executor = LocalExecutor::<8>::new();
let mut wifi = AsyncWifi::wrap(
EspWifi::new(peripherals.modem, sys_loop.clone(), Some(nvs))?,
sys_loop,
timer_service,
)?;
wifi.set_configuration(&Configuration::Client(ClientConfiguration {
ssid: SSID.try_into().unwrap(),
password: PASSWORD.try_into().unwrap(),
..Default::default()
}))?;
executor.spawn(connect_wifi(&mut wifi)).detach();
let mut httpd = httpd().unwrap();
let handler = http_handler().unwrap();
executor.spawn(run_ws(&mut httpd, handler)).detach();
esp_idf_svc::hal::task::block_on(executor.run(async {
loop {
Timer::after(Duration::from_millis(100)).await
}
}));
Ok(())
})
.unwrap();
let result: Result<(), _> = task.join().unwrap();
log::info!("Thread execution finished {result:?}");
Ok(())
}
pub async fn run_ws<H>(server: &mut DefaultServer, handler: H) -> Result<(), anyhow::Error>
where
H: for<'b> edge_http::io::server::Handler<'b, &'b mut edge_std_nal_async::StdTcpConnection, 64>,
{
let addr = "0.0.0.0:8881";
info!("Running HTTP ws server on {addr}");
let acceptor = edge_std_nal_async::Stack::new()
.listen(addr.parse().unwrap())
.await?;
server.run(acceptor, handler, None).await?;
Ok(())
}
struct WsHandler {
send_buff: UnsafeCell<MaybeUninit<[u8; MAX_BASE64_KEY_RESPONSE_LEN]>>,
recv_buff: UnsafeCell<MaybeUninit<[u8; 1024]>>,
}
impl WsHandler {
pub fn new() -> Self {
Self {
send_buff: UnsafeCell::new(MaybeUninit::uninit()),
recv_buff: UnsafeCell::new(MaybeUninit::uninit()),
}
}
async fn handle<'b, T, const N: usize>(
&self,
conn: &mut Connection<'b, T, N>,
) -> Result<(), HttpdError<T::Error>>
where
T: Read + Write,
{
let headers = conn.headers()?;
if !matches!(headers.method, Some(Method::Get)) {
conn.initiate_response(405, Some("Method Not Allowed"), &[])
.await?;
} else if !matches!(headers.path, Some("/")) {
conn.initiate_response(404, Some("Not Found"), &[]).await?;
} else if !conn.is_ws_upgrade_request()? {
conn.initiate_response(200, Some("OK"), &[("Content-Type", "text/plain")])
.await?;
conn.write_all(b"Initiate WS Upgrade request to switch this connection to WS")
.await?;
} else {
let send_buff: &mut [u8; _] =
unsafe { self.send_buff.get().as_mut().unwrap().assume_init_mut() };
let recv_buff = unsafe { self.recv_buff.get().as_mut().unwrap().assume_init_mut() };
self.handle_ws(send_buff, recv_buff, conn).await?;
}
Ok(())
}
async fn handle_ws<'b, T, const N: usize>(
&self,
send_buff: &mut [u8],
recv_buff: &mut [u8],
conn: &mut Connection<'b, T, N>,
) -> Result<(), HttpdError<T::Error>>
where
T: Read + Write,
{
let mut buf = send_buff[..MAX_BASE64_KEY_RESPONSE_LEN].try_into().unwrap();
conn.initiate_ws_upgrade_response(&mut buf).await?;
conn.complete().await?;
info!("Connection upgraded to WS, starting a simple WS echo server now");
let mut socket = conn.unbind()?;
let mut buf: [u8; 1024] = recv_buff[..1024].try_into().unwrap();
loop {
let mut header = FrameHeader::recv(&mut socket)
.await
.map_err(|w| HttpdError::Ws(WsError::IoError(w)))?;
let payload = header
.recv_payload(&mut socket, &mut buf)
.await
.map_err(|w| HttpdError::Ws(WsError::IoError(w)))?;
match header.frame_type {
FrameType::Text(_) => {
info!(
"Got {header}, with payload \"{}\"",
core::str::from_utf8(payload).unwrap()
);
}
FrameType::Binary(_) => {
info!("Got {header}, with payload {payload:?}");
}
FrameType::Close => {
info!("Got {header}, client closed the connection cleanly");
break;
}
_ => {
info!("Got {header}");
}
}
// Echo it back now
header.mask_key = None; // Servers never mask the payload
if matches!(header.frame_type, FrameType::Ping) {
header.frame_type = FrameType::Pong;
}
info!("Echoing back as {header}");
header
.send(&mut socket)
.await
.map_err(|w| HttpdError::Ws(WsError::IoError(w)))?;
header
.send_payload(&mut socket, payload)
.await
.map_err(|w| HttpdError::Ws(WsError::IoError(w)))?;
}
Ok(())
}
}
impl<'b, T, const N: usize> Handler<'b, T, N> for WsHandler
where
T: Read + Write,
T::Error: Send + Sync + std::error::Error + 'static,
{
type Error = anyhow::Error;
async fn handle(&self, conn: &mut Connection<'b, T, N>) -> Result<(), Self::Error> {
let res = WsHandler::handle(self, conn).await;
match res {
Ok(()) => (),
Err(x) => log::info!("Error in handling: {:?}", x),
}
Ok(())
}
}
async fn connect_wifi(wifi: &mut AsyncWifi<EspWifi<'static>>) -> anyhow::Result<()> {
let wifi_configuration: Configuration = Configuration::Client(ClientConfiguration {
ssid: SSID.try_into().unwrap(),
password: PASSWORD.try_into().unwrap(),
..Default::default()
});
wifi.set_configuration(&wifi_configuration)?;
wifi.start().await?;
info!("Wifi started");
wifi.connect().await?;
info!("Wifi connected");
wifi.wait_netif_up().await?;
info!("Wifi netif up");
Ok(())
}
#[derive(Debug)]
pub enum HttpdError<T> {
Http(io::Error<T>),
Ws(WsError<edge_ws::io::Error<T>>),
}
impl<T> From<io::Error<T>> for HttpdError<T> {
fn from(err: io::Error<T>) -> Self {
Self::Http(err)
}
}
impl<T> From<WsError<edge_ws::io::Error<T>>> for HttpdError<T> {
fn from(err: WsError<edge_ws::io::Error<T>>) -> Self {
Self::Ws(err)
}
}
fn httpd() -> Result<DefaultServer, EspError> {
Ok(DefaultServer::new())
}
fn http_handler() -> Result<WsHandler, EspError> {
Ok(WsHandler::new())
} |
So I went your suggested route with a buffer passed into the async task from outside the executor.
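One detail worth checking in the code above: `recv_buff[..1024].try_into().unwrap()` with a target type of `[u8; 1024]` converts to an array by value, which copies the bytes into a new local instead of reusing the externally allocated storage. Converting to `&mut [u8; N]` borrows in place. A `std`-only sketch of the difference, with made-up helper names:

```rust
const LEN: usize = 1024;

/// Copies the first `LEN` bytes into a new array. `[u8; LEN]` is `Copy`, so
/// the caller ends up with a second 1 KiB buffer of its own.
fn copy_out(storage: &[u8]) -> [u8; LEN] {
    storage[..LEN].try_into().unwrap()
}

/// Views the first `LEN` bytes as an array reference; no bytes are copied.
fn borrow_in_place(storage: &mut [u8]) -> &mut [u8; LEN] {
    (&mut storage[..LEN]).try_into().unwrap()
}

/// Writes through both variants and reports what the original storage saw.
/// Returns (byte in the copy, storage after the copy write, storage after
/// the in-place write).
fn demo() -> (u8, u8, u8) {
    let mut storage = vec![0u8; LEN];

    let mut copy = copy_out(&storage);
    copy[0] = 7; // only the copy changes
    let after_copy = storage[0];

    borrow_in_place(&mut storage)[0] = 7; // writes into `storage` itself
    (copy[0], after_copy, storage[0])
}
```

`demo()` returns `(7, 0, 7)`: the write through the copy never reaches the original storage. Whether the by-reference form can be dropped into the example unchanged depends on the parameter types the edge-http and edge-ws methods expect, which this sketch does not verify.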
Yes, first that null buffer, but this has been my unfortunate workflow for this:
This is completely outside of
Oh damn, I will see if this has impacted anything, but I'm sure it's all good. I hadn't seen that particular bug. |
Hi
I am trying, and failing, to use the edge-net stack on top of esp-idf for an ESP32.
Between stack overflows, watchdog timeouts and mutex access violations, I am stuck trying to implement the basic example for https://crates.io/crates/edge-ws over WiFi. I have tried to use a lot of ideas from ruwm to get this to run, but I'm just not having luck. (For added context, I can get the esp-idf-svc http server to run and process web socket requests, but it is very finicky, often reporting an error on "could not get second packet", even after having received the full message, so I thought I would go the same route as ruwm (I'm building a power meter).)
The current error I am seeing pops up from ESP land, but I have no idea how to interpret it.
Here is my code:
main.rs:
sdkconfig.defaults
Cargo.toml
.cargo/config.toml