Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Connector to deprecate ClientBuilder #29

Merged
merged 1 commit into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 207 additions & 83 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use std::fmt::Write as _;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::sync::Arc;
use std::time::Duration;

use const_format::formatcp;
Expand Down Expand Up @@ -227,14 +226,20 @@

/// Connects to ZooKeeper cluster.
pub async fn connect(cluster: &str) -> Result<Self> {
Self::builder().connect(cluster).await
Self::connector().connect(cluster).await
}

/// Creates a builder with configurable options in connecting to ZooKeeper cluster.
#[deprecated(since = "0.7.0", note = "use Client::connector instead")]
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}

/// Creates a builder with configurable options in connecting to ZooKeeper cluster.
pub fn connector() -> Connector {
Connector::new()
}

pub(crate) fn new(
chroot: OwnedChroot,
version: Version,
Expand Down Expand Up @@ -1528,32 +1533,96 @@
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub(crate) struct Version(u32, u32, u32);

/// Builder for [Client] with more options than [Client::connect].
/// Options for tls connection.
#[derive(Debug)]
pub struct TlsOptions {
identity: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
ca_certs: RootCertStore,
}

impl Clone for TlsOptions {
fn clone(&self) -> Self {
Self {
identity: self.identity.as_ref().map(|id| (id.0.clone(), id.1.clone_key())),
ca_certs: self.ca_certs.clone(),
}
}

Check warning on line 1549 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1544-L1549

Added lines #L1544 - L1549 were not covered by tests
}

impl Default for TlsOptions {
/// Tls options with well-known ca roots.
fn default() -> Self {
let mut options = Self::no_ca();
options.ca_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
options
}
}

impl TlsOptions {
/// Tls options with no ca certificates. Use [TlsOptions::default] if well-known ca roots is
/// desirable.
pub fn no_ca() -> Self {
Self { ca_certs: RootCertStore::empty(), identity: None }
}

/// Adds new ca certificates.
pub fn with_pem_ca_certs(mut self, certs: &str) -> Result<Self> {
for r in rustls_pemfile::certs(&mut certs.as_bytes()) {
let cert = match r {
Ok(cert) => cert,
Err(err) => return Err(Error::other(format!("fail to read cert {}", err), err)),

Check warning on line 1573 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1573

Added line #L1573 was not covered by tests
};
if let Err(err) = self.ca_certs.add(cert) {
return Err(Error::other(format!("fail to add cert {}", err), err));

Check warning on line 1576 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1576

Added line #L1576 was not covered by tests
}
}
Ok(self)
}

/// Specifies client identity for server to authenticate.
pub fn with_pem_identity(mut self, cert: &str, key: &str) -> Result<Self> {
let r: std::result::Result<Vec<_>, _> = rustls_pemfile::certs(&mut cert.as_bytes()).collect();
let certs = match r {
Err(err) => return Err(Error::other(format!("fail to read cert {}", err), err)),

Check warning on line 1586 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1586

Added line #L1586 was not covered by tests
Ok(certs) => certs,
};
let key = match rustls_pemfile::private_key(&mut key.as_bytes()) {
Err(err) => return Err(Error::other(format!("fail to read client private key {err}"), err)),
Ok(None) => return Err(Error::BadArguments(&"no client private key")),

Check warning on line 1591 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1590-L1591

Added lines #L1590 - L1591 were not covered by tests
Ok(Some(key)) => key,
};
self.identity = Some((certs, key));
Ok(self)
}

fn take_roots(&mut self) -> RootCertStore {
std::mem::replace(&mut self.ca_certs, RootCertStore::empty())
}
}

/// A builder for [Client].
#[derive(Clone, Debug)]
pub struct ClientBuilder {
tls: bool,
trusted_certs: RootCertStore,
client_certs: Option<(Vec<CertificateDer<'static>>, Arc<PrivateKeyDer<'static>>)>,
pub struct Connector {
tls: Option<TlsOptions>,
authes: Vec<AuthPacket>,
version: Version,
session: Option<(SessionId, Vec<u8>)>,
readonly: bool,
detached: bool,
server_version: Version,
session_timeout: Duration,
connection_timeout: Duration,
}

impl ClientBuilder {
/// Builder for [Client] with more options than [Client::connect].
impl Connector {
fn new() -> Self {
Self {
tls: false,
trusted_certs: RootCertStore::empty(),
client_certs: None,
tls: None,
authes: Default::default(),
version: Version(u32::MAX, u32::MAX, u32::MAX),
session: None,
readonly: false,
detached: false,
server_version: Version(u32::MAX, u32::MAX, u32::MAX),
session_timeout: Duration::ZERO,
connection_timeout: Duration::ZERO,
}
Expand All @@ -1562,75 +1631,38 @@
/// Specifies target session timeout to negotiate with ZooKeeper server.
///
/// Defaults to 6s.
pub fn with_session_timeout(&mut self, timeout: Duration) -> &mut Self {
pub fn session_timeout(&mut self, timeout: Duration) -> &mut Self {

Check warning on line 1634 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1634

Added line #L1634 was not covered by tests
self.session_timeout = timeout;
self
}

/// Specifies idle timeout to conclude a connection as loss.
///
/// Defaults to `2/5` of session timeout.
pub fn with_connection_timeout(&mut self, timeout: Duration) -> &mut Self {
pub fn connection_timeout(&mut self, timeout: Duration) -> &mut Self {

Check warning on line 1642 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1642

Added line #L1642 was not covered by tests
self.connection_timeout = timeout;
self
}

/// Specifies whether readonly server is allowed.
pub fn with_readonly(&mut self, readonly: bool) -> &mut ClientBuilder {
pub fn readonly(&mut self, readonly: bool) -> &mut Self {

Check warning on line 1648 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1648

Added line #L1648 was not covered by tests
self.readonly = readonly;
self
}

/// Specifies auth info for given authentication scheme.
pub fn with_auth(&mut self, scheme: String, auth: Vec<u8>) -> &mut ClientBuilder {
pub fn auth(&mut self, scheme: String, auth: Vec<u8>) -> &mut Self {
self.authes.push(AuthPacket { scheme, auth });
self
}

/// Specifies session to reestablish.
pub fn with_session(&mut self, id: SessionId, password: Vec<u8>) -> &mut Self {
pub fn session(&mut self, id: SessionId, password: Vec<u8>) -> &mut Self {
self.session = Some((id, password));
self
}

/// Assumes tls for server in connection string if no protocol specified individually.
/// See [Self::connect] for syntax to specify protocol individually.
pub fn assume_tls(&mut self) -> &mut Self {
self.tls = true;
self
}

/// Trusts certificates signed by given ca certificates.
pub fn trust_ca_pem_certs(&mut self, certs: &str) -> Result<&mut Self> {
for r in rustls_pemfile::certs(&mut certs.as_bytes()) {
let cert = match r {
Ok(cert) => cert,
Err(err) => return Err(Error::other(format!("fail to read cert {}", err), err)),
};
if let Err(err) = self.trusted_certs.add(cert) {
return Err(Error::other(format!("fail to add cert {}", err), err));
}
}
Ok(self)
}

/// Identifies client itself to server with given cert chain and private key.
pub fn use_client_pem_cert(&mut self, cert: &str, key: &str) -> Result<&mut Self> {
let r: std::result::Result<Vec<_>, _> = rustls_pemfile::certs(&mut cert.as_bytes()).collect();
let certs = match r {
Err(err) => return Err(Error::other(format!("fail to read cert {}", err), err)),
Ok(certs) => certs,
};
let key = match rustls_pemfile::private_key(&mut key.as_bytes()) {
Err(err) => return Err(Error::other(format!("fail to read client private key {err}"), err)),
Ok(None) => return Err(Error::BadArguments(&"no client private key")),
Ok(Some(key)) => key,
};
self.client_certs = Some((certs, Arc::new(key)));
Ok(self)
}

/// Specifies client assumed server version of ZooKeeper cluster.
/// Specifies target server version of ZooKeeper cluster.
///
/// Client will issue server compatible protocol to avoid [Error::Unimplemented] for some
/// operations. See [Client::create] for an example.
Expand All @@ -1639,30 +1671,25 @@
///
/// [ZOOKEEPER-1381]: https://issues.apache.org/jira/browse/ZOOKEEPER-1381
/// [ZOOKEEPER-3762]: https://issues.apache.org/jira/browse/ZOOKEEPER-3762
pub fn assume_server_version(&mut self, major: u32, minor: u32, patch: u32) -> &mut Self {
self.version = Version(major, minor, patch);
pub fn server_version(&mut self, major: u32, minor: u32, patch: u32) -> &mut Self {
self.server_version = Version(major, minor, patch);
self
}

/// Detaches creating session so it will not be closed after all client instances dropped.
pub fn detach(&mut self) -> &mut Self {
/// Detaches created session so it will not be closed after all client instances dropped.
pub fn detached(&mut self) -> &mut Self {
self.detached = true;
self
}

/// Connects to ZooKeeper cluster.
///
/// Parameter `cluster` specifies connection string to ZooKeeper cluster. It has same syntax as
/// Java client except that you can specifies protocol for server individually. For example,
/// `tcp://server1,tcp+tls://server2:port,server3`. This claims that `server1` uses plaintext
/// protocol, `server2` uses tls encrypted protocol while `server3` uses tls if
/// [Self::assume_tls] is specified or plaintext otherwise.
///
/// # Notable errors
/// * [Error::NoHosts] if no host is available
/// * [Error::SessionExpired] if specified session expired
pub async fn connect(&mut self, cluster: &str) -> Result<Client> {
let (hosts, chroot) = util::parse_connect_string(cluster, self.tls)?;
/// Specifies tls options for connections to ZooKeeper.
pub fn tls(&mut self, options: TlsOptions) -> &mut Self {
self.tls = Some(options);
self
}

async fn connect_internally(&mut self, secure: bool, cluster: &str) -> Result<Client> {
let (hosts, chroot) = util::parse_connect_string(cluster, secure)?;
if let Some((id, password)) = &self.session {
if id.0 == 0 {
return Err(Error::BadArguments(&"session id must not be 0"));
Expand All @@ -1678,19 +1705,15 @@
} else if self.connection_timeout < Duration::ZERO {
return Err(Error::BadArguments(&"connection timeout must not be negative"));
}
self.trusted_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let tls_config = if let Some((certs, private_key)) = self.client_certs.take() {
match ClientConfig::builder()
.with_root_certificates(std::mem::replace(&mut self.trusted_certs, RootCertStore::empty()))
.with_client_auth_cert(certs, Arc::try_unwrap(private_key).unwrap_or_else(|k| k.clone_key()))
{
let mut tls_options = self.tls.take().unwrap_or_default();
let tls_builder = ClientConfig::builder().with_root_certificates(tls_options.take_roots());
let tls_config = if let Some((client_cert, client_key)) = tls_options.identity.take() {
match tls_builder.with_client_auth_cert(client_cert, client_key) {
Ok(config) => config,
Err(err) => return Err(Error::other(format!("invalid client private key {err}"), err)),
}
} else {
ClientConfig::builder()
.with_root_certificates(std::mem::replace(&mut self.trusted_certs, RootCertStore::empty()))
.with_no_client_auth()
tls_builder.with_no_client_auth()
};
let (mut session, state_receiver) = Session::new(
self.session.take(),
Expand All @@ -1713,9 +1736,110 @@
session.serve(servers, conn, buf, connecting_depot, receiver).await;
});
let client =
Client::new(chroot.to_owned(), self.version, session_info, session_timeout, sender, state_receiver);
Client::new(chroot.to_owned(), self.server_version, session_info, session_timeout, sender, state_receiver);
Ok(client)
}

/// Connects to ZooKeeper cluster.
///
/// Same to [Self::connect] except that `server1` will use tls encrypted protocol given
/// the connection string `server1,tcp://server2,tcp+tls://server3`.
pub async fn secure_connect(&mut self, cluster: &str) -> Result<Client> {
self.connect_internally(true, cluster).await
}

/// Connects to ZooKeeper cluster.
///
/// Parameter `cluster` specifies connection string to ZooKeeper cluster. It has same syntax as
/// Java client except that you can specifies protocol for server individually. For example,
/// `server1,tcp://server2,tcp+tls://server3`. This claims that `server1` and `server2` use
/// plaintext protocol, while `server3` uses tls encrypted protocol.
///
/// # Notable errors
/// * [Error::NoHosts] if no host is available
/// * [Error::SessionExpired] if specified session expired
///
/// # Notable behaviors
/// The state of this connector is undefined after connection attempt no matter whether it is
/// success or not.
pub async fn connect(&mut self, cluster: &str) -> Result<Client> {
self.connect_internally(false, cluster).await
}
}

/// Builder for [Client] with more options than [Client::connect].
#[derive(Clone, Debug)]
pub struct ClientBuilder {
connector: Connector,
}

impl ClientBuilder {
fn new() -> Self {
Self { connector: Connector::new() }
}

Check warning on line 1779 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1777-L1779

Added lines #L1777 - L1779 were not covered by tests

/// Specifies target session timeout to negotiate with ZooKeeper server.
///
/// Defaults to 6s.
pub fn with_session_timeout(&mut self, timeout: Duration) -> &mut Self {
self.connector.session_timeout(timeout);
self
}

Check warning on line 1787 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1784-L1787

Added lines #L1784 - L1787 were not covered by tests

/// Specifies idle timeout to conclude a connection as loss.
///
/// Defaults to `2/5` of session timeout.
pub fn with_connection_timeout(&mut self, timeout: Duration) -> &mut Self {
self.connector.connection_timeout(timeout);
self
}

Check warning on line 1795 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1792-L1795

Added lines #L1792 - L1795 were not covered by tests

/// Specifies whether readonly server is allowed.
pub fn with_readonly(&mut self, readonly: bool) -> &mut ClientBuilder {
self.connector.readonly = readonly;
self
}

Check warning on line 1801 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1798-L1801

Added lines #L1798 - L1801 were not covered by tests

/// Specifies auth info for given authentication scheme.
pub fn with_auth(&mut self, scheme: String, auth: Vec<u8>) -> &mut ClientBuilder {
self.connector.auth(scheme, auth);
self
}

Check warning on line 1807 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1804-L1807

Added lines #L1804 - L1807 were not covered by tests

/// Specifies session to reestablish.
pub fn with_session(&mut self, id: SessionId, password: Vec<u8>) -> &mut Self {
self.connector.session(id, password);
self
}

Check warning on line 1813 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1810-L1813

Added lines #L1810 - L1813 were not covered by tests

/// Specifies client assumed server version of ZooKeeper cluster.
///
/// Client will issue server compatible protocol to avoid [Error::Unimplemented] for some
/// operations. See [Client::create] for an example.
///
/// See [ZOOKEEPER-1381][] and [ZOOKEEPER-3762][] for references.
///
/// [ZOOKEEPER-1381]: https://issues.apache.org/jira/browse/ZOOKEEPER-1381
/// [ZOOKEEPER-3762]: https://issues.apache.org/jira/browse/ZOOKEEPER-3762
pub fn assume_server_version(&mut self, major: u32, minor: u32, patch: u32) -> &mut Self {
self.connector.server_version(major, minor, patch);
self
}

Check warning on line 1827 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1824-L1827

Added lines #L1824 - L1827 were not covered by tests

/// Detaches creating session so it will not be closed after all client instances dropped.
pub fn detach(&mut self) -> &mut Self {
self.connector.detached();
self
}

Check warning on line 1833 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1830-L1833

Added lines #L1830 - L1833 were not covered by tests

/// Connects to ZooKeeper cluster.
///
/// # Notable errors
/// * [Error::NoHosts] if no host is available
/// * [Error::SessionExpired] if specified session expired
pub async fn connect(&mut self, cluster: &str) -> Result<Client> {
self.connector.connect(cluster).await
}

Check warning on line 1842 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L1840-L1842

Added lines #L1840 - L1842 were not covered by tests
}

trait MultiBuffer {
Expand Down
Loading
Loading