diff --git a/Cargo.lock b/Cargo.lock index 3ca451efb..f7e7bacd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1183,6 +1183,7 @@ name = "robyn" version = "0.7.1" dependencies = [ "actix-files", + "actix-http", "actix-web", "anyhow", "dashmap", @@ -1190,6 +1191,7 @@ dependencies = [ "matchit", "pyo3", "pyo3-asyncio", + "socket2", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index a1e8f221a..3d299145b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,8 @@ actix-web = "4.0.0-beta.8" actix-files = "0.6.0-beta.4" futures-util = "0.3.15" matchit = "0.4.3" +actix-http = "3.0.0-beta.8" +socket2 = { version = "0.4.1", features = ["all"] } [package.metadata.maturin] name = "robyn" diff --git a/README.md b/README.md index 69d568d08..3c8070454 100644 --- a/README.md +++ b/README.md @@ -50,11 +50,19 @@ If you're feeling curious. You can take a look at a more detailed architecture [ ## To Run -### Without hot reloading -`python3 app.py` +``` +python3 app.py -h + +usage: base_routes.py [-h] [--processes PROCESSES] [--workers WORKERS] [--dev DEV] -### With hot reloading -`python3 app.py --dev=true` +Robyn, a fast async web framework with a rust runtime. + +optional arguments: + -h, --help show this help message and exit + --processes PROCESSES : allows you to choose the number of parallel processes + --workers WORKERS : allows you to choose the number of workers + --dev DEV : this flag gives the option to enable hot reloading or not +``` ## Contributors/Supporters diff --git a/integration_tests/base_routes.py b/integration_tests/base_routes.py index 7bd9bf952..515ae000e 100644 --- a/integration_tests/base_routes.py +++ b/integration_tests/base_routes.py @@ -3,23 +3,23 @@ # robyn_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../robyn") # sys.path.insert(0, robyn_path) -from robyn import Robyn, static_file, jsonify +from robyn import Robyn, static_file, jsonify, SocketHeld import asyncio import os import pathlib app = Robyn(__file__) + callCount = 0 @app.get("/") -async def h(requests): - print(requests) +async def hello(request): global callCount callCount += 1 message = "Called " + str(callCount) + " times" - return message + return jsonify(request) @app.get("/test/:id") @@ -41,21 +41,41 @@ async def json(request): return jsonify({"hello": "world"}) @app.post("/post") -async def postreq(request): +async def post(): + return "POST Request" + +@app.post("/post_with_body") +async def postreq_with_body(request): return bytearray(request["body"]).decode("utf-8") @app.put("/put") -async def putreq(request): +async def put(request): + return "PUT Request" + +@app.put("/put_with_body") +async def putreq_with_body(request): + print(request) return bytearray(request["body"]).decode("utf-8") + @app.delete("/delete") -async def deletereq(request): +async def delete(request): + return "DELETE Request" + +@app.delete("/delete_with_body") +async def deletereq_with_body(request): return bytearray(request["body"]).decode("utf-8") + @app.patch("/patch") -async def patchreq(request): +async def patch(request): + return "PATCH Request" + +@app.patch("/patch_with_body") +async def patchreq_with_body(request): return bytearray(request["body"]).decode("utf-8") + @app.get("/sleep") async def sleeper(): await asyncio.sleep(5) @@ -71,5 +91,7 @@ def blocker(): if __name__ == "__main__": app.add_header("server", "robyn") - app.add_directory(route="/test_dir",directory_path="./test_dir/build", index_file="index.html") + current_file_path = pathlib.Path(__file__).parent.resolve() + os.path.join(current_file_path, "build") + app.add_directory(route="/test_dir",directory_path=os.path.join(current_file_path, "build/"), index_file="index.html") app.start(port=5000, url='0.0.0.0') diff --git a/integration_tests/test_delete_requests.py b/integration_tests/test_delete_requests.py new file mode 100644 index 000000000..e34bfaaa7 --- /dev/null +++ b/integration_tests/test_delete_requests.py @@ -0,0 +1,16 @@ +import requests + +BASE_URL = "http://127.0.0.1:5000" + +def test_delete(session): + res = requests.delete(f"{BASE_URL}/delete") + assert (res.status_code == 200) + assert res.text=="DELETE Request" + +def test_delete_with_param(session): + res = requests.delete(f"{BASE_URL}/delete_with_body", data = { + "hello": "world" + }) + assert (res.status_code == 200) + assert res.text=="hello=world" + diff --git a/integration_tests/test_dir/build/index.html b/integration_tests/test_dir/build/index.html new file mode 100644 index 000000000..1d6de97e0 --- /dev/null +++ b/integration_tests/test_dir/build/index.html @@ -0,0 +1,11 @@ + + + + + + Document + + + Hello, World + + diff --git a/integration_tests/test_get_requests.py b/integration_tests/test_get_requests.py index 0bfa1c4a8..2823f9025 100644 --- a/integration_tests/test_get_requests.py +++ b/integration_tests/test_get_requests.py @@ -1,15 +1,17 @@ import requests +BASE_URL = "http://127.0.0.1:5000" + def test_index_request(session): - res = requests.get("http://127.0.0.1:5000/") + res = requests.get(f"{BASE_URL}") assert(res.status_code == 200) def test_jsonify(session): - r = requests.get("http://127.0.0.1:5000/jsonify") + r = requests.get(f"{BASE_URL}/jsonify") assert r.json()=={"hello":"world"} assert r.status_code==200 def test_html(session): - r = requests.get("http://127.0.0.1:5000/test/123") + r = requests.get(f"{BASE_URL}/test/123") assert "Hello world. How are you?" in r.text diff --git a/integration_tests/test_patch_requests.py b/integration_tests/test_patch_requests.py new file mode 100644 index 000000000..14dd7b6bf --- /dev/null +++ b/integration_tests/test_patch_requests.py @@ -0,0 +1,15 @@ +import requests + +BASE_URL = "http://127.0.0.1:5000" + +def test_patch(session): + res = requests.patch(f"{BASE_URL}/patch") + assert (res.status_code == 200) + assert res.text=="PATCH Request" + +def test_patch_with_param(session): + res = requests.patch(f"{BASE_URL}/patch_with_body", data = { + "hello": "world" + }) + assert (res.status_code == 200) + assert res.text=="hello=world" diff --git a/integration_tests/test_post_requests.py b/integration_tests/test_post_requests.py new file mode 100644 index 000000000..59dd14782 --- /dev/null +++ b/integration_tests/test_post_requests.py @@ -0,0 +1,22 @@ +import requests + +BASE_URL = "http://127.0.0.1:5000" + +def test_post(session): + res = requests.post(f"{BASE_URL}/post") + assert (res.status_code == 200) + assert res.text=="POST Request" + +def test_post_with_param(session): + res = requests.post(f"{BASE_URL}/post_with_body", data = { + "hello": "world" + }) + assert res.text=="hello=world" + assert (res.status_code == 200) + + +def test_jsonify_request(session): + res = requests.post(f"{BASE_URL}/jsonify/123") + assert(res.status_code == 200) + assert res.json()=={"hello":"world"} + diff --git a/integration_tests/test_put_requests.py b/integration_tests/test_put_requests.py new file mode 100644 index 000000000..f656ad293 --- /dev/null +++ b/integration_tests/test_put_requests.py @@ -0,0 +1,16 @@ +import requests + +BASE_URL = "http://127.0.0.1:5000" + +def test_put(session): + res = requests.put(f"{BASE_URL}/put") + assert (res.status_code == 200) + assert res.text=="PUT Request" + +def test_put_with_param(session): + res = requests.put(f"{BASE_URL}/put_with_body", data = { + "hello": "world" + }) + + assert (res.status_code == 200) + assert res.text=="hello=world" diff --git a/pyproject.toml b/pyproject.toml index 865ce4152..61417cace 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,5 +17,12 @@ build-backend = "poetry.core.masonry.api" [project] name = "robyn" dependencies = [ - "watchdog>=2.1.3,<3" + 'watchdog == 2.1.3', + 'multiprocess == 0.70.12.2', +# conditional + 'uvloop == 0.16.0; sys_platform == "darwin"', + 'uvloop == 0.16.0; platform_machine == "x86_64"', + 'uvloop == 0.16.0; platform_machine == "i686"' ] + + diff --git a/requirements.txt b/requirements.txt index bb75d0a8f..9a83bde4f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ requests==2.26.0 pytest==6.2.5 maturin +uvloop +watchdog +multiprocess==0.70.12.2 diff --git a/robyn/__init__.py b/robyn/__init__.py index fcebaa44a..3f83a060e 100644 --- a/robyn/__init__.py +++ b/robyn/__init__.py @@ -1,15 +1,21 @@ # default imports import os -import argparse import asyncio from inspect import signature +import multiprocessing as mp +mp.allow_connection_pickling() -from .robyn import Server +# custom imports and exports +from .robyn import Server, SocketHeld +from .argument_parser import ArgumentParser from .responses import static_file, jsonify from .dev_event_handler import EventHandler +from .processpool import spawn_process from .log_colors import Colors +# 3rd party imports and exports +from multiprocess import Process from watchdog.observers import Observer @@ -22,12 +28,14 @@ def __init__(self, file_object): self.file_path = file_object self.directory_path = directory_path self.server = Server(directory_path) - self.dev = self._is_dev() - - def _is_dev(self): - parser = argparse.ArgumentParser() - parser.add_argument('--dev', default=False, type=lambda x: (str(x).lower() == 'true')) - return parser.parse_args().dev + self.parser = ArgumentParser() + self.dev = self.parser.is_dev() + self.processes = self.parser.num_processes() + self.workers = self.parser.workers() + self.routes = [] + self.headers = [] + self.routes = [] + self.directories = [] def add_route(self, route_type, endpoint, handler): @@ -42,15 +50,15 @@ def add_route(self, route_type, endpoint, handler): """ We will add the status code here only """ number_of_params = len(signature(handler).parameters) - self.server.add_route( - route_type, endpoint, handler, asyncio.iscoroutinefunction(handler), number_of_params + self.routes.append( + ( route_type, endpoint, handler, asyncio.iscoroutinefunction(handler), number_of_params) ) def add_directory(self, route, directory_path, index_file=None, show_files_listing=False): - self.server.add_directory(route, directory_path, index_file, show_files_listing) + self.directories.append(( route, directory_path, index_file, show_files_listing )) def add_header(self, key, value): - self.server.add_header(key, value) + self.headers.append(( key, value )) def remove_header(self, key): self.server.remove_header(key) @@ -61,8 +69,18 @@ def start(self, url="127.0.0.1", port=5000): :param port [int]: [reperesents the port number at which the server is listening] """ + socket = SocketHeld(f"0.0.0.0:{port}", port) + workers = self.workers if not self.dev: - self.server.start(url, port) + for process_number in range(self.processes): + copied = socket.try_clone() + p = Process( + target=spawn_process, + args=(url, port, self.directories, self.headers, self.routes, copied, f"Process {process_number}", workers), + ) + p.start() + + input("Press Cntrl + C to stop \n") else: event_handler = EventHandler(self.file_path) event_handler.start_server_first_time() diff --git a/robyn/argument_parser.py b/robyn/argument_parser.py new file mode 100644 index 000000000..982ee18aa --- /dev/null +++ b/robyn/argument_parser.py @@ -0,0 +1,22 @@ +import argparse + +class ArgumentParser(argparse.ArgumentParser): + def __init__(self): + self.parser = argparse.ArgumentParser(description="Robyn, a fast async web framework with a rust runtime.") + self.parser.add_argument('--processes', type=int, default=1, required=False) + self.parser.add_argument('--workers', type=int, default=1, required=False) + self.parser.add_argument('--dev', default=False, type=lambda x: (str(x).lower() == 'true')) + self.args = self.parser.parse_args() + + def num_processes(self): + return self.args.processes + + def workers(self): + return self.args.workers + + def is_dev(self): + _is_dev = self.args.dev + if _is_dev and ( self.num_processes() != 1 or self.workers() != 1 ): + raise Exception("--processes and --workers shouldn't be used with --dev") + return _is_dev + diff --git a/robyn/processpool.py b/robyn/processpool.py new file mode 100644 index 000000000..07b247a90 --- /dev/null +++ b/robyn/processpool.py @@ -0,0 +1,55 @@ +from .robyn import Server + +import sys +import multiprocessing as mp +import asyncio +# import platform + + +mp.allow_connection_pickling() + + +def spawn_process(url, port, directories, headers, routes, socket, process_name, workers): + """ + This function is called by the main process handler to create a server runtime. + This functions allows one runtime per process. + + :param url string: the base url at which the server will listen + :param port string: the port at which the url will listen to + :param directories tuple: the list of all the directories and related data in a tuple + :param headers tuple: All the global headers in a tuple + :param routes tuple: The routes touple, containing the description about every route. + :param socket Socket: This is the main tcp socket, which is being shared across multiple processes. + :param process_name string: This is the name given to the process to identify the process + :param workers number: This is the name given to the process to identify the process + """ + # platform_name = platform.machine() + if sys.platform.startswith("win32") or sys.platform.startswith("linux-cross"): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + else: + # uv loop doesn't support windows or arm machines at the moment + # but uv loop is much faster than native asyncio + import uvloop + uvloop.install() + loop = uvloop.new_event_loop() + asyncio.set_event_loop(loop) + + server = Server() + + print(directories) + + for directory in directories: + route, directory_path, index_file, show_files_listing = directory + server.add_directory(route, directory_path, index_file, show_files_listing) + + for key, val in headers: + server.add_header(key, val) + + + for route in routes: + route_type, endpoint, handler, is_async, number_of_params = route + server.add_route(route_type, endpoint, handler, is_async, number_of_params) + + server.start(url, port, socket, process_name, workers) + asyncio.get_event_loop().run_forever() diff --git a/robyn/test-requirements.txt b/robyn/test-requirements.txt index 48c677e53..3977436a3 100644 --- a/robyn/test-requirements.txt +++ b/robyn/test-requirements.txt @@ -2,3 +2,5 @@ pytest==6.2.5 maturin watchdog requests==2.26.0 +uvloop==0.16.0 +multiprocess==0.70.12.2 diff --git a/src/lib.rs b/src/lib.rs index a07698593..8b68e660a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,11 @@ mod processor; mod router; mod server; +mod shared_socket; mod types; use server::Server; +use shared_socket::SocketHeld; // pyO3 module use pyo3::prelude::*; @@ -12,6 +14,7 @@ use pyo3::prelude::*; pub fn robyn(_py: Python<'_>, m: &PyModule) -> PyResult<()> { // the pymodule class to make the rustPyFunctions available m.add_class::()?; + m.add_class::()?; pyo3::prepare_freethreaded_python(); Ok(()) } diff --git a/src/router.rs b/src/router.rs index 5f3029ddb..da835a0c8 100644 --- a/src/router.rs +++ b/src/router.rs @@ -111,21 +111,3 @@ impl Router { } } } - -// #[cfg(test)] -// mod router_test { -// use super::*; -// #[test] -// fn test_no_route() { -// let router = Router::new(); -// assert_eq!(router.get_route(Method::GET, "/").is_none(), true); -// // let handler = Python::with_gil(|py| { -// // let dict = pyo3::types::PyDict::new(py); -// // assert!(dict.is_instance::().unwrap()); -// // let any: Py = dict.into_py(py); -// // any -// // }); -// // router.add_route("GET", "/", handler, false); -// // assert_eq!(router.get_route(Method::GET, "/").is_some(), true); -// } -// } diff --git a/src/server.rs b/src/server.rs index 9a74595be..625d99b2d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,18 +1,20 @@ use crate::processor::{apply_headers, handle_request}; use crate::router::Router; +use crate::shared_socket::SocketHeld; use crate::types::Headers; use actix_files::Files; +use std::convert::TryInto; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::sync::{Arc, RwLock}; use std::thread; // pyO3 module +use actix_http::KeepAlive; use actix_web::*; use dashmap::DashMap; use pyo3::prelude::*; use pyo3::types::PyAny; -// hyper modules static STARTED: AtomicBool = AtomicBool::new(false); #[derive(Clone)] @@ -41,18 +43,35 @@ impl Server { } } - pub fn start(&mut self, py: Python, url: String, port: u16) { + pub fn start( + &mut self, + py: Python, + url: String, + port: u16, + socket: &PyCell, + name: String, + workers: usize, + ) -> PyResult<()> { if STARTED .compare_exchange(false, true, SeqCst, Relaxed) .is_err() { println!("Already running..."); - return; + return Ok(()); } + println!("{}", name); + + let borrow = socket.try_borrow_mut()?; + let held_socket: &SocketHeld = &*borrow; + + let raw_socket = held_socket.get_socket(); + println!("Got our socket {:?}", raw_socket); + let router = self.router.clone(); let headers = self.headers.clone(); let directories = self.directories.clone(); + let workers = Arc::new(workers); let asyncio = py.import("asyncio").unwrap(); @@ -67,6 +86,8 @@ impl Server { actix_web::rt::System::new().block_on(async move { let addr = format!("{}:{}", url, port); + println!("The number of workers are {}", workers.clone()); + HttpServer::new(move || { let mut app = App::new(); let event_loop_hdl = event_loop_hdl.clone(); @@ -103,7 +124,10 @@ impl Server { }) })) }) - .bind(addr) + .keep_alive(KeepAlive::Os) + .workers(*workers.clone()) + .client_timeout(0) + .listen(raw_socket.try_into().unwrap()) .unwrap() .run() .await @@ -112,6 +136,7 @@ impl Server { }); event_loop.call_method0("run_forever").unwrap(); + Ok(()) } pub fn add_directory( diff --git a/src/shared_socket.rs b/src/shared_socket.rs new file mode 100644 index 000000000..ed56bc27d --- /dev/null +++ b/src/shared_socket.rs @@ -0,0 +1,37 @@ +use pyo3::prelude::*; + +use socket2::{Domain, Protocol, Socket, Type}; +use std::net::SocketAddr; + +#[pyclass] +#[derive(Debug)] +pub struct SocketHeld { + pub socket: Socket, +} + +#[pymethods] +impl SocketHeld { + #[new] + pub fn new(address: String, port: i32) -> PyResult { + let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?; + println!("{}", address); + let address: SocketAddr = address.parse()?; + socket.set_reuse_address(true)?; + //socket.set_reuse_port(true)?; + socket.bind(&address.into())?; + socket.listen(1024)?; + + Ok(SocketHeld { socket }) + } + + pub fn try_clone(&self) -> PyResult { + let copied = self.socket.try_clone()?; + Ok(SocketHeld { socket: copied }) + } +} + +impl SocketHeld { + pub fn get_socket(&self) -> Socket { + self.socket.try_clone().unwrap() + } +}