Skip to content

Commit

Permalink
Merge pull request #22 from MaterializeInc/hostaddrs
Browse files Browse the repository at this point in the history
Integrate upstream hostaddr/load balancing support
  • Loading branch information
benesch authored Mar 25, 2024
2 parents b759caa + ea22fd8 commit 91522e4
Show file tree
Hide file tree
Showing 17 changed files with 398 additions and 114 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- uses: actions/checkout@v3
- uses: sfackler/actions/rustup@master
- uses: sfackler/actions/rustfmt@master

clippy:
name: clippy
runs-on: ubuntu-latest
Expand Down Expand Up @@ -55,7 +55,7 @@ jobs:
- run: docker compose up -d
- uses: sfackler/actions/rustup@master
with:
version: 1.64.0
version: 1.77.0
- run: echo "version=$(rustc --version)" >> $GITHUB_OUTPUT
id: rust-version
- uses: actions/cache@v3
Expand Down
12 changes: 6 additions & 6 deletions postgres-protocol/src/types/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ fn ltree_str() {
let mut query = vec![1u8];
query.extend_from_slice("A.B.C".as_bytes());

assert!(matches!(ltree_from_sql(query.as_slice()), Ok(_)))
assert!(ltree_from_sql(query.as_slice()).is_ok())
}

#[test]
fn ltree_wrong_version() {
let mut query = vec![2u8];
query.extend_from_slice("A.B.C".as_bytes());

assert!(matches!(ltree_from_sql(query.as_slice()), Err(_)))
assert!(ltree_from_sql(query.as_slice()).is_err())
}

#[test]
Expand All @@ -202,15 +202,15 @@ fn lquery_str() {
let mut query = vec![1u8];
query.extend_from_slice("A.B.C".as_bytes());

assert!(matches!(lquery_from_sql(query.as_slice()), Ok(_)))
assert!(lquery_from_sql(query.as_slice()).is_ok())
}

#[test]
fn lquery_wrong_version() {
let mut query = vec![2u8];
query.extend_from_slice("A.B.C".as_bytes());

assert!(matches!(lquery_from_sql(query.as_slice()), Err(_)))
assert!(lquery_from_sql(query.as_slice()).is_err())
}

#[test]
Expand All @@ -230,13 +230,13 @@ fn ltxtquery_str() {
let mut query = vec![1u8];
query.extend_from_slice("a & b*".as_bytes());

assert!(matches!(ltree_from_sql(query.as_slice()), Ok(_)))
assert!(ltree_from_sql(query.as_slice()).is_ok())
}

#[test]
fn ltxtquery_wrong_version() {
let mut query = vec![2u8];
query.extend_from_slice("a & b*".as_bytes());

assert!(matches!(ltree_from_sql(query.as_slice()), Err(_)))
assert!(ltree_from_sql(query.as_slice()).is_err())
}
4 changes: 2 additions & 2 deletions postgres-types/src/chrono_04.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl ToSql for NaiveDateTime {
impl<'a> FromSql<'a> for DateTime<Utc> {
fn from_sql(type_: &Type, raw: &[u8]) -> Result<DateTime<Utc>, Box<dyn Error + Sync + Send>> {
let naive = NaiveDateTime::from_sql(type_, raw)?;
Ok(DateTime::from_utc(naive, Utc))
Ok(DateTime::from_naive_utc_and_offset(naive, Utc))
}

accepts!(TIMESTAMPTZ);
Expand Down Expand Up @@ -111,7 +111,7 @@ impl<'a> FromSql<'a> for NaiveDate {
let jd = types::date_from_sql(raw)?;
base()
.date()
.checked_add_signed(Duration::days(i64::from(jd)))
.checked_add_signed(Duration::try_days(i64::from(jd)).unwrap())
.ok_or_else(|| "value too large to decode".into())
}

Expand Down
33 changes: 33 additions & 0 deletions postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::connection::Connection;
use crate::Client;
use log::info;
use std::fmt;
use std::net::IpAddr;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -43,6 +44,19 @@ use tokio_postgres::{Error, Socket};
/// path to the directory containing Unix domain sockets. Otherwise, it is treated as a hostname. Multiple hosts
/// can be specified, separated by commas. Each host will be tried in turn when connecting. Required if connecting
/// with the `connect` method.
/// * `hostaddr` - Numeric IP address of host to connect to. This should be in the standard IPv4 address format,
/// e.g., 172.28.40.9. If your machine supports IPv6, you can also use those addresses.
/// If this parameter is not specified, the value of `host` will be looked up to find the corresponding IP address,
/// - or if host specifies an IP address, that value will be used directly.
/// Using `hostaddr` allows the application to avoid a host name look-up, which might be important in applications
/// with time constraints. However, a host name is required for verify-full SSL certificate verification.
/// Specifically:
/// * If `hostaddr` is specified without `host`, the value for `hostaddr` gives the server network address.
/// The connection attempt will fail if the authentication method requires a host name;
/// * If `host` is specified without `hostaddr`, a host name lookup occurs;
/// * If both `host` and `hostaddr` are specified, the value for `hostaddr` gives the server network address.
/// The value for `host` is ignored unless the authentication method requires it,
/// in which case it will be used as the host name.
/// * `port` - The port to connect to. Multiple ports can be specified, separated by commas. The number of ports must be
/// either 1, in which case it will be used for all hosts, or the same as the number of hosts. Defaults to 5432 if
/// omitted or the empty string.
Expand Down Expand Up @@ -74,6 +88,10 @@ use tokio_postgres::{Error, Socket};
/// ```
///
/// ```not_rust
/// host=host1,host2,host3 port=1234,,5678 hostaddr=127.0.0.1,127.0.0.2,127.0.0.3 user=postgres target_session_attrs=read-write
/// ```
///
/// ```not_rust
/// host=host1,host2,host3 port=1234,,5678 user=postgres target_session_attrs=read-write
/// ```
///
Expand Down Expand Up @@ -250,6 +268,7 @@ impl Config {
///
/// Multiple hosts can be specified by calling this method multiple times, and each will be tried in order. On Unix
/// systems, a host starting with a `/` is interpreted as a path to a directory containing Unix domain sockets.
/// There must be either no hosts, or the same number of hosts as hostaddrs.
pub fn host(&mut self, host: &str) -> &mut Config {
self.config.host(host);
self
Expand All @@ -260,6 +279,11 @@ impl Config {
self.config.get_hosts()
}

/// Gets the hostaddrs that have been added to the configuration with `hostaddr`.
pub fn get_hostaddrs(&self) -> &[IpAddr] {
self.config.get_hostaddrs()
}

/// Adds a Unix socket host to the configuration.
///
/// Unlike `host`, this method allows non-UTF8 paths.
Expand All @@ -272,6 +296,15 @@ impl Config {
self
}

/// Adds a hostaddr to the configuration.
///
/// Multiple hostaddrs can be specified by calling this method multiple times, and each will be tried in order.
/// There must be either no hostaddrs, or the same number of hostaddrs as hosts.
pub fn hostaddr(&mut self, hostaddr: IpAddr) -> &mut Config {
self.config.hostaddr(hostaddr);
self
}

/// Adds a port to the configuration.
///
/// Multiple ports can be specified by calling this method multiple times. There must either be no ports, in which
Expand Down
2 changes: 1 addition & 1 deletion tokio-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ serde = { version = "1.0", optional = true }
socket2 = { version = "0.5", features = ["all"] }
tokio = { version = "1.27", features = ["io-util"] }
tokio-util = { version = "0.7", features = ["codec"] }
rand = "0.8.5"

[dev-dependencies]
futures-executor = "0.3"
Expand All @@ -78,7 +79,6 @@ eui48-04 = { version = "0.4", package = "eui48" }
eui48-1 = { version = "1.0", package = "eui48" }
geo-types-06 = { version = "0.6", package = "geo-types" }
geo-types-07 = { version = "0.7", package = "geo-types" }
serde-1 = { version = "1.0", package = "serde" }
serde_json-1 = { version = "1.0", package = "serde_json" }
smol_str-01 = { version = "0.1", package = "smol_str" }
uuid-08 = { version = "0.8", package = "uuid" }
Expand Down
16 changes: 6 additions & 10 deletions tokio-postgres/src/cancel_query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::client::SocketConfig;
use crate::config::{Host, SslMode};
use crate::config::SslMode;
use crate::tls::MakeTlsConnect;
use crate::{cancel_query_raw, connect_socket, Error, Socket};
use std::io;
Expand All @@ -24,24 +24,20 @@ where
}
};

let hostname = match &config.host {
Host::Tcp(host) => &**host,
// postgres doesn't support TLS over unix sockets, so the choice here doesn't matter
#[cfg(unix)]
Host::Unix(_) => "",
};
let tls = tls
.make_tls_connect(hostname)
.make_tls_connect(config.hostname.as_deref().unwrap_or(""))
.map_err(|e| Error::tls(e.into()))?;
let has_hostname = config.hostname.is_some();

let socket = connect_socket::connect_socket(
&config.host,
&config.addr,
config.port,
config.connect_timeout,
config.tcp_user_timeout,
config.keepalive.as_ref(),
)
.await?;

cancel_query_raw::cancel_query_raw(socket, ssl_mode, tls, process_id, secret_key).await
cancel_query_raw::cancel_query_raw(socket, ssl_mode, tls, has_hostname, process_id, secret_key)
.await
}
3 changes: 2 additions & 1 deletion tokio-postgres/src/cancel_query_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ pub async fn cancel_query_raw<S, T>(
stream: S,
mode: SslMode,
tls: T,
has_hostname: bool,
process_id: i32,
secret_key: i32,
) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
{
let mut stream = connect_tls::connect_tls(stream, mode, tls).await?;
let mut stream = connect_tls::connect_tls(stream, mode, tls, has_hostname).await?;

let mut buf = BytesMut::new();
frontend::cancel_request(process_id, secret_key, &mut buf);
Expand Down
1 change: 1 addition & 0 deletions tokio-postgres/src/cancel_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl CancelToken {
stream,
self.ssl_mode,
tls,
true,
self.process_id,
self.secret_key,
)
Expand Down
17 changes: 14 additions & 3 deletions tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use crate::codec::{BackendMessages, FrontendMessage};
#[cfg(feature = "runtime")]
use crate::config::Host;
use crate::config::SslMode;
use crate::connection::{Request, RequestMessages};
use crate::copy_both::CopyBothDuplex;
Expand Down Expand Up @@ -29,6 +27,10 @@ use postgres_protocol::message::{backend::Message, frontend};
use postgres_types::BorrowToSql;
use std::collections::HashMap;
use std::fmt;
#[cfg(feature = "runtime")]
use std::net::IpAddr;
#[cfg(feature = "runtime")]
use std::path::PathBuf;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -155,13 +157,22 @@ impl InnerClient {
#[cfg(feature = "runtime")]
#[derive(Clone)]
pub(crate) struct SocketConfig {
pub host: Host,
pub addr: Addr,
pub hostname: Option<String>,
pub port: u16,
pub connect_timeout: Option<Duration>,
pub tcp_user_timeout: Option<Duration>,
pub keepalive: Option<KeepaliveConfig>,
}

#[cfg(feature = "runtime")]
#[derive(Clone)]
pub(crate) enum Addr {
Tcp(IpAddr),
#[cfg(unix)]
Unix(PathBuf),
}

/// An asynchronous PostgreSQL client.
///
/// The client is one half of what is returned when a connection is established. Users interact with the database
Expand Down
Loading

0 comments on commit 91522e4

Please sign in to comment.