Skip to content

Commit

Permalink
Async function execution implemented
Browse files Browse the repository at this point in the history
Add more support for create, close and message
  • Loading branch information
sansyrox committed Dec 18, 2021
1 parent c122ca9 commit 074e055
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 97 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ actix-http = "3.0.0-beta.8"
socket2 = { version = "0.4.1", features = ["all"] }
actix = "0.12.0"
actix-web-actors = "4.0.0-beta.1"
futures = "0.3.17"

[package.metadata.maturin]
name = "robyn"
5 changes: 1 addition & 4 deletions integration_tests/base_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ async def connect():
global i
i+=1
if i==0:
print("hello, world")
return "Whaaat??"
elif i==1:
return "Whooo??"
Expand All @@ -22,12 +21,10 @@ async def connect():

@websocket.on("close")
def close():
print("Hello world")
return "Hello world, from ws"
return "GoodBye world, from ws"

@websocket.on("connect")
def message():
print("Hello world")
return "Hello world, from ws"


Expand Down
44 changes: 21 additions & 23 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use pyo3::prelude::*;
use pyo3::types::PyAny;

use actix_web::http::Method;
use dashmap::DashMap;
use matchit::Node;

/// Contains the thread safe hashmaps of different routes
Expand All @@ -21,7 +20,7 @@ pub struct Router {
options_routes: Arc<RwLock<Node<(PyFunction, u8)>>>,
connect_routes: Arc<RwLock<Node<(PyFunction, u8)>>>,
trace_routes: Arc<RwLock<Node<(PyFunction, u8)>>>,
web_socket_routes: DashMap<String, HashMap<String, (PyFunction, u8)>>,
web_socket_routes: Arc<RwLock<HashMap<String, HashMap<String, (PyFunction, u8)>>>>,
}

impl Router {
Expand All @@ -36,7 +35,7 @@ impl Router {
options_routes: Arc::new(RwLock::new(Node::new())),
connect_routes: Arc::new(RwLock::new(Node::new())),
trace_routes: Arc::new(RwLock::new(Node::new())),
web_socket_routes: DashMap::new(),
web_socket_routes: Arc::new(RwLock::new(HashMap::new())),
}
}

Expand All @@ -57,7 +56,9 @@ impl Router {
}

#[inline]
pub fn get_web_socket_map(&self) -> &DashMap<String, HashMap<String, (PyFunction, u8)>> {
pub fn get_web_socket_map(
&self,
) -> &Arc<RwLock<HashMap<String, HashMap<String, (PyFunction, u8)>>>> {
&self.web_socket_routes
}

Expand Down Expand Up @@ -117,42 +118,39 @@ impl Router {
let (close_route_function, close_route_is_async, close_route_params) = close_route;
let (message_route_function, message_route_is_async, message_route_params) = message_route;

let insert_in_router = |table: &DashMap<String, HashMap<String, (PyFunction, u8)>>,
handler: Py<PyAny>,
is_async: bool,
number_of_params: u8,
socket_type: &str| {
let function = if is_async {
PyFunction::CoRoutine(handler)
} else {
PyFunction::SyncFunction(handler)
let insert_in_router =
|handler: Py<PyAny>, is_async: bool, number_of_params: u8, socket_type: &str| {
let function = if is_async {
PyFunction::CoRoutine(handler)
} else {
PyFunction::SyncFunction(handler)
};

println!("socket type is {:?} {:?}", table, route);

table
.write()
.unwrap()
.entry(route.to_string())
.or_default()
.insert(socket_type.to_string(), (function, number_of_params))
};

let mut route_map = HashMap::new();
route_map.insert(socket_type.to_string(), (function, number_of_params));

println!("{:?}", table);
table.insert(route.to_string(), route_map);
};

insert_in_router(
table,
connect_route_function,
connect_route_is_async,
connect_route_params,
"connect",
);

insert_in_router(
table,
close_route_function,
close_route_is_async,
close_route_params,
"close",
);

insert_in_router(
table,
message_route_function,
message_route_is_async,
message_route_params,
Expand Down
14 changes: 5 additions & 9 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,10 @@ impl Server {
return Ok(());
}

println!("{}", name);

let borrow = socket.try_borrow_mut()?;
let held_socket: &SocketHeld = &*borrow;

let raw_socket = held_socket.get_socket();
println!("Got our socket {:?}", raw_socket);

let router = self.router.clone();
let headers = self.headers.clone();
Expand Down Expand Up @@ -123,12 +120,12 @@ impl Server {
.app_data(web::Data::new(headers.clone()));

let web_socket_map = router_copy.get_web_socket_map();
for elem in (web_socket_map).iter() {
let route = elem.key().clone();
let params = elem.value().clone();
for (elem, value) in (web_socket_map.read().unwrap()).iter() {
let route = elem.clone();
let params = value.clone();
let event_loop_hdl = event_loop_hdl.clone();
app = app.route(
&route,
&route.clone(),
web::get().to(
move |_router: web::Data<Arc<Router>>,
_headers: web::Data<Arc<Headers>>,
Expand Down Expand Up @@ -211,14 +208,13 @@ impl Server {
/// Add a new web socket route to the routing tables
/// can be called after the server has been started
pub fn add_web_socket_route(
&self,
&mut self,
route: &str,
// handler, is_async, number of params
connect_route: (Py<PyAny>, bool, u8),
close_route: (Py<PyAny>, bool, u8),
message_route: (Py<PyAny>, bool, u8),
) {
println!("WS Route added for {} ", route);
self.router
.add_websocket_route(route, connect_route, close_route, message_route);
}
Expand Down
109 changes: 50 additions & 59 deletions src/web_socket_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,65 @@ use std::collections::HashMap;
use std::sync::Arc;

/// Define HTTP actor
#[derive(Clone)]
struct MyWs {
router: Arc<HashMap<String, (PyFunction, u8)>>,
event_loop: PyObject,
}

fn execute_ws_functionn(
handler_function: &PyFunction,
event_loop: PyObject,
ctx: &mut ws::WebsocketContext<MyWs>,
ws: &MyWs,
) {
match handler_function {
PyFunction::SyncFunction(handler) => Python::with_gil(|py| {
let handler = handler.as_ref(py);
// call execute function
let op = handler.call0().unwrap();
let op: &str = op.extract().unwrap();
ctx.text(op);
}),
PyFunction::CoRoutine(handler) => {
let fut = Python::with_gil(|py| {
let handler = handler.as_ref(py);
let coro = handler.call0().unwrap();
pyo3_asyncio::into_future_with_loop(event_loop.as_ref(py), coro).unwrap()
});
let f = async move {
let output = fut.await.unwrap();
Python::with_gil(|py| {
let output: &str = output.extract(py).unwrap();
output.to_string()
})
};
let f = f.into_actor(ws).map(|res, _, ctx| {
ctx.text(res);
});
ctx.spawn(f);
}
}
}

// By default mailbox capacity is 16 messages.
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;

fn started(&mut self, ctx: &mut WebsocketContext<Self>) {
let handler_function = &self.router.get("connect").unwrap().0;
let _number_of_params = &self.router.get("connect").unwrap().1;
execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self);

println!("Actor is alive");
}

fn stopped(&mut self, _ctx: &mut WebsocketContext<Self>) {
println!("Actor is alive");
fn stopped(&mut self, ctx: &mut WebsocketContext<Self>) {
let handler_function = &self.router.get("close").expect("No close function").0;
let _number_of_params = &self.router.get("close").unwrap().1;
execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self);

println!("Actor is dead");
}
}

Expand All @@ -42,19 +86,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
let handler_function = &self.router.get("connect").unwrap().0;
let _number_of_params = &self.router.get("connect").unwrap().1;
println!("{:?}", handler_function);
match handler_function {
PyFunction::SyncFunction(handler) => Python::with_gil(|py| {
let handler = handler.as_ref(py);
// call execute function
let op = handler.call0().unwrap();
let op: &str = op.extract().unwrap();

println!("{}", op);
}),
PyFunction::CoRoutine(handler) => {
println!("Async functions are not supported in WS right now.");
}
}
execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self);
ctx.pong(&msg)
}

Expand All @@ -67,56 +99,15 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
// need to also passs this text as a param
let handler_function = &self.router.get("message").unwrap().0;
let _number_of_params = &self.router.get("message").unwrap().1;
match handler_function {
PyFunction::SyncFunction(handler) => Python::with_gil(|py| {
let handler = handler.as_ref(py);
// call execute function
let op = handler.call0().unwrap();
let op: &str = op.extract().unwrap();

return ctx.text(op);
}),
PyFunction::CoRoutine(handler) => {
let fut = Python::with_gil(|py| {
let handler = handler.as_ref(py);
let coro = handler.call0().unwrap();
pyo3_asyncio::into_future_with_loop(self.event_loop.as_ref(py), coro)
.unwrap()
});
let f = async move {
let op = fut.await.unwrap();
Python::with_gil(|py| {
let op: &str = op.extract(py).unwrap();
op.to_string()
})
};
let f = f.into_actor(self).map(|res, _, ctx| {
ctx.text(res);
});
ctx.spawn(f);
}
}
execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self);
}

Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(_close_reason)) => {
println!("Socket was closed");
let handler_function = &self.router.get("close").unwrap().0;
let handler_function = &self.router.get("close").expect("No close function").0;
let _number_of_params = &self.router.get("close").unwrap().1;
println!("{:?}", handler_function);
match handler_function {
PyFunction::SyncFunction(handler) => Python::with_gil(|py| {
let handler = handler.as_ref(py);
// call execute function
let op = handler.call0().unwrap();
let op: &str = op.extract().unwrap();

println!("{:?}", op);
}),
PyFunction::CoRoutine(_handler) => {
println!("Async functions are not supported in WS right now.");
}
}
execute_ws_functionn(handler_function, self.event_loop.clone(), ctx, &self);
}
_ => (),
}
Expand Down

0 comments on commit 074e055

Please sign in to comment.