From 54a3c5e26680816dcf028a28b1b8ee948d7de9c6 Mon Sep 17 00:00:00 2001 From: Paul Hemberger Date: Wed, 22 Feb 2023 16:13:54 -0500 Subject: [PATCH] add tcp user timeout config --- tokio-postgres/Cargo.toml | 2 +- tokio-postgres/src/cancel_query.rs | 1 + tokio-postgres/src/client.rs | 1 + tokio-postgres/src/config.rs | 29 ++++++++++++++++++++++++++++ tokio-postgres/src/connect.rs | 2 ++ tokio-postgres/src/connect_socket.rs | 12 +++++++++++- 6 files changed, 45 insertions(+), 2 deletions(-) diff --git a/tokio-postgres/Cargo.toml b/tokio-postgres/Cargo.toml index 4020a96bf..b16d13e94 100644 --- a/tokio-postgres/Cargo.toml +++ b/tokio-postgres/Cargo.toml @@ -56,7 +56,7 @@ phf = "0.11" postgres-protocol = { version = "0.6.4", path = "../postgres-protocol" } postgres-types = { version = "0.2.4", path = "../postgres-types" } serde = { version = "1.0", optional = true } -socket2 = "0.4" +socket2 = { version = "0.4", features = ["all"] } tokio = { version = "1.0", features = ["io-util"] } tokio-util = { version = "0.7", features = ["codec"] } diff --git a/tokio-postgres/src/cancel_query.rs b/tokio-postgres/src/cancel_query.rs index b02729f85..d869b5824 100644 --- a/tokio-postgres/src/cancel_query.rs +++ b/tokio-postgres/src/cancel_query.rs @@ -38,6 +38,7 @@ where &config.host, config.port, config.connect_timeout, + config.tcp_user_timeout, config.keepalive.as_ref(), ) .await?; diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index eea779f77..3ca1b68c1 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -158,6 +158,7 @@ pub(crate) struct SocketConfig { pub host: Host, pub port: u16, pub connect_timeout: Option, + pub tcp_user_timeout: Option, pub keepalive: Option, } diff --git a/tokio-postgres/src/config.rs b/tokio-postgres/src/config.rs index 75cb896e4..0b8b9b63a 100644 --- a/tokio-postgres/src/config.rs +++ b/tokio-postgres/src/config.rs @@ -116,6 +116,9 @@ pub enum Host { /// omitted or the empty string. /// * `connect_timeout` - The time limit in seconds applied to each socket-level connection attempt. Note that hostnames /// can resolve to multiple IP addresses, and this limit is applied to each address. Defaults to no timeout. +/// * `tcp_user_timeout` - The time limit that transmitted data may remain unacknowledged before a connection is forcibly closed. +/// This is ignored for Unix domain socket connections. It is only supported on systems where TCP_USER_TIMEOUT is available +/// and will default to the system default if omitted or set to 0; on other systems, it has no effect. /// * `keepalives` - Controls the use of TCP keepalive. A value of 0 disables keepalive and nonzero integers enable it. /// This option is ignored when connecting with Unix sockets. Defaults to on. /// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server. @@ -183,6 +186,7 @@ pub struct Config { pub(crate) host: Vec, pub(crate) port: Vec, pub(crate) connect_timeout: Option, + pub(crate) tcp_user_timeout: Option, pub(crate) keepalives: bool, pub(crate) keepalive_config: KeepaliveConfig, pub(crate) target_session_attrs: TargetSessionAttrs, @@ -217,6 +221,7 @@ impl Config { host: vec![], port: vec![], connect_timeout: None, + tcp_user_timeout: None, keepalives: true, keepalive_config, target_session_attrs: TargetSessionAttrs::Any, @@ -407,6 +412,21 @@ impl Config { self.connect_timeout.as_ref() } + /// Sets the TCP user timeout. + /// + /// This is ignored for Unix domain socket connections. It is only supported on systems where + /// TCP_USER_TIMEOUT is available and will default to the system default if omitted or set to 0; + /// on other systems, it has no effect. + pub fn tcp_user_timeout(&mut self, tcp_user_timeout: Duration) -> &mut Config { + self.tcp_user_timeout = Some(tcp_user_timeout); + self + } + + /// Gets the TCP user timeout, if one has been set with the + /// `user_timeout` method. + pub fn get_tcp_user_timeout(&self) -> Option<&Duration> { + self.tcp_user_timeout.as_ref() + } /// Controls the use of TCP keepalive. /// /// This is ignored for Unix domain socket connections. Defaults to `true`. @@ -578,6 +598,14 @@ impl Config { self.connect_timeout(Duration::from_secs(timeout as u64)); } } + "tcp_user_timeout" => { + let timeout = value + .parse::() + .map_err(|_| Error::config_parse(Box::new(InvalidValue("tcp_user_timeout"))))?; + if timeout > 0 { + self.tcp_user_timeout(Duration::from_secs(timeout as u64)); + } + } "keepalives" => { let keepalives = value .parse::() @@ -713,6 +741,7 @@ impl fmt::Debug for Config { .field("host", &self.host) .field("port", &self.port) .field("connect_timeout", &self.connect_timeout) + .field("tcp_user_timeout", &self.tcp_user_timeout) .field("keepalives", &self.keepalives) .field("keepalives_idle", &self.keepalive_config.idle) .field("keepalives_interval", &self.keepalive_config.interval) diff --git a/tokio-postgres/src/connect.rs b/tokio-postgres/src/connect.rs index 97a00c812..ed7ecac66 100644 --- a/tokio-postgres/src/connect.rs +++ b/tokio-postgres/src/connect.rs @@ -65,6 +65,7 @@ where host, port, config.connect_timeout, + config.tcp_user_timeout, if config.keepalives { Some(&config.keepalive_config) } else { @@ -118,6 +119,7 @@ where host: host.clone(), port, connect_timeout: config.connect_timeout, + tcp_user_timeout: config.tcp_user_timeout, keepalive: if config.keepalives { Some(config.keepalive_config.clone()) } else { diff --git a/tokio-postgres/src/connect_socket.rs b/tokio-postgres/src/connect_socket.rs index 19d01d87a..9b3d31d72 100644 --- a/tokio-postgres/src/connect_socket.rs +++ b/tokio-postgres/src/connect_socket.rs @@ -14,6 +14,7 @@ pub(crate) async fn connect_socket( host: &Host, port: u16, connect_timeout: Option, + tcp_user_timeout: Option, keepalive_config: Option<&KeepaliveConfig>, ) -> Result { match host { @@ -35,8 +36,17 @@ pub(crate) async fn connect_socket( }; stream.set_nodelay(true).map_err(Error::connect)?; + + let sock_ref = SockRef::from(&stream); + #[cfg(target_os = "linux")] + { + sock_ref + .set_tcp_user_timeout(tcp_user_timeout) + .map_err(Error::connect)?; + } + if let Some(keepalive_config) = keepalive_config { - SockRef::from(&stream) + sock_ref .set_tcp_keepalive(&TcpKeepalive::from(keepalive_config)) .map_err(Error::connect)?; }