Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Hibernatable Web Sockets API #436

Merged
merged 15 commits into from
Feb 28, 2024
166 changes: 148 additions & 18 deletions worker-macros/src/durable_object.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use proc_macro2::{Ident, TokenStream};
use quote::{quote, ToTokens};
use syn::{spanned::Spanned, Error, FnArg, ImplItem, Item, Type, TypePath};
use syn::{spanned::Spanned, Error, FnArg, ImplItem, Item, Type, TypePath, Visibility};

pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
let item = syn::parse2::<Item>(tokens)?;
Expand All @@ -20,24 +20,37 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
let struct_name = imp.self_ty;
let items = imp.items;
let mut tokenized = vec![];
let mut has_alarm = false;

#[derive(Default)]
struct OptionalMethods {
has_alarm: bool,
has_websocket_message: bool,
has_websocket_close: bool,
has_websocket_error: bool,
}

let mut optional_methods = OptionalMethods::default();

for item in items {
let impl_method = match item {
ImplItem::Fn(func) => func,
_ => return Err(Error::new_spanned(item, "Impl block must only contain methods"))
};

let span = impl_method.sig.ident.span();

let tokens = match impl_method.sig.ident.to_string().as_str() {
"new" => {
let mut method = impl_method.clone();
method.sig.ident = Ident::new("_new", method.sig.ident.span());
method.vis = Visibility::Inherited;


// modify the `state` argument so it is type ObjectState
let arg_tokens = method.sig.inputs.first_mut().expect("DurableObject `new` method must have 2 arguments: state and env").into_token_stream();
match syn::parse2::<FnArg>(arg_tokens)? {
FnArg::Typed(pat) => {
let path = syn::parse2::<TypePath>(quote!{worker_sys::DurableObjectState})?;
let path = syn::parse2::<TypePath>(quote!{worker::worker_sys::DurableObjectState})?;
let mut updated_pat = pat;
updated_pat.ty = Box::new(Type::Path(path));

Expand All @@ -57,17 +70,19 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
prepended.extend(method.block.stmts);
method.block.stmts = prepended;

quote! {
Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(constructor)]
pub #method
}
})
},
"fetch" => {
let mut method = impl_method.clone();
method.sig.ident = Ident::new("_fetch_raw", method.sig.ident.span());
quote! {
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = fetch)]
pub fn _fetch(&mut self, req: worker_sys::web_sys::Request) -> js_sys::Promise {
pub fn _fetch(&mut self, req: worker::worker_sys::web_sys::Request) -> worker::js_sys::Promise {
// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
Expand All @@ -77,22 +92,24 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._fetch_raw(req.into()).await.map(worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
static_self._fetch_raw(req.into()).await.map(worker::worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
}
})
},
"alarm" => {
has_alarm = true;
optional_methods.has_alarm = true;

let mut method = impl_method.clone();
method.sig.ident = Ident::new("_alarm_raw", method.sig.ident.span());
quote! {
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = alarm)]
pub fn _alarm(&mut self) -> js_sys::Promise {
pub fn _alarm(&mut self) -> worker::js_sys::Promise {
// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
Expand All @@ -102,24 +119,131 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._alarm_raw().await.map(worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
static_self._alarm_raw().await.map(worker::worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
}
}
_ => panic!()
})
},
"websocket_message" => {
optional_methods.has_websocket_message = true;

let mut method = impl_method.clone();
method.sig.ident = Ident::new("_websocket_message_raw", method.sig.ident.span());
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = webSocketMessage)]
pub fn _websocket_message(&mut self, ws: worker::worker_sys::web_sys::WebSocket, message: wasm_bindgen::JsValue) -> worker::js_sys::Promise {
let ws_message = if let Some(string_message) = message.as_string() {
worker::WebSocketIncomingMessage::String(string_message)
} else {
let v = worker::js_sys::Uint8Array::new(&message).to_vec();
worker::WebSocketIncomingMessage::Binary(v)
};

// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
// we know something that Rust doesn't: that the Durable Object will never be destroyed
// while there is still a running promise inside of it, therefore we can let a reference
// to the durable object escape into a static-lifetime future.
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._websocket_message_raw(ws.into(), ws_message).await.map(|_| wasm_bindgen::JsValue::NULL)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
})
},
"websocket_close" => {
optional_methods.has_websocket_close = true;

let mut method = impl_method.clone();
method.sig.ident = Ident::new("_websocket_close_raw", method.sig.ident.span());
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = webSocketClose)]
pub fn _websocket_close(&mut self, ws: worker::worker_sys::web_sys::WebSocket, code: usize, reason: String, was_clean: bool) -> worker::js_sys::Promise {
// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
// we know something that Rust doesn't: that the Durable Object will never be destroyed
// while there is still a running promise inside of it, therefore we can let a reference
// to the durable object escape into a static-lifetime future.
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._websocket_close_raw(ws.into(), code, reason, was_clean).await.map(|_| wasm_bindgen::JsValue::NULL)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
})
},
"websocket_error" => {
optional_methods.has_websocket_error = true;

let mut method = impl_method.clone();
method.sig.ident = Ident::new("_websocket_error_raw", method.sig.ident.span());
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = webSocketError)]
pub fn _websocket_error(&mut self, ws: worker::worker_sys::web_sys::WebSocket, error: wasm_bindgen::JsValue) -> worker::js_sys::Promise {
// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
// we know something that Rust doesn't: that the Durable Object will never be destroyed
// while there is still a running promise inside of it, therefore we can let a reference
// to the durable object escape into a static-lifetime future.
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._websocket_error_raw(ws.into(), error.into()).await.map(|_| wasm_bindgen::JsValue::NULL)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
})
},
ident => Err(Error::new(span, format!("Unsupported method `{}`, please move extra impl methods to a separate impl definition", ident)))
};
tokenized.push(tokens);
tokenized.push(tokens?);
}

let alarm_tokens = has_alarm.then(|| quote! {
let alarm_tokens = optional_methods.has_alarm.then(|| quote! {
async fn alarm(&mut self) -> ::worker::Result<worker::Response> {
self._alarm_raw().await
}
});

let websocket_message_tokens = optional_methods.has_websocket_message.then(|| quote! {
async fn websocket_message(&mut self, ws: ::worker::WebSocket, message: ::worker::WebSocketIncomingMessage) -> ::worker::Result<()> {
self._websocket_message_raw(ws, message).await
}
});

let websocket_close_tokens = optional_methods.has_websocket_close.then(|| quote! {
async fn websocket_close(&mut self, ws: ::worker::WebSocket, code: usize, reason: String, was_clean: bool) -> ::worker::Result<()> {
self._websocket_close_raw(ws, code, reason, was_clean).await
}
});

let websocket_error_tokens = optional_methods.has_websocket_error.then(|| quote! {
async fn websocket_error(&mut self, ws: ::worker::WebSocket, error: ::worker::Error) -> ::worker::Result<()> {
self._websocket_error_raw(ws, error).await
}
});

Ok(quote! {
#wasm_bindgen_attr
impl #struct_name {
Expand All @@ -137,6 +261,12 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
}

#alarm_tokens

#websocket_message_tokens

#websocket_close_tokens

#websocket_error_tokens
}

trait __Need_Durable_Object_Trait_Impl_With_durable_object_Attribute { const MACROED: bool = true; }
Expand Down
20 changes: 20 additions & 0 deletions worker-sys/src/ext/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ mod glue {

#[wasm_bindgen(method, catch)]
pub fn accept(this: &WebSocket) -> Result<(), JsValue>;

#[wasm_bindgen(method, catch, js_name = "serializeAttachment")]
pub fn serialize_attachment(this: &WebSocket, value: JsValue) -> Result<(), JsValue>;

#[wasm_bindgen(method, catch, js_name = "deserializeAttachment")]
pub fn deserialize_attachment(this: &WebSocket) -> Result<JsValue, JsValue>;
}
}

Expand All @@ -18,10 +24,24 @@ pub trait WebSocketExt {
///
/// [CF Documentation](https://developers.cloudflare.com/workers/runtime-apis/websockets#accept)
fn accept(&self) -> Result<(), JsValue>;

fn serialize_attachment(&self, value: JsValue) -> Result<(), JsValue>;

fn deserialize_attachment(&self) -> Result<JsValue, JsValue>;
}

impl WebSocketExt for web_sys::WebSocket {
fn accept(&self) -> Result<(), JsValue> {
self.unchecked_ref::<glue::WebSocket>().accept()
}

fn serialize_attachment(&self, value: JsValue) -> Result<(), JsValue> {
self.unchecked_ref::<glue::WebSocket>()
.serialize_attachment(value)
}

fn deserialize_attachment(&self) -> Result<JsValue, JsValue> {
self.unchecked_ref::<glue::WebSocket>()
.deserialize_attachment()
}
}
17 changes: 17 additions & 0 deletions worker-sys/src/types/durable_object/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,21 @@ extern "C" {

#[wasm_bindgen(method, js_name=waitUntil)]
pub fn wait_until(this: &DurableObjectState, promise: &js_sys::Promise);

#[wasm_bindgen(method, js_name=acceptWebSocket)]
pub fn accept_websocket(this: &DurableObjectState, ws: &web_sys::WebSocket);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling accept() on a WebSocket in an Object will incur duration charges for the entire time the WebSocket is connected. If you prefer, use state.acceptWebSocket() instead, which will stop incurring duration charges once all event handlers finish running.

https://developers.cloudflare.com/durable-objects/platform/pricing/

The support of acceptWebSocket() is critical to many existing rust-based Durable Object applications since it would dramatically lower the cost incurred by WebSocket connection currently implemented by accept() method. Really appreciate your work!


#[wasm_bindgen(method, js_name=acceptWebSocket)]
pub fn accept_websocket_with_tags(
this: &DurableObjectState,
ws: &web_sys::WebSocket,
tags: Vec<JsValue>,
Copy link
Contributor Author

@DylanRJohnston DylanRJohnston Jan 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasm_bindgen v0.2.89 supports using Vec<String> directly. There's a few places in the workers-rs API surface that could benefit from this change.

);

#[wasm_bindgen(method, js_name=getWebSockets)]
pub fn get_websockets(this: &DurableObjectState) -> Vec<web_sys::WebSocket>;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why do there need to be two versions of acceptWebSocket and getWebSockets? The JS and C++ methods all provide an optional/maybe for tags, would an Option not work here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion (I am not the author), it's a style choice but this is probably the right one (in Rust anyway).

It's slightly kinder to the caller to have two functions, and probably more idiomatic in Rust. Passing a parameter None hurts the readability of the code, because it's not obvious at a glance what that parameter means.

The rest of this crate seems to mostly follow this style as well, e.g. ObjectNamespace::unique_id/unique_id_with_jurisdiction, Storage::list/list_with_options...

Copy link
Contributor Author

@DylanRJohnston DylanRJohnston Feb 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it's an unfortunate impedance mismatch between function overloading in JS and the lack thereof in Rust. I'm just trying to match the style of the rest of the repo which manually "un-overloads" the functions as Eric pointed out.

I kind of wish Rust did have function overloading, but it probably has complicated interactions with the trait solver and type inference.


#[wasm_bindgen(method, js_name=getWebSockets)]
pub fn get_websockets_with_tag(this: &DurableObjectState, tag: &str)
-> Vec<web_sys::WebSocket>;
}
Loading
Loading