diff --git a/Cargo.lock b/Cargo.lock index 846abe6..6ee3a83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,6 +41,18 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.85" @@ -277,9 +289,11 @@ dependencies = [ name = "examples" version = "0.0.0" dependencies = [ + "anyhow", "aws-sign-v4", "chrono", "http", + "hyper", "libc", "nginx-sys", "ngx", @@ -308,6 +322,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -350,6 +376,45 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -367,8 +432,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -400,6 +467,41 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "httparse" +version = "1.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" + +[[package]] +name = "hyper" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http", + "http-body", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "iana-time-zone" version = "0.1.61" @@ -546,6 +648,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "nginx-sys" version = "0.5.0" @@ -566,10 +677,15 @@ dependencies = [ name = "ngx" version = "0.5.0" dependencies = [ + "async-task", + "bytes", "errno", + "flume", "foreign-types", + "http-body", "libc", "nginx-sys", + "pin-project-lite", "target-triple", ] @@ -652,6 +768,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "prettyplease" version = "0.2.27" @@ -866,6 +988,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "subtle" @@ -930,6 +1055,12 @@ dependencies = [ "syn", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.17.0" @@ -987,6 +1118,15 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 10b6911..6cc74ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,13 +25,26 @@ repository.workspace = true rust-version.workspace = true [dependencies] +async-task = { version = "4.7.1", optional = true } +bytes = { version = "1.8.0", optional = true } errno = "0.3.9" +flume = { version = "0.11.0", optional = true } foreign-types = "0.5.0" +http-body = { version = "1.0.1", optional = true } libc = "0.2.169" nginx-sys = { path = "nginx-sys", default-features=false, version = "0.5.0"} +pin-project-lite = { version = "0.2.15", optional = true } [features] -default = ["vendored","std"] +default = ["async", "vendored","std"] +async = [ + "std", + "dep:async-task", + "dep:bytes", + "dep:flume", + "dep:http-body", + "dep:pin-project-lite", +] # Enables the components using memory allocation. # If no `std` flag, `alloc` crate is internally used instead. This flag is mainly for `no_std` build. alloc = [] diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 32d8bba..97d4e39 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -14,14 +14,16 @@ build = "../build.rs" [dependencies] nginx-sys = { path = "../nginx-sys/", default-features = false } -ngx = { path = "../", default-features = false, features = ["std"] } +ngx = { path = "../", default-features = false, features = ["async", "std"] } [dev-dependencies] +anyhow = "1.0.89" aws-sign-v4 = "0.3.0" chrono = "0.4.23" http = "1.1.0" libc = "0.2.140" tokio = { version = "1.33.0", features = ["full"] } +hyper = { version = "1.5.0", features = ["http1", "client"] } [[example]] name = "curl" @@ -49,6 +51,11 @@ name = "async" path = "async.rs" crate-type = ["cdylib"] +[[example]] +name = "async_ngx" +path = "async_ngx.rs" +crate-type = ["cdylib"] + [features] default = ["export-modules", "ngx/vendored"] # Generate `ngx_modules` table with module exports diff --git a/examples/async_ngx.rs b/examples/async_ngx.rs new file mode 100644 index 0000000..b3bbfbb --- /dev/null +++ b/examples/async_ngx.rs @@ -0,0 +1,602 @@ +#![allow(unused)] + +use std::os::raw::{c_char, c_void}; +use std::pin::Pin; +use std::ptr::addr_of; +use std::task::Poll; + +use anyhow::Result; + +use ngx::core::{self, NgxConfRef}; +use ngx::ffi::{ + self, nginx_version, ngx_chain_t, ngx_command_t, ngx_conf_t, ngx_http_core_module, ngx_http_module_t, + ngx_http_request_t, ngx_int_t, ngx_module_t, ngx_uint_t, ngx_url_t, NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, + NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_RS_MODULE_SIGNATURE, +}; +use ngx::http::{ngx_http_conf_get_module_loc_conf, HTTPModule, MergeConfigError}; +use ngx::r#async::{self as ngx_async, http::RequestBody}; +use ngx::{ngx_log_debug, ngx_log_debug_http, ngx_string, ForeignTypeRef}; + +struct Module; + +impl ngx::http::HTTPModule for Module { + type MainConf = (); + type SrvConf = (); + type LocConf = ModuleConfig; +} + +struct ModuleConfig { + endpoint: ngx_url_t, +} + +impl Default for ModuleConfig { + fn default() -> Self { + ModuleConfig { + endpoint: unsafe { std::mem::zeroed() }, + } + } +} + +impl ngx::http::Merge for ModuleConfig { + fn merge(&mut self, prev: &ModuleConfig) -> Result<(), MergeConfigError> { + if self.endpoint.url.is_empty() { + self.endpoint = prev.endpoint; + } + Ok(()) + } +} + +static mut NGX_HTTP_ASYNC_NGX_COMMANDS: [ngx_command_t; 2] = [ + ngx_command_t { + name: ngx_string!("async_pass"), + type_: (NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1) as ngx_uint_t, + set: Some(ngx_http_async_commands_async_pass), + conf: NGX_HTTP_LOC_CONF_OFFSET, + offset: 0, + post: std::ptr::null_mut(), + }, + ngx_command_t::empty(), +]; + +static NGX_HTTP_ASYNC_NGX_MODULE_CTX: ngx_http_module_t = ngx_http_module_t { + preconfiguration: Some(Module::preconfiguration), + postconfiguration: Some(Module::postconfiguration), + create_main_conf: Some(Module::create_main_conf), + init_main_conf: Some(Module::init_main_conf), + create_srv_conf: Some(Module::create_srv_conf), + merge_srv_conf: Some(Module::merge_srv_conf), + create_loc_conf: Some(Module::create_loc_conf), + merge_loc_conf: Some(Module::merge_loc_conf), +}; + +// Generate the `ngx_modules` table with exported modules. +// This feature is required to build a 'cdylib' dynamic module outside of the NGINX buildsystem. +#[cfg(feature = "export-modules")] +ngx::ngx_modules!(ngx_http_async_ngx_module); + +#[used] +#[allow(non_upper_case_globals)] +#[cfg_attr(not(feature = "export-modules"), no_mangle)] +pub static mut ngx_http_async_ngx_module: ngx_module_t = ngx_module_t { + ctx: std::ptr::addr_of!(NGX_HTTP_ASYNC_NGX_MODULE_CTX).cast_mut().cast(), + #[allow(static_mut_refs)] + commands: unsafe { NGX_HTTP_ASYNC_NGX_COMMANDS.as_mut_ptr() }, + type_: NGX_HTTP_MODULE as ngx_uint_t, + + ..ngx_module_t::default() +}; + +fn to_request_builder(request: &ngx::http::Request) -> Result { + let method = http::Method::try_from(request.method().as_str()).expect("supported method"); + + // SAFETY: server config block for ngx_http_core_module always exists and is always ngx_http_core_srv_conf_t + let cscf = unsafe { + request + .get_module_srv_conf::(&*addr_of!(ngx::ffi::ngx_http_core_module)) + .unwrap() + }; + + let authority = if request.get_inner().headers_in.server.len > 0 { + request.get_inner().headers_in.server + } else { + cscf.server_name + }; + + let scheme = if request.get_inner().schema.len > 0 { + request.get_inner().schema.to_str() + } else { + "http" + }; + + ngx_log_debug_http!(request, "authority: {}", authority.to_str()); + + let uri = request.unparsed_uri(); + let uri = http::Uri::builder() + .scheme(scheme) + .authority(authority.as_bytes()) + .path_and_query(uri.as_bytes()) + .build()?; + + let ver = match request.get_inner().http_version as u32 { + ngx::ffi::NGX_HTTP_VERSION_9 => http::Version::HTTP_09, + ngx::ffi::NGX_HTTP_VERSION_10 => http::Version::HTTP_10, + ngx::ffi::NGX_HTTP_VERSION_11 => http::Version::HTTP_11, + ngx::ffi::NGX_HTTP_VERSION_20 => http::Version::HTTP_2, + ngx::ffi::NGX_HTTP_VERSION_30 => http::Version::HTTP_3, + _ => unreachable!("unsupported HTTP version"), + }; + + let mut req = http::Request::builder().method(method).uri(uri).version(ver); + for header in request.headers_in_iterator() { + let hn = http::HeaderName::try_from(header.key.as_bytes())?; + let hv = http::HeaderValue::try_from(header.value.as_bytes())?; + req = req.header(hn, hv); + } + + Ok(req) +} + +extern "C" fn ngx_http_async_location_handler(r: *mut ngx_http_request_t) -> ngx_int_t { + let r = unsafe { ngx::http::Request::from_ptr_mut(r) }; + let lcf = unsafe { r.get_module_loc_conf::(&*addr_of!(ngx_http_async_ngx_module)) }; + let lcf = lcf.expect("module config is none"); + + ngx_log_debug_http!(r, "async handler, enabled:{}", !lcf.endpoint.url.is_empty()); + + if lcf.endpoint.url.is_empty() { + return ngx::ffi::NGX_DECLINED as ngx_int_t; + } + + unsafe { ngx::ffi::ngx_http_read_client_request_body(r.as_ptr(), Some(ngx_http_async_req_body_handler)) } +} + +struct Timer { + ev: ngx::core::Event, + waker: Option, +} + +impl Timer { + pub fn new(c: *mut ffi::ngx_connection_t) -> Self { + let mut this = Self { + ev: unsafe { std::mem::zeroed() }, + waker: None, + }; + + this.ev.data = c.cast(); + this.ev.log = unsafe { *c }.log; + this.ev.set_cancelable(1); + this.ev.handler = Some(Timer::timer_handler); + + this + } + + unsafe extern "C" fn timer_handler(ev: *mut ffi::ngx_event_t) { + let off = std::mem::offset_of!(Timer, ev) as isize; + let timer = ev.offset(-off).cast::(); + + if let Some(waker) = (*timer).waker.take() { + waker.wake(); + } + } + + pub fn poll_sleep( + self: &mut Pin<&mut Self>, + duration: ffi::ngx_msec_t, + context: &mut std::task::Context<'_>, + ) -> Poll> { + if self.ev.timedout() != 0 { + Poll::Ready(Ok(())) + } else if self.ev.timer_set() != 0 { + Poll::Pending + } else { + self.ev.add_timer(duration); + self.waker = Some(context.waker().clone()); + Poll::Pending + } + } +} + +struct ModuleContext { + #[allow(dead_code)] + task: ngx_async::runtime::Task<()>, +} + +unsafe extern "C" fn ngx_http_async_req_body_handler(r: *mut ngx_http_request_t) { + let req = unsafe { ngx::http::Request::from_ptr_mut(r) }; + ngx_log_debug_http!(req, "async req body handler"); + + let lcf = unsafe { req.get_module_loc_conf::(&*addr_of!(ngx_http_async_ngx_module)) }; + let lcf = lcf.expect("module loc conf"); + + if let Some(rb) = unsafe { (*req.as_ptr()).request_body.as_mut() } { + unsafe { + ffi::ngx_log_error_core( + ffi::NGX_LOG_INFO as usize, + req.log(), + 0, + c"request body read: %p %p %L".as_ptr(), + rb.buf, + rb.bufs, + rb.received, + ) + }; + } + + let task = ngx_async::runtime::spawn(async move { + let req = unsafe { ngx::http::Request::from_ptr_mut(r) }; + let pool = req.pool().as_ptr(); + let mut peer = Box::pin(NgxPeerConnection::default()); + + if let Err(err) = async { + std::future::poll_fn(|cx| peer.as_mut().poll_connect(pool, &lcf.endpoint, cx)).await?; + let (mut sender, conn) = hyper::client::conn::http1::handshake(peer).await?; + + let _conn = ngx_async::runtime::spawn(async move { + if let Err(err) = conn.await { + ngx_log_debug!((*pool).log, "connection failed: {:?}", err); + } + })?; + + /* + ngx_log_debug_http!(req, "sleeping"); + + let mut timer = std::pin::pin!(Timer::new(req.connection())); + std::future::poll_fn(|cx| timer.poll_sleep(1000, cx)).await?; + + ngx_log_debug_http!(req, "sleep done"); + */ + + let mut body = RequestBody::default(); + body.update(unsafe { &mut *req.as_ptr() }); + + let http_req = to_request_builder(req)?.body(body)?; + + let response = sender.send_request(http_req).await?; + + let status = ngx::http::HTTPStatus::from_u16(response.status().as_u16()).expect("valid status code"); + req.set_status(status); + + for (name, value) in response.headers() { + // always in lower case + match name.as_str() { + "content-length" => { + let value = value.to_str().unwrap_or_default(); + if let Ok(len) = value.parse::() { + req.set_content_length_n(len); + } + } + _ => { + req.add_header_out(name.as_str(), value); + } + } + } + + let rc = req.send_header(); + if rc == core::Status::NGX_ERROR || rc > core::Status::NGX_OK || req.header_only() { + anyhow::bail!("header send failed"); + } + + use hyper::body::Body; + + let mut body = std::pin::pin!(response.into_body()); + + while let Some(res) = std::future::poll_fn(|cx| body.as_mut().poll_frame(cx)).await { + if let Ok(data) = res?.into_data() { + assert_eq!(write_buf(req, &data, body.is_end_stream()), 0); + } + } + + unsafe { ngx::ffi::ngx_http_finalize_request(req.as_ptr(), ngx::core::Status::NGX_OK.into()) }; + + Ok::<_, anyhow::Error>(()) + } + .await + { + ngx_log_debug_http!(req, "request failed: {:?}", err); + unsafe { + ngx::ffi::ngx_http_finalize_request( + req.as_ptr(), + ngx::http::HTTPStatus::INTERNAL_SERVER_ERROR.0 as ngx_int_t, + ) + }; + } + }) + .expect("task"); + + if !task.is_finished() { + let ctx = ModuleContext { task }; + let ctx = req.pool().allocate::(ctx); + req.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_ngx_module) }); + + ngx_log_debug_http!(req, "save task to ctx"); + } +} + +fn write_buf(r: &mut ngx::http::Request, data: &[u8], last: bool) -> isize { + let buf = r.pool().alloc_type_zeroed::(); + let last = if last { 1 } else { 0 }; + unsafe { + (*buf).set_memory(1); + (*buf).set_last_buf(last); + (*buf).set_last_in_chain(last); + (*buf).start = data.as_ptr() as *mut u8; + (*buf).end = (*buf).start.add(data.len()); + (*buf).pos = (*buf).start; + (*buf).last = (*buf).end; + } + let mut chain = ngx_chain_t { + buf, + next: std::ptr::null_mut(), + }; + unsafe { + ngx::ffi::ngx_http_output_filter(r.as_ptr(), &mut chain); + } + + unsafe { (*buf).last.offset_from((*buf).pos) } +} + +struct NgxPeerConnection { + pub pc: ffi::ngx_peer_connection_t, + pub cev: Option, + pub rev: Option, + pub wev: Option, +} + +impl Default for NgxPeerConnection { + fn default() -> Self { + unsafe { std::mem::zeroed() } + } +} + +unsafe extern "C" fn ngx_peer_conn_connect_handler(ev: *mut ffi::ngx_event_t) { + let ev = ngx::core::EventRef::from_ptr_mut(ev); + + ngx_log_debug!(ev.log, "connect handler"); + + let c: &mut ffi::ngx_connection_t = &mut *ev.data.cast(); + let this: &mut NgxPeerConnection = &mut *c.data.cast(); + + if let Some(waker) = this.cev.take() { + waker.wake(); + } + /* + if !ev.write { + ffi::ngx_handle_read_event() + ngx_handle_write_event + } + */ +} + +unsafe extern "C" fn ngx_peer_conn_read_handler(ev: *mut ffi::ngx_event_t) { + ngx_log_debug!((*ev).log, "read handler"); + + let c: *mut ffi::ngx_connection_t = (*ev).data.cast(); + let this: *mut NgxPeerConnection = (*c).data.cast(); + + if let Some(waker) = (*this).rev.take() { + waker.wake(); + } +} + +unsafe extern "C" fn ngx_peer_conn_write_handler(ev: *mut ffi::ngx_event_t) { + ngx_log_debug!((*ev).log, "write handler"); + + let c: *mut ffi::ngx_connection_t = (*ev).data.cast(); + let this: *mut NgxPeerConnection = (*c).data.cast(); + + if let Some(waker) = (*this).wev.take() { + waker.wake(); + } +} + +impl hyper::rt::Read for NgxPeerConnection { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let c = self.connection().unwrap(); + + if c.read().timedout() != 0 { + return Poll::Ready(Err(std::io::ErrorKind::TimedOut.into())); + } + + let n = c.recv(unsafe { buf.as_mut() }); + + if n == ffi::NGX_ERROR as isize { + return Poll::Ready(Err(std::io::Error::last_os_error())); + } + + let rev = c.read(); + + if rev.handle_read(0).is_err() { + return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())); + } + + if rev.active() != 0 { + rev.add_timer(5000); + } else if rev.timer_set() != 0 { + rev.del_timer(); + } + + if n == ffi::NGX_AGAIN as isize { + self.rev = Some(cx.waker().clone()); + return Poll::Pending; + } + + if n > 0 { + unsafe { buf.advance(n as _) }; + } + + Poll::Ready(Ok(())) + } +} + +impl hyper::rt::Write for NgxPeerConnection { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + let c = self.connection().unwrap(); + let n = c.send(buf); + + ngx_log_debug!(c.log, "sent: {n}"); + + if n == ffi::NGX_AGAIN as ngx_int_t { + self.wev = Some(cx.waker().clone()); + Poll::Pending + } else if n > 0 { + Poll::Ready(Ok(n as usize)) + } else { + Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())) + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + if let Some(c) = self.connection() { + c.shutdown(libc::SHUT_WR); + } + Poll::Ready(Ok(())) + } +} + +impl NgxPeerConnection { + pub fn connect(&mut self, pool: *mut ffi::ngx_pool_t, url: &ngx_url_t) -> core::Status { + let addrs = unsafe { std::slice::from_raw_parts(url.addrs, url.naddrs) }; + assert!(!addrs.is_empty()); + + let pc = &mut self.pc; + + pc.sockaddr = addrs[0].sockaddr; + pc.socklen = addrs[0].socklen; + pc.name = std::ptr::addr_of!(addrs[0].name).cast_mut(); + pc.get = Some(ffi::ngx_event_get_peer); + pc.log = unsafe { *pool }.log; + pc.set_log_error(1); // FIXME + + let rc = unsafe { ffi::ngx_event_connect_peer(pc) }; + let rc = core::Status(rc); + + if rc == core::Status::NGX_ERROR || rc == core::Status::NGX_BUSY || rc == core::Status::NGX_DECLINED { + ngx_log_debug!(unsafe { (*pool).log }, "connect failed"); + return rc; + } + + let c = unsafe { &mut *pc.connection }; + c.pool = pool; + c.data = std::ptr::from_mut(self).cast(); + + unsafe { *c.read }.handler = Some(ngx_peer_conn_read_handler); + unsafe { *c.write }.handler = Some(ngx_peer_conn_write_handler); + + rc + } + + pub fn poll_connect( + self: &mut Pin<&mut Self>, + pool: *mut ffi::ngx_pool_t, + url: &ngx_url_t, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + if let Some(c) = self.connection() { + ngx_log_debug!(c.log, "connect callback"); + + if c.read().timedout() != 0 || c.write().timedout() != 0 { + c.close(); + return Poll::Ready(Err(std::io::ErrorKind::TimedOut.into())); + } + + if let Err(err) = c.test_connect() { + return Poll::Ready(Err(std::io::Error::from_raw_os_error(err))); + } + + c.read().handler = Some(ngx_peer_conn_read_handler); + c.write().handler = Some(ngx_peer_conn_write_handler); + + return Poll::Ready(Ok(())); + } + + match self.connect(pool, url) { + core::Status::NGX_OK => Poll::Ready(Ok(())), + core::Status::NGX_ERROR | core::Status::NGX_BUSY | core::Status::NGX_DECLINED => { + Poll::Ready(Err(std::io::ErrorKind::ConnectionRefused.into())) + } + core::Status::NGX_AGAIN => { + let c = self.connection().unwrap(); + ngx_log_debug!(c.log, "connect returned NGX_AGAIN"); + c.read().handler = Some(ngx_peer_conn_connect_handler); + c.read().set_timer_set(5000); + c.write().handler = Some(ngx_peer_conn_connect_handler); + self.cev = Some(cx.waker().clone()); + + Poll::Pending + } + _ => unreachable!("should not be here"), + } + } + + pub fn connection(&mut self) -> Option<&mut ngx::core::Connection> { + if self.pc.connection.is_null() { + None + } else { + Some(unsafe { ngx::core::Connection::from_ptr_mut(self.pc.connection) }) + } + } +} + +impl Drop for NgxPeerConnection { + fn drop(&mut self) { + ngx_log_debug!(unsafe { (*ffi::ngx_cycle).log }, "closing peer connection"); + if let Some(c) = self.connection() { + c.close() + } + } +} + +extern "C" fn ngx_http_async_commands_async_pass( + cf: *mut ngx_conf_t, + _cmd: *mut ngx_command_t, + conf: *mut c_void, +) -> *mut c_char { + let clcf = unsafe { &mut *ngx_http_conf_get_module_loc_conf(cf, &*addr_of!(ngx_http_core_module)) }; + let lcf = unsafe { &mut *(conf as *mut ModuleConfig) }; + let cf = unsafe { NgxConfRef::from_ptr(cf) }; + + if !lcf.endpoint.url.is_empty() { + return "is duplicate\0".as_ptr() as *const c_char as *mut _; + } + + clcf.handler = Some(ngx_http_async_location_handler); + + lcf.endpoint.url = *cf.args()[1].as_ref(); + lcf.endpoint.default_port = 80; + + let rc = unsafe { ffi::ngx_parse_url(cf.pool, &mut lcf.endpoint) }; + if rc != ffi::NGX_OK as isize && !lcf.endpoint.err.is_null() { + unsafe { + ffi::ngx_conf_log_error( + ffi::NGX_LOG_EMERG as usize, + cf.as_ptr(), + 0, + c"%s in resolver \"%V\"".as_ptr(), + lcf.endpoint.err, + &lcf.endpoint.url, + ) + }; + + return core::NGX_CONF_ERROR as _; + } + + std::ptr::null_mut() +} diff --git a/examples/config b/examples/config index 744825c..ca807d4 100644 --- a/examples/config +++ b/examples/config @@ -23,6 +23,14 @@ if [ $HTTP = YES ]; then ngx_rust_module fi + if :; then + ngx_module_name=ngx_http_async_ngx_module + ngx_module_libs="-lm" + ngx_rust_target_name=async_ngx + + ngx_rust_module + fi + if :; then ngx_module_name=ngx_http_awssigv4_module ngx_module_libs="-lm" diff --git a/examples/t/async_ngx.t b/examples/t/async_ngx.t new file mode 100644 index 0000000..71aca12 --- /dev/null +++ b/examples/t/async_ngx.t @@ -0,0 +1,275 @@ +#!/usr/bin/perl + +# (C) Nginx, Inc. + +# Tests for http resolver. + +############################################################################### + +use warnings; +use strict; + +use Test::More; + +use IO::Select; +use Socket qw/ CRLF /; + +BEGIN { use FindBin; chdir($FindBin::Bin); } + +use lib 'lib'; +use Test::Nginx qw/ :DEFAULT http_end /; + +############################################################################### + +select STDERR; $| = 1; +select STDOUT; $| = 1; + +my $t = Test::Nginx->new()->has(qw/http proxy rewrite/); + +$t->write_file_expand('nginx.conf', <<'EOF'); + +%%TEST_GLOBALS%% + +daemon off; + +events { +} + +http { + %%TEST_GLOBALS_HTTP%% + + server { + listen 127.0.0.1:8080; + server_name localhost; + + location / { + resolver 127.0.0.1:%%PORT_8981_UDP%%; + resolver_timeout 1s; + + async_pass 127.0.0.1:%%PORT_8081%%; + } + } +} + +EOF + +$t->run_daemon(\&dns_daemon, port(8981), $t); +$t->run_daemon(\&http_daemon, port(8081)); + +$t->run()->plan(1); + +$t->waitforfile($t->testdir . '/' . port(8981)); + +############################################################################### + +# like(get('example.com', '/'), qr/200 OK/, 'get'); + +like(http_post('example.com', '/', "Hello World!"), qr/200 OK/, 'post'); + +############################################################################### + +sub get { + my ($host, $uri, %extra) = @_; + return http(< 0; + + use constant A => 1; + use constant CNAME => 5; + use constant AAAA => 28; + + use constant IN => 1; + + # default values + + my ($hdr, $rcode, $ttl) = (0x8180, NOERROR, 3600); + + # decode name + + my ($len, $offset) = (undef, 12); + while (1) { + $len = unpack("\@$offset C", $recv_data); + last if $len == 0; + $offset++; + push @name, unpack("\@$offset A$len", $recv_data); + $offset += $len; + } + + $offset -= 1; + my ($id, $type, $class) = unpack("n x$offset n2", $recv_data); + + my $name = join('.', @name); + if (($name eq 'example.com')) { + if ($type == A || $type == CNAME) { + push @rdata, rd_addr($ttl, '127.0.0.1'); + } + if ($type = AAAA) { + push @rdata, rd_addr6($ttl, "::1"); + } + } + + $len = @name; + pack("n6 (C/a*)$len x n2", $id, $hdr | $rcode, 1, scalar @rdata, + 0, 0, @name, $type, $class) . join('', @rdata); +} + +sub rd_addr { + my ($ttl, $addr) = @_; + + my $code = 'split(/\./, $addr)'; + + pack 'n3N nC4', 0xc00c, A, IN, $ttl, eval "scalar $code", eval($code); +} + +sub expand_ip6 { + my ($addr) = @_; + + substr ($addr, index($addr, "::"), 2) = + join "0", map { ":" } (0 .. 8 - (split /:/, $addr) + 1); + map { hex "0" x (4 - length $_) . "$_" } split /:/, $addr; +} + +sub rd_addr6 { + my ($ttl, $addr) = @_; + + pack 'n3N nn8', 0xc00c, AAAA, IN, $ttl, 16, expand_ip6($addr); +} + +sub dns_daemon { + my ($port, $t, %extra) = @_; + + my ($data, $recv_data); + my $socket = IO::Socket::INET->new( + LocalAddr => '127.0.0.1', + LocalPort => $port, + Proto => 'udp', + ) + or die "Can't create listening socket: $!\n"; + + my $sel = IO::Select->new($socket); + my $tcp = 0; + + if ($extra{tcp}) { + $tcp = port(8983, socket => 1); + $sel->add($tcp); + } + + local $SIG{PIPE} = 'IGNORE'; + + # track number of relevant queries + + my %state = ( + cnamecnt => 0, + twocnt => 0, + ttlcnt => 0, + ttl0cnt => 0, + cttlcnt => 0, + cttl2cnt => 0, + manycnt => 0, + casecnt => 0, + idcnt => 0, + fecnt => 0, + ); + + # signal we are ready + + open my $fh, '>', $t->testdir() . '/' . $port; + close $fh; + + while (my @ready = $sel->can_read) { + foreach my $fh (@ready) { + if ($tcp == $fh) { + my $new = $fh->accept; + $new->autoflush(1); + $sel->add($new); + + } elsif ($socket == $fh) { + $fh->recv($recv_data, 65536); + $data = reply_handler($recv_data, $port, + \%state); + $fh->send($data); + + } else { + $fh->recv($recv_data, 65536); + unless (length $recv_data) { + $sel->remove($fh); + $fh->close; + next; + } + +again: + my $len = unpack("n", $recv_data); + $data = substr $recv_data, 2, $len; + $data = reply_handler($data, $port, \%state, + tcp => 1); + $data = pack("n", length $data) . $data; + $fh->send($data); + $recv_data = substr $recv_data, 2 + $len; + goto again if length $recv_data; + } + } + } +} + +sub http_daemon { + my ($port) = @_; + my $server = IO::Socket::INET->new( + Proto => 'tcp', + LocalHost => '127.0.0.1', + LocalPort => $port, + Listen => 5, + Reuse => 1 + ) + or die "Can't create listening socket: $!\n"; + + local $SIG{PIPE} = 'IGNORE'; + + while (my $client = $server->accept()) { + $client->autoflush(1); + + my $headers = ''; + + while (<$client>) { + $headers .= $_; + last if (/^\x0d?\x0a?$/); + } + + my $body = "TEST-OK-IF-YOU-SEE-THIS"; + + my $p = "HTTP/1.1 200 OK" . CRLF . + "Connection: close" . CRLF . + "Content-Length: " . length($body) . CRLF . CRLF; + + print $client $p; + print $client $body unless $headers =~ /^HEAD/i; + + close $client; + } +} + +############################################################################### diff --git a/src/async/http.rs b/src/async/http.rs new file mode 100644 index 0000000..a4f6ac9 --- /dev/null +++ b/src/async/http.rs @@ -0,0 +1,121 @@ +use std::task::Poll; + +use foreign_types::{ForeignType, ForeignTypeRef}; +use http_body::Frame; +use pin_project_lite::pin_project; + +use crate::ffi::{ngx_buf_t, ngx_chain_t, ngx_http_request_body_t, ngx_http_request_t}; + +foreign_types::foreign_type! { + /// Wrapper struct for an [`ngx_buf_t`] + pub unsafe type Buf: Send { + type CType = ngx_buf_t; + // No cleanup required for pool-allocated structs + fn drop = |_|(); + } +} + +impl AsRef for Buf { + fn as_ref(&self) -> &ngx_buf_t { + unsafe { &*self.as_ptr() } + } +} + +impl AsMut for Buf { + fn as_mut(&mut self) -> &mut ngx_buf_t { + unsafe { &mut *self.as_ptr() } + } +} + +impl AsRef for BufRef { + fn as_ref(&self) -> &ngx_buf_t { + unsafe { &*self.as_ptr() } + } +} + +impl AsMut for BufRef { + fn as_mut(&mut self) -> &mut ngx_buf_t { + unsafe { &mut *self.as_ptr() } + } +} + +impl bytes::Buf for Buf { + fn remaining(&self) -> usize { + let buf: &ngx_buf_t = self.as_ref(); + unsafe { buf.last.offset_from(buf.pos) as usize } + } + + fn chunk(&self) -> &[u8] { + let buf: &ngx_buf_t = self.as_ref(); + unsafe { std::slice::from_raw_parts(buf.pos, self.remaining()) } + } + + fn advance(&mut self, cnt: usize) { + let buf: &mut ngx_buf_t = self.as_mut(); + unsafe { buf.pos = buf.pos.add(cnt) }; + } +} + +pin_project! { +pub struct RequestBody { + request_body: *mut ngx_http_request_body_t, + chain: *mut ngx_chain_t, + waker: Option, +} +} + +impl Default for RequestBody { + fn default() -> Self { + RequestBody { + request_body: std::ptr::null_mut(), + chain: std::ptr::null_mut(), + waker: None, + } + } +} + +impl RequestBody { + pub fn update(&mut self, r: &mut ngx_http_request_t) { + self.request_body = r.request_body; + self.chain = unsafe { (*self.request_body).bufs }; + + if let Some(waker) = self.waker.take() { + waker.wake() + } + } +} + +impl http_body::Body for RequestBody { + type Data = Buf; + type Error = std::io::Error; + + fn poll_frame( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>>> { + let this = self.project(); + + if this.request_body.is_null() { + *this.waker = Some(cx.waker().clone()); + return Poll::Pending; + } + + if this.chain.is_null() { + return Poll::Ready(None); + } + + let cl = unsafe { **this.chain }; + *this.chain = cl.next; + + let buf = unsafe { Buf::from_ptr(cl.buf) }; + Poll::Ready(Some(Ok(Frame::data(buf)))) + } + + fn is_end_stream(&self) -> bool { + !self.request_body.is_null() && self.chain.is_null() + } + + fn size_hint(&self) -> http_body::SizeHint { + http_body::SizeHint::default() + } +} diff --git a/src/async/mod.rs b/src/async/mod.rs new file mode 100644 index 0000000..6a770e4 --- /dev/null +++ b/src/async/mod.rs @@ -0,0 +1,3 @@ +#![allow(missing_docs)] +pub mod http; +pub mod runtime; diff --git a/src/async/runtime.rs b/src/async/runtime.rs new file mode 100644 index 0000000..b148c52 --- /dev/null +++ b/src/async/runtime.rs @@ -0,0 +1,69 @@ +use std::future::Future; +use std::panic::catch_unwind; +use std::thread_local; + +use async_task::{Runnable, ScheduleInfo, WithInfo}; +use flume::{Receiver, Sender}; + +use crate::core::Event; +use crate::ffi::{ngx_cycle, ngx_event_t, ngx_posted_next_events}; +use crate::ngx_log_debug; + +pub use async_task::Task; + +#[derive(Debug)] +pub struct RuntimeError; + +impl std::error::Error for RuntimeError {} + +impl std::fmt::Display for RuntimeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Error: task panicked") + } +} + +thread_local! { + static QUEUE: (Sender, Receiver) = flume::unbounded(); + static EVENT: std::cell::UnsafeCell = Event::default().into(); +} + +fn schedule(runnable: Runnable, info: ScheduleInfo) { + if info.woken_while_running { + QUEUE.with(|(s, _)| s.send(runnable).unwrap()); + // FIXME: attach pinned ngx_event_t to the task to avoid using flume + EVENT.with(|ev| unsafe { + let ev = &mut *ev.get(); + ev.handler = Some(ngx_async_posted_event_handler); + ev.post_event(std::ptr::addr_of_mut!(ngx_posted_next_events)); + }); + ngx_log_debug!(unsafe { (*ngx_cycle).log }, "task woken while running"); + } else if let Err(err) = catch_unwind(|| runnable.run()) { + ngx_log_debug!(unsafe { (*ngx_cycle).log }, "runtime::run failed {:?}", err); + } +} + +pub fn spawn(future: F) -> Result, RuntimeError> +where + F: Future + 'static, + T: 'static, +{ + ngx_log_debug!(unsafe { (*ngx_cycle).log }, "runtime::spawn enter"); + let scheduler = WithInfo(schedule); + let (runnable, task) = async_task::spawn_local(future, scheduler); + + runnable.schedule(); + + ngx_log_debug!(unsafe { (*ngx_cycle).log }, "runtime::spawn exit"); + Ok(task) +} + +extern "C" fn ngx_async_posted_event_handler(_ev: *mut ngx_event_t) { + ngx_log_debug!(unsafe { (*ngx_cycle).log }, "runtime::step enter"); + QUEUE.with(|(_, receiver)| { + while let Ok(runnable) = receiver.try_recv() { + ngx_log_debug!(unsafe { (*ngx_cycle).log }, "runtime::step iter"); + let _ignore = catch_unwind(|| runnable.run()); + } + }); + ngx_log_debug!(unsafe { (*ngx_cycle).log }, "runtime::step exit"); +} diff --git a/src/async/sleep.rs b/src/async/sleep.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/async/time.rs b/src/async/time.rs new file mode 100644 index 0000000..86e03c6 --- /dev/null +++ b/src/async/time.rs @@ -0,0 +1,86 @@ +use std::ptr::addr_of_mut; +use std::time::{Duration, Instant}; + +use ngx::ffi::{ngx_current_msec, ngx_event_t, ngx_msec_int_t, ngx_msec_t}; +use pin_project_lite::pin_project; + +/// Waits until `duration` has elapsed +pub fn sleep(duration: Duration) -> Sleep { + let deadline = Instant::now() + duration; + + Sleep::new_timeout(deadline) +} + +#[inline] +pub fn ngx_add_timer(event: &mut ngx_event_t, timer: ngx_msec_t) { + let key = unsafe { ngx_current_msec as ngx_msec_int_t } + timer as ngx_msec_int_t; + + if event.timer_set() != 0 { + /* + * Use a previous timer value if difference between it and a new + * value is less than NGX_TIMER_LAZY_DELAY milliseconds: this allows + * to minimize the rbtree operations for fast connections. + */ + + let diff = key - event.timer.key as ngx_msec_int_t; + + if diff.abs() < ngx::ffi::NGX_TIMER_LAZY_DELAY as ngx_msec_int_t { + return; + } + + ngx_del_timer(event); + } + + event.timer.key = key as ngx_msec_t; + + unsafe { + ngx::ffi::ngx_rbtree_insert( + addr_of_mut!(ngx::ffi::ngx_event_timer_rbtree), + &mut event.timer, + ); + } + + event.set_timer_set(1); +} + +#[inline] +pub fn ngx_del_timer(event: &mut ngx_event_t) { + unsafe { + ngx::ffi::ngx_rbtree_delete( + addr_of_mut!(ngx::ffi::ngx_event_timer_rbtree), + &mut event.timer, + ) + }; + #[cfg(debug_assertions)] + { + event.timer.left = std::ptr::null_mut(); + event.timer.right = std::ptr::null_mut(); + event.timer.parent = std::ptr::null_mut(); + } + + event.set_timer_set(0); +} + +pin_project! { + pub struct Sleep { + #[pin] + event: ngx_event_t, + } + + impl PinnedDrop for Sleep { + fn drop(this: Pin<&mut Self>) { + let event = this.project().event; + if event.timer_set() != 0 { + ngx_del_timer(unsafe { event.get_unchecked_mut() }); + } + } + } +} + +impl Sleep { + unsafe extern "C" fn handler(ev: *mut ngx_event_t) { + assert!((*ev).timedout() > 0); + } + + // pub fn new_timeout(deadline: Instant) -> Sleep {} +} diff --git a/src/lib.rs b/src/lib.rs index 78465da..0672e96 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,9 @@ extern crate alloc; #[cfg(feature = "std")] extern crate std; +/// The async runtime implementation around the NGINX event loop +#[cfg(feature = "async")] +pub mod r#async; /// The core module. /// /// This module provides fundamental utilities needed to interface with many NGINX primitives.