Skip to content

Commit

Permalink
Feat: Add Support for Durable Objects (cloudflare#12)
Browse files Browse the repository at this point in the history
* allow rust to access pre-existing durable object

* Transition to ES Modules build-upload type to prepare for exporting a DO from Rust

* updated

* update

* finished testing storage API

* update

Co-authored-by: Leo Orshansky <[email protected]>
  • Loading branch information
leoorshansky and Leo Orshansky authored Aug 6, 2021
1 parent c4bfe84 commit 7034fa4
Show file tree
Hide file tree
Showing 23 changed files with 1,011 additions and 125 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
**/target
Cargo.lock
.DS_Store
.DS_Store
**/worker/generated/*
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[workspace]
members = ["edgeworker-sys", "macros/cf", "rust-sandbox", "worker"]
members = ["edgeworker-sys", "macros/cf", "macros/durable_object", "rust-sandbox", "worker"]
3 changes: 2 additions & 1 deletion macros/cf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ quote = "1.0.9"
syn = "1.0.72"
wasm-bindgen-macro-support = "0.2.74"
web-sys = "0.3.51"
worker = { path = "../../worker" }
worker = { path = "../../worker" }
durable_object = { path = "../durable_object" }
15 changes: 10 additions & 5 deletions macros/cf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream {
// let input_arg = input_fn.sig.inputs.first().expect("#[cf::worker(fetch)] attribute requires exactly one input, of type `worker::Request`");

// save original fn name for re-use in the wrapper fn
let original_input_fn_ident = input_fn.sig.ident.clone();
let output_fn_ident = Ident::new("glue_fetch", input_fn.sig.ident.span());
let input_fn_ident = Ident::new(&(input_fn.sig.ident.to_string() + "_fetch_glue"), input_fn.sig.ident.span());
let wrapper_fn_ident = Ident::new("fetch", input_fn.sig.ident.span());
// rename the original attributed fn
input_fn.sig.ident = output_fn_ident.clone();
input_fn.sig.ident = input_fn_ident.clone();

// create a new "main" function that takes the edgeworker_sys::Request, and calls the
// original attributed function, passing in a converted worker::Request
let wrapper_fn = quote! {
pub async fn #original_input_fn_ident(ty: String, req: edgeworker_sys::Request) -> worker::Result<edgeworker_sys::Response> {
pub async fn #wrapper_fn_ident(req: ::edgeworker_sys::Request, env: ::worker::Env) -> ::worker::Result<::edgeworker_sys::Response> {
// get the worker::Result<worker::Response> by calling the original fn
#output_fn_ident(worker::Request::from((ty, req))).await
#input_fn_ident(worker::Request::from(req), env).await
.map(edgeworker_sys::Response::from)
}
};
Expand Down Expand Up @@ -75,3 +75,8 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream {
_ => exit_missing_attr(),
}
}

#[proc_macro_attribute]
pub fn durable_object(_attr: TokenStream, item: TokenStream) -> TokenStream {
durable_object::expand_macro(item.into()).unwrap_or_else(syn::Error::into_compile_error).into()
}
10 changes: 10 additions & 0 deletions macros/durable_object/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "durable_object"
version = "0.1.0"
edition = "2018"

[dependencies]
worker = { path = "../../worker" }
syn = "1.0.74"
quote = "1.0.9"
proc-macro2 = "1.0.28"
96 changes: 96 additions & 0 deletions macros/durable_object/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use proc_macro2::{Ident, TokenStream};
use quote::{ToTokens, quote};
use syn::{Error, ImplItem, Item, spanned::Spanned};

pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
let item = syn::parse2::<Item>(tokens)?;
match item {
Item::Impl(imp) => {
let impl_token = imp.impl_token;
let trai = imp.trait_.clone();
let (_, trai, _) = trai.ok_or_else(|| Error::new_spanned(impl_token, "Must be a DurableObject trait impl"))?;
if !trai.segments.last().map(|x| x.ident == "DurableObject").unwrap_or(false) {
return Err(Error::new(trai.span(), "Must be a DurableObject trait impl"))
}

let pound = syn::Token![#](imp.span()).to_token_stream();
let wasm_bindgen_attr = quote! {#pound[::wasm_bindgen::prelude::wasm_bindgen]};


let struct_name = imp.self_ty;
let items = imp.items;
let mut tokenized = vec![];
for item in items {
let mut method = match item {
ImplItem::Method(m) => m,
_ => return Err(Error::new_spanned(item, "Impl block must only contain methods"))
};
let tokens = match method.sig.ident.to_string().as_str() {
"constructor" => {
method.sig.ident = Ident::new("_constructor", method.sig.ident.span());
quote! {
#pound[::wasm_bindgen::prelude::wasm_bindgen(constructor)]
pub #method
}
},
"fetch" => {
method.sig.ident = Ident::new("_fetch_raw", method.sig.ident.span());
quote! {
#pound[::wasm_bindgen::prelude::wasm_bindgen(js_name = fetch)]
pub fn _fetch(&mut self, req: ::edgeworker_sys::Request) -> ::js_sys::Promise {
// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
// we know something that Rust doesn't: that the Durable Object will never be destroyed
// while there is still a running promise inside of it, therefore we can let a reference
// to the durable object escape into a static-lifetime future.
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

::wasm_bindgen_futures::future_to_promise(async move {
static_self._fetch_raw(req.into()).await.map(::edgeworker_sys::Response::from).map(::wasm_bindgen::JsValue::from)
.map_err(::wasm_bindgen::JsValue::from)
})
}

#method
}
},
_ => panic!()
};
tokenized.push(tokens);
}
Ok(quote! {
#wasm_bindgen_attr
impl #struct_name {
#(#tokenized)*
}

#pound[async_trait::async_trait(?Send)]
impl ::worker::durable::DurableObject for #struct_name {
fn constructor(state: ::worker::durable::State, env: ::worker::Env) -> Self {
Self::_constructor(state, env)
}

async fn fetch(&mut self, req: ::worker::Request) -> ::worker::Result<worker::Response> {
self._fetch_raw(req).await
}
}

trait __Need_Durable_Object_Trait_Impl_With_durable_object_Attribute { const MACROED: bool = true; }
impl __Need_Durable_Object_Trait_Impl_With_durable_object_Attribute for #struct_name {}
})
},
Item::Struct(struc) => {
let tokens = struc.to_token_stream();
let pound = syn::Token![#](struc.span()).to_token_stream();
let struct_name = struc.ident;
Ok(quote! {
#pound[::wasm_bindgen::prelude::wasm_bindgen]
#tokens

const _: bool = <#struct_name as __Need_Durable_Object_Trait_Impl_With_durable_object_Attribute>::MACROED;
})
},
_ => Err(Error::new(item.span(), "Durable Object macro can only be applied to structs and their impl of DurableObject trait"))
}
}
3 changes: 3 additions & 0 deletions rust-sandbox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ cfg-if = "0.1.2"
console_error_panic_hook = { version = "0.1.1", optional = true }
wee_alloc = { version = "0.4.2", optional = true }
cf = { path = "../macros/cf" }
durable_object = { path = "../macros/durable_object" }
edgeworker-sys = { path = "../edgeworker-sys" }
serde = { version = "1.0.126", features = ["derive"] }
worker = { path = "../worker" }
wasm-bindgen = "=0.2.74"
wasm-bindgen-futures = "0.4.24"
js-sys = "0.3.51"
worker-kv = "0.2.0"
http = "0.2.4"
url = "2.2.2"
async-trait = "0.1.50"

[dev-dependencies]
wasm-bindgen-test = "0.2"
Expand Down
36 changes: 36 additions & 0 deletions rust-sandbox/src/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use cf::durable_object;
use worker::{prelude::*, durable::{State}};

const ONE_HOUR: u64 = 3600000;

#[durable_object]
pub struct Counter {
count: usize,
state: State,
initialized: bool,
last_backup: Date
}

#[durable_object]
impl DurableObject for Counter {
fn constructor(state: worker::durable::State, _env: worker::Env) -> Self {
Self { count: 0, initialized: false, state, last_backup: Date::now() }
}

async fn fetch(&mut self, _req: worker::Request) -> worker::Result<worker::Response> {
// Get info from last backup
if !self.initialized {
self.initialized = true;
self.count = self.state.storage().get("count").await.unwrap_or(0);
}

// Do a backup every hour
if Date::now().as_millis() - self.last_backup.as_millis() > ONE_HOUR {
self.last_backup = Date::now();
self.state.storage().put("count", self.count).await?;
}

self.count += 1;
Response::ok(self.count.to_string())
}
}
43 changes: 24 additions & 19 deletions rust-sandbox/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use worker::{kv::KvStore, prelude::*, Router};
use worker::{durable::ObjectNamespace, kv::KvStore, prelude::*, Router};

mod test;
mod counter;
mod utils;

#[derive(Deserialize, Serialize)]
Expand Down Expand Up @@ -28,37 +30,34 @@ struct User {
date_from_str: String,
}

fn handle_a_request(_req: Request, _params: Params) -> Result<Response> {
Response::ok("weeee".into())
fn handle_a_request(_req: Request, _env: Env, _params: Params) -> Result<Response> {
Response::ok("weeee")
}

#[cf::worker(fetch)]
pub async fn main(req: Request) -> Result<Response> {
console_log!("request at: {:?}", req.path());

pub async fn main(req: Request, env: Env) -> Result<Response> {
utils::set_panic_hook();

let mut router = Router::new();

router.get("/request", handle_a_request)?;
router.post("/headers", |req, _| {
router.post("/headers", |req, _, _| {
let mut headers: http::HeaderMap = req.headers().into();
headers.append("Hello", "World!".parse().unwrap());

// TODO: make api for Response new and mut to add headers
Response::ok("returned your headers to you.".into())
.map(|res| res.with_headers(headers.into()))
Response::ok("returned your headers to you.").map(|res| res.with_headers(headers.into()))
})?;

router.on("/user/:id/test", |req, params| {
router.on("/user/:id/test", |req, _env, params| {
if !matches!(req.method(), Method::Get) {
return Response::error("Method Not Allowed".into(), 405);
return Response::error("Method Not Allowed", 405);
}
let id = params.get("id").unwrap_or("not found");
Response::ok(format!("TEST user id: {}", id))
})?;

router.on("/user/:id", |_req, params| {
router.on("/user/:id", |_req, _env, params| {
let id = params.get("id").unwrap_or("not found");
Response::from_json(&User {
id: id.into(),
Expand All @@ -71,25 +70,25 @@ pub async fn main(req: Request) -> Result<Response> {
})
})?;

router.post("/account/:id/zones", |_, params| {
router.post("/account/:id/zones", |_, _, params| {
Response::ok(format!(
"Create new zone for Account: {}",
params.get("id").unwrap_or("not found")
))
})?;

router.get("/account/:id/zones", |_, params| {
router.get("/account/:id/zones", |_, _, params| {
Response::ok(format!(
"Account id: {}..... You get a zone, you get a zone!",
params.get("id").unwrap_or("not found")
))
})?;

router.on_async("/async", |mut req, _params| async move {
router.on_async("/async", |mut req, _env, _params| async move {
Response::ok(format!("Request body: {}", req.text().await?))
})?;

router.on_async("/fetch", |_req, _params| async move {
router.on_async("/fetch", |_req, _env, _params| async move {
let req = Request::new("https://example.com", "POST")?;
let resp = Fetch::Request(&req).send().await?;
let resp2 = Fetch::Url("https://example.com").send().await?;
Expand All @@ -100,7 +99,7 @@ pub async fn main(req: Request) -> Result<Response> {
))
})?;

router.on_async("/fetch_json", |_req, _params| async move {
router.on_async("/fetch_json", |_req, _env, _params| async move {
let data: ApiData = Fetch::Url("https://jsonplaceholder.typicode.com/todos/1")
.send()
.await?
Expand All @@ -112,13 +111,19 @@ pub async fn main(req: Request) -> Result<Response> {
))
})?;

router.on_async("/proxy_request/:url", |_req, params| {
router.on_async("/proxy_request/:url", |_req, _env, params| {
// Must copy the parameters into the heap here for lifetime purposes
let url = params.get("url").unwrap().to_string();
async move { Fetch::Url(&url).send().await }
})?;

router.run(req).await
router.on_async("durable", |_req, e, _params| async move {
let namespace = e.get_binding::<ObjectNamespace>("COUNTER")?;
let stub = namespace.id_from_name("A")?.get_stub()?;
stub.fetch_with_str("/").await
})?;

router.run(req, env).await

// match (req.method(), req.path().as_str()) {
// (Method::Get, "/") => {
Expand Down
48 changes: 48 additions & 0 deletions rust-sandbox/src/test/durable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use crate::ensure;
use worker::{durable::ObjectNamespace, prelude::*, Result};

pub async fn basic_test(env: &Env) -> Result<()> {
let namespace: ObjectNamespace = env.get_binding("MY_CLASS")?;
let id = namespace.id_from_name("A")?;
let bad = env.get_binding::<ObjectNamespace>("DFSDF");
ensure!(bad.is_err(), "Invalid binding did not raise error");

let stub = id.get_stub()?;
let res = stub.fetch_with_str("hello").await?.text().await?;
let res2 = stub
.fetch_with_request(Request::new_with_init(
"hello",
RequestInit::new().body(Some(&"lol".into())).method("POST"),
)?)
.await?
.text()
.await?;

ensure!(res == res2, "Durable object responded wrong to 'hello'");

let res = stub.fetch_with_str("storage").await?.text().await?;
let num = res
.parse::<usize>()
.map_err(|_| "Durable Object responded wrong to 'storage': ".to_string() + &res)?;
let res = stub.fetch_with_str("storage").await?.text().await?;
let num2 = res
.parse::<usize>()
.map_err(|_| "Durable Object responded wrong to 'storage'".to_string())?;

ensure!(
num2 == num + 1,
"Durable object responded wrong to 'storage'"
);

let res = stub.fetch_with_str("transaction").await?.text().await?;
let num = res
.parse::<usize>()
.map_err(|_| "Durable Object responded wrong to 'transaction': ".to_string() + &res)?;

ensure!(
num == num2 + 1,
"Durable object responded wrong to 'storage'"
);

Ok(())
}
Loading

0 comments on commit 7034fa4

Please sign in to comment.