From 309672d29c22879fe5d81f90a97bf01ec62efa8e Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Thu, 21 Mar 2024 04:35:09 +0800 Subject: [PATCH] test: refactor tests to use tls connection --- Cargo.toml | 1 - src/session/mod.rs | 2 +- tests/zookeeper.rs | 643 ++++++++++++++++++++++----------------------- 3 files changed, 319 insertions(+), 327 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bfe4259..da2a849 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,5 +44,4 @@ testcontainers = { git = "https://github.com/kezhuw/testcontainers-rs.git", bran assertor = "0.0.2" assert_matches = "1.5.0" tempfile = "3.6.0" -maplit = "1.0.2" rcgen = { version = "0.12.1", features = ["default", "x509-parser"] } diff --git a/src/session/mod.rs b/src/session/mod.rs index a2ca05c..02a01f3 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -443,7 +443,7 @@ impl Session { tick.set_missed_tick_behavior(time::MissedTickBehavior::Skip); let mut channel_closed = false; depot.start(); - while !(channel_closed && depot.is_empty()) { + while !(channel_closed && depot.is_empty() && !conn.wants_write()) { select! { _ = conn.readable() => { self.read_connection(conn, buf)?; diff --git a/tests/zookeeper.rs b/tests/zookeeper.rs index 9f14365..df18246 100644 --- a/tests/zookeeper.rs +++ b/tests/zookeeper.rs @@ -5,7 +5,6 @@ use std::{fs, future}; use assert_matches::assert_matches; use assertor::*; -use maplit::hashmap; use pretty_assertions::assert_eq; use rand::distributions::Standard; use rand::Rng; @@ -14,11 +13,12 @@ use rcgen::{Certificate, CertificateParams}; use tempfile::{tempdir, TempDir}; use test_case::test_case; use testcontainers::clients::Cli as DockerCli; -use testcontainers::core::{Healthcheck, RunnableImage, WaitFor}; +use testcontainers::core::{Container, Healthcheck, RunnableImage, WaitFor}; use testcontainers::images::generic::GenericImage; use tokio::select; use zookeeper_client as zk; +static ZK_IMAGE_TAG: &'static str = "3.9.0"; static PERSISTENT_OPEN: &zk::CreateOptions<'static> = &zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all()); static CONTAINER_OPEN: &zk::CreateOptions<'static> = &zk::CreateMode::Container.with_acls(zk::Acls::anyone_all()); @@ -27,56 +27,37 @@ fn random_data() -> Vec { rng.sample_iter(Standard).take(32).collect() } -fn zookeeper_image_with_version_and_properties<'a>(version: &'a str, mut properties: Vec<&'a str>) -> GenericImage { +fn zookeeper_image<'a>(options: ContainerOptions<'a>) -> RunnableImage { + let mut properties = options.properties; properties.insert(0, "-Dzookeeper.DigestAuthenticationProvider.superDigest=super:D/InIHSb7yEEbrWz8b9l71RjZJU="); properties.insert(0, "-Dzookeeper.enableEagerACLCheck=true"); - let jvm_properties = properties.join(" "); + let server_jvmflags = properties.join(" "); + let client_jvmflags = options.healthcheck.join(" "); let healthcheck = Healthcheck::default() .with_cmd(["./bin/zkServer.sh", "status"].iter()) .with_interval(Duration::from_secs(2)) .with_retries(60); - GenericImage::new("zookeeper", version) - .with_env_var("SERVER_JVMFLAGS", jvm_properties) + let mut image: RunnableImage<_> = GenericImage::new("zookeeper", options.tag) + .with_env_var("SERVER_JVMFLAGS", server_jvmflags) + .with_env_var("CLIENT_JVMFLAGS", client_jvmflags) .with_healthcheck(healthcheck) .with_wait_for(WaitFor::Healthcheck) -} - -fn zookeeper_image_with_properties<'a>(properties: Vec<&'a str>) -> GenericImage { - zookeeper_image_with_version_and_properties("3.9.0", properties) -} - -fn zookeeper_image_with_port_and_volumes<'a>( - port: u16, - volumes: HashMap<&'a str, &'a Path>, -) -> RunnableImage { - let mut image: RunnableImage<_> = zookeeper_image().with_exposed_port(port).into(); - for (dest, source) in volumes { + .into(); + for (dest, source) in options.volumes { image = image.with_volume((source.to_str().unwrap(), dest)); } image } -fn zookeeper_image() -> GenericImage { - zookeeper_image_with_properties(Vec::default()) -} - -fn zookeeper34_image() -> GenericImage { - zookeeper_image_with_version_and_properties("3.4", Vec::default()) -} - async fn example() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); + let server = Server::new(); + let client = server.client(None).await; let path = "/abc"; let data = "path_data".as_bytes().to_vec(); let child_path = "/abc/efg"; let child_data = "child_path_data".as_bytes().to_vec(); - let client = zk::Client::connect(&cluster).await.unwrap(); let (_, stat_watcher) = client.check_and_watch_stat(path).await.unwrap(); let (stat, _) = client.create(path, &data, PERSISTENT_OPEN).await.unwrap(); @@ -139,8 +120,8 @@ async fn test_example() { tokio::spawn(async move { example().await }).await.unwrap() } -async fn connect(cluster: &str, chroot: &str) -> zk::Client { - let client = zk::Client::connect(cluster).await.unwrap(); +async fn connect(server: &Server, chroot: &str) -> zk::Client { + let client = server.client(None).await; if chroot.len() <= 1 { return client; } @@ -167,32 +148,236 @@ async fn test_connect_nohosts() { #[test_log::test(tokio::test)] async fn test_connect_session_expired() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::builder().detach().connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.custom_client(None, |builder| builder.detach()).await.unwrap(); let timeout = client.session_timeout(); let (id, password) = client.into_session(); tokio::time::sleep(timeout * 2).await; - assert_that!(zk::Client::builder().with_session(id, password).connect(&cluster).await.unwrap_err()) + assert_that!(server.custom_client(None, |builder| builder.with_session(id, password)).await.unwrap_err()) .is_equal_to(zk::Error::SessionExpired); } +struct Tls { + ca: String, + cert: String, + key: String, +} + +struct Server { + tls: Option, + _dir: Option, + _docker: Box, + container: Container<'static, GenericImage>, +} + +unsafe impl Send for Server {} +unsafe impl Sync for Server {} + +struct ContainerOptions<'a> { + tag: &'a str, + volumes: HashMap<&'a str, &'a Path>, + properties: Vec<&'a str>, + healthcheck: Vec<&'a str>, +} + +impl Default for ContainerOptions<'_> { + fn default() -> Self { + Self { tag: ZK_IMAGE_TAG, volumes: HashMap::new(), properties: vec![], healthcheck: vec![] } + } +} + +impl<'a> From> for ContainerOptions<'a> { + fn from(options: ServerOptions<'a>) -> Self { + Self { + tag: options.tag, + volumes: Default::default(), + properties: options.properties, + healthcheck: Default::default(), + } + } +} + +struct ServerOptions<'a> { + tls: Option, + tag: &'a str, + properties: Vec<&'a str>, +} + +impl Default for ServerOptions<'_> { + fn default() -> Self { + Self { tls: None, tag: ZK_IMAGE_TAG, properties: vec![] } + } +} + +impl Server { + pub fn new() -> Self { + Self::with_options(Default::default()) + } + + pub fn with_properties(properties: Vec<&'_ str>) -> Self { + Self::with_options(ServerOptions { properties, ..Default::default() }) + } + + pub fn with_options(options: ServerOptions<'_>) -> Self { + let tls = options.tls.unwrap_or_else(|| match std::env::var("ZK_TEST_TLS") { + Ok(v) if v == "true" => true, + Ok(v) if v == "false" => false, + _ => rand::random(), + }); + if tls { + println!("starting tls zookeeper server ..."); + Self::tls(options) + } else { + println!("starting plaintext zookeeper server ..."); + Self::plaintext(options) + } + } + + pub fn stop(&self) { + self.container.stop(); + } + + fn tls(options: ServerOptions<'_>) -> Self { + let dir = tempdir().unwrap(); + + let (ca_cert, ca_cert_pem) = generate_ca_cert(); + let server_cert = generate_server_cert(); + let signed_server_cert = server_cert.serialize_pem_with_signer(&ca_cert).unwrap(); + + // ZooKeeper needs a keystore with both key and signed cert. + let server_pem = server_cert.serialize_private_key_pem() + &signed_server_cert; + + let ca_cert_file = dir.path().join("ca.cert.pem"); + fs::write(&ca_cert_file, &ca_cert_pem).unwrap(); + + let server_pem_file = dir.path().join("server.pem"); + fs::write(&server_pem_file, &server_pem).unwrap(); + + let client_cert = generate_client_cert(); + let signed_client_cert = client_cert.serialize_pem_with_signer(&ca_cert).unwrap(); + let client_key = client_cert.serialize_private_key_pem(); + let client_pem = client_key.clone() + &signed_client_cert; + + let client_pem_file = dir.path().join("client.pem"); + fs::write(&client_pem_file, &client_pem).unwrap(); + + let config = r" +dataDir=/data +dataLogDir=/datalog +tickTime=2000 +initLimit=5 +syncLimit=2 +autopurge.snapRetainCount=3 +autopurge.purgeInterval=0 +maxClientCnxns=60 + +secureClientPort=2181 +ssl.clientAuth=need +ssl.keyStore.location=/certs/server.pem +ssl.trustStore.location=/certs/ca.cert.pem +serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory + "; + let config_file = dir.path().join("zoo.cfg"); + fs::write(&config_file, &config).unwrap(); + + let options = ContainerOptions { + tag: options.tag, + volumes: HashMap::from([ + ("/conf/zoo.cfg", config_file.as_path()), + ("/certs/client.pem", client_pem_file.as_path()), + ("/certs/server.pem", server_pem_file.as_path()), + ("/certs/ca.cert.pem", ca_cert_file.as_path()), + ]), + properties: options.properties, + healthcheck: vec![ + "-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty", + "-Dzookeeper.ssl.trustStore.location=/certs/ca.cert.pem", + "-Dzookeeper.ssl.keyStore.location=/certs/client.pem", + "-Dzookeeper.client.secure=true", + ], + }; + + let docker = Box::new(DockerCli::default()); + let image = zookeeper_image(options); + let container = unsafe { std::mem::transmute(docker.run(image)) }; + + Self { + tls: Some(Tls { ca: ca_cert_pem, cert: signed_client_cert, key: client_key }), + _dir: Some(dir), + _docker: docker, + container, + } + } + + fn plaintext(options: ServerOptions<'_>) -> Self { + let docker = Box::new(DockerCli::default()); + let image = zookeeper_image(options.into()); + let container = unsafe { std::mem::transmute(docker.run(image)) }; + Self { tls: None, _dir: None, _docker: docker, container } + } + + pub fn port(&self) -> u16 { + self.container.get_host_port(2181) + } + + pub fn url(&self) -> String { + let protocol = match (self.tls.is_some(), rand::random()) { + (true, _) => "tcp+tls://", + (false, true) => "tcp://", + (false, false) => "", + }; + format!("{}127.0.0.1:{}", protocol, self.port()) + } + + pub fn builder(&self) -> zk::ClientBuilder { + let Some(tls) = self.tls.as_ref() else { + return zk::Client::builder(); + }; + let mut builder = zk::Client::builder(); + builder.trust_ca_pem_certs(&tls.ca).unwrap().use_client_pem_cert(&tls.cert, &tls.key).unwrap(); + builder + } + + pub async fn client(&self, chroot: Option<&str>) -> zk::Client { + self.custom_client(chroot, |builder| builder).await.unwrap() + } + + pub async fn custom_client( + &self, + chroot: Option<&str>, + custom: impl FnOnce(&mut zk::ClientBuilder) -> &mut zk::ClientBuilder, + ) -> Result { + let mut builder = self.builder(); + custom(&mut builder); + let chroot = chroot.unwrap_or(""); + let Some(tls) = self.tls.as_ref() else { + let url = self.url() + chroot; + return builder.connect(&url).await; + }; + let assume_tls: bool = rand::random(); + let protocol = if !assume_tls || rand::random() { "tcp+tls://" } else { "" }; + if assume_tls { + builder.assume_tls(); + } + builder + .trust_ca_pem_certs(&tls.ca) + .unwrap() + .use_client_pem_cert(&tls.cert, &tls.key) + .unwrap() + .connect(&format!("{}127.0.0.1:{}{}", protocol, self.port(), chroot)) + .await + } +} + #[test_case("/"; "no_chroot")] #[test_case("/x"; "chroot_x")] #[test_case("/x/y"; "chroot_x_y")] #[test_log::test(tokio::test)] async fn test_multi(chroot: &str) { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = connect(&cluster, chroot).await; + let server = Server::new(); + let client = connect(&server, chroot).await; let mut writer = client.new_multi_writer(); assert_that!(writer.commit().await.unwrap()).is_empty(); @@ -292,12 +477,8 @@ async fn test_multi(chroot: &str) { #[test_log::test(tokio::test)] async fn test_multi_async_order() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; client.create("/a", "a0".as_bytes(), PERSISTENT_OPEN).await.unwrap(); @@ -321,12 +502,8 @@ async fn test_multi_async_order() { #[test_log::test(tokio::test)] async fn test_check_writer() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let mut check_writer = client.new_check_writer("/check", None).unwrap(); check_writer.add_create("/a", Default::default(), PERSISTENT_OPEN).unwrap(); @@ -357,13 +534,10 @@ async fn test_check_writer() { #[test_case("/x/y"; "chroot_x_y")] #[test_log::test(tokio::test)] async fn test_lock_shared(chroot: &str) { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); + let server = Server::new(); test_lock_with_path( - &cluster, + &server, chroot, zk::LockPrefix::new_shared("/locks/shared/n-").unwrap(), zk::LockPrefix::new_shared("/locks/shared/n-").unwrap(), @@ -375,50 +549,36 @@ async fn test_lock_shared(chroot: &str) { #[test_case("/x/y"; "chroot_x_y")] #[test_log::test(tokio::test)] async fn test_lock_custom(chroot: &str) { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); + let server = Server::new(); let lock1_prefix = zk::LockPrefix::new_custom("/locks/custom/n-abc-".to_string(), "n-").unwrap(); let lock2_prefix = zk::LockPrefix::new_custom("/locks/custom/n-def-".to_string(), "n-").unwrap(); - test_lock_with_path(&cluster, chroot, lock1_prefix, lock2_prefix).await; + test_lock_with_path(&server, chroot, lock1_prefix, lock2_prefix).await; } #[test_case("/x"; "chroot_x")] #[test_case("/x/y"; "chroot_x_y")] #[test_log::test(tokio::test)] async fn test_lock_curator(chroot: &str) { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); + let server = Server::new(); let lock1_prefix = zk::LockPrefix::new_curator("/locks/curator", "latch-").unwrap(); let lock2_prefix = zk::LockPrefix::new_curator("/locks/curator", "latch-").unwrap(); - test_lock_with_path(&cluster, chroot, lock1_prefix, lock2_prefix).await; + test_lock_with_path(&server, chroot, lock1_prefix, lock2_prefix).await; } #[test_log::test(tokio::test)] async fn test_lock_no_node() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let prefix = zk::LockPrefix::new_curator("/a/locks", "latch-").unwrap(); assert_eq!(client.lock(prefix, b"", zk::Acls::anyone_all()).await.unwrap_err(), zk::Error::NoNode); } #[test_log::test(tokio::test)] async fn test_lock_curator_filter() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let options = zk::LockOptions::new(zk::Acls::anyone_all()).with_ancestor_options(CONTAINER_OPEN.clone()).unwrap(); let latch_prefix = zk::LockPrefix::new_curator("/locks/curator", "latch-").unwrap(); @@ -430,13 +590,13 @@ async fn test_lock_curator_filter() { #[allow(unused_must_use)] // semi-asynchronous future async fn test_lock_with_path( - cluster: &str, + server: &Server, chroot: &str, lock1_prefix: zk::LockPrefix<'static>, lock2_prefix: zk::LockPrefix<'static>, ) { - let client1 = connect(cluster, chroot).await; - let client2 = connect(cluster, chroot).await; + let client1 = connect(server, chroot).await; + let client2 = connect(server, chroot).await; let options = zk::LockOptions::new(zk::Acls::anyone_all()).with_ancestor_options(CONTAINER_OPEN.clone()).unwrap(); @@ -477,13 +637,8 @@ async fn test_lock_with_path( #[test_log::test(tokio::test)] async fn test_no_node() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; assert_eq!(client.check_stat("/nonexistent").await.unwrap(), None); assert_eq!(client.get_data("/nonexistent").await.unwrap_err(), zk::Error::NoNode); @@ -500,12 +655,8 @@ async fn test_no_node() { #[test_log::test(tokio::test)] async fn test_request_order() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let path = "/abc"; let child_path = "/abc/efg"; @@ -532,12 +683,8 @@ async fn test_request_order() { #[test_log::test(tokio::test)] async fn test_data_node() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let path = "/abc"; let data = random_data(); @@ -555,12 +702,8 @@ async fn test_data_node() { #[test_log::test(tokio::test)] async fn test_create_root() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap().chroot("/a").unwrap(); + let server = Server::new(); + let client = server.client(None).await.chroot("/a").unwrap(); assert_that!(client.create("/", &vec![], PERSISTENT_OPEN).await.unwrap_err()) .is_equal_to(zk::Error::BadArguments(&"can not create root node")); assert_that!(client.mkdir("/", PERSISTENT_OPEN).await.unwrap_err()) @@ -569,12 +712,8 @@ async fn test_create_root() { #[test_log::test(tokio::test)] async fn test_create_sequential() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let prefix = "/PREFIX-"; let data = random_data(); @@ -597,15 +736,11 @@ async fn test_create_sequential() { #[test_log::test(tokio::test)] async fn test_create_ttl() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image_with_properties(vec![ + let server = Server::with_properties(vec![ "-Dzookeeper.extendedTypesEnabled=true", "-Dznode.container.checkIntervalMs=1000", - ])); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + ]); + let client = server.client(None).await; let ttl_options = PERSISTENT_OPEN.clone().with_ttl(Duration::from_millis(500)); client.create("/ttl", &vec![], &ttl_options).await.unwrap(); @@ -618,15 +753,11 @@ async fn test_create_ttl() { #[test_log::test(tokio::test)] async fn test_create_container() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image_with_properties(vec![ + let server = Server::with_properties(vec![ "-Dzookeeper.extendedTypesEnabled=true", "-Dznode.container.checkIntervalMs=1000", - ])); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + ]); + let client = server.client(None).await; client.create("/container", &vec![], &zk::CreateMode::Container.with_acls(zk::Acls::anyone_all())).await.unwrap(); client.create("/container/child", &vec![], PERSISTENT_OPEN).await.unwrap(); @@ -638,11 +769,9 @@ async fn test_create_container() { #[test_log::test(tokio::test)] async fn test_zookeeper34() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper34_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::builder().assume_server_version(3, 4, u32::MAX).connect(&cluster).await.unwrap(); + let server = Server::with_options(ServerOptions { tls: Some(false), tag: "3.4", ..Default::default() }); + + let client = server.custom_client(None, |builder| builder.assume_server_version(3, 4, u32::MAX)).await.unwrap(); let (stat, _sequence) = client.create("/a", b"a1", PERSISTENT_OPEN).await.unwrap(); assert_that!(stat.is_invalid()).is_true(); @@ -676,12 +805,8 @@ async fn test_zookeeper34() { #[test_log::test(tokio::test)] async fn test_mkdir() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; client.mkdir("/a/b/c/d", PERSISTENT_OPEN).await.unwrap(); let _stat = client.check_stat("/a/b/c/d").await.unwrap().unwrap(); @@ -696,12 +821,8 @@ async fn test_mkdir() { #[test_log::test(tokio::test)] async fn test_descendants_number() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let path = "/abc"; let child_path = "/abc/efg"; @@ -748,12 +869,8 @@ where #[test_log::test(tokio::test)] async fn test_ephemerals() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let path = "/abc"; let child_path = "/abc/efg"; @@ -814,12 +931,8 @@ async fn test_ephemerals() { #[test_log::test(tokio::test)] async fn test_chroot() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; assert_eq!(client.path(), "/"); let client = client.chroot("abc").unwrap_err(); @@ -863,12 +976,8 @@ async fn test_chroot() { #[test_log::test(tokio::test)] async fn test_auth() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let scheme = "digest"; let user = "bob"; @@ -879,22 +988,18 @@ async fn test_auth() { let authed_users = client.list_auth_users().await.unwrap(); assert!(authed_users.contains(&authed_user)); - let built_client = - zk::Client::builder().with_auth(scheme.to_string(), auth.to_vec()).connect(&cluster).await.unwrap(); + let authed_client = + server.custom_client(None, |builder| builder.with_auth(scheme.to_string(), auth.to_vec())).await.unwrap(); - built_client.auth(scheme.to_string(), auth.to_vec()).await.unwrap(); + authed_client.auth(scheme.to_string(), auth.to_vec()).await.unwrap(); let authed_users = client.list_auth_users().await.unwrap(); assert!(authed_users.contains(&authed_user)); } #[test_log::test(tokio::test)] async fn test_no_auth() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::with_options(ServerOptions { tls: Some(false), ..Default::default() }); + let client = server.client(None).await; let scheme = "digest"; let auth = b"bob:xyz"; @@ -906,7 +1011,7 @@ async fn test_no_auth() { .unwrap(); assert_eq!(client.get_data("/acl_test").await.unwrap().0, b"my_data".to_vec()); - let no_auth_client = zk::Client::connect(&cluster).await.unwrap(); + let no_auth_client = server.client(None).await; assert_eq!(no_auth_client.get_data("/acl_test").await.unwrap_err(), zk::Error::NoAuth); assert_eq!(no_auth_client.set_data("/acl_test", b"set_my_data", None).await.unwrap_err(), zk::Error::NoAuth); @@ -923,12 +1028,8 @@ async fn test_no_auth() { #[test_log::test(tokio::test)] async fn test_delete() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let path = "/abc"; let (stat, _) = client.create(path, Default::default(), PERSISTENT_OPEN).await.unwrap(); @@ -945,12 +1046,8 @@ async fn test_delete() { #[test_log::test(tokio::test)] async fn test_oneshot_watcher() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let path = "/abc"; let child_path = "/abc/efg"; @@ -1121,17 +1218,12 @@ async fn test_oneshot_watcher() { #[test_log::test(tokio::test)] async fn test_config_watch() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); + let server = Server::new(); - let connect = format!("{}/root", cluster); - let client1 = zk::Client::connect(&connect).await.unwrap(); + let client1 = server.client(Some("/root")).await; let (config_bytes, stat, watcher) = client1.get_and_watch_config().await.unwrap(); - let client2 = zk::Client::connect(&cluster).await.unwrap(); + let client2 = server.client(None).await; client2.auth("digest".to_string(), b"super:test".to_vec()).await.unwrap(); client2.set_data("/zookeeper/config", &config_bytes, Some(stat.version)).await.unwrap(); @@ -1142,13 +1234,8 @@ async fn test_config_watch() { #[test_log::test(tokio::test)] async fn test_persistent_watcher_passive_remove() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let path = "/abc"; let child_path = "/abc/efg"; @@ -1181,12 +1268,8 @@ async fn test_persistent_watcher_passive_remove() { #[test_log::test(tokio::test)] async fn test_fail_watch_with_multiple_unwatching() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let (_, exist_watcher1) = client.check_and_watch_stat("/a1").await.unwrap(); let (_, exist_watcher2) = client.check_and_watch_stat("/a2").await.unwrap(); @@ -1210,12 +1293,8 @@ async fn test_fail_watch_with_multiple_unwatching() { #[test_log::test(tokio::test)] async fn test_fail_watch_with_concurrent_passive_remove() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let recursive_watcher = client.watch("/a", zk::AddWatchMode::PersistentRecursive).await.unwrap(); let data_watching = client.get_and_watch_data("/a"); @@ -1234,13 +1313,8 @@ async fn test_fail_watch_with_concurrent_passive_remove() { #[test_log::test(tokio::test)] async fn test_persistent_watcher() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let path = "/abc"; let child_path = "/abc/efg"; @@ -1359,12 +1433,8 @@ async fn test_persistent_watcher() { #[test_log::test(tokio::test)] async fn test_watcher_coexist_on_same_path() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let (_, exist_watcher) = client.check_and_watch_stat("/a").await.unwrap(); let mut persistent_watcher = client.watch("/a", zk::AddWatchMode::Persistent).await.unwrap(); @@ -1422,12 +1492,8 @@ async fn test_watcher_coexist_on_same_path() { // Use "current_thread" explicitly. #[test_log::test(tokio::test(flavor = "current_thread"))] async fn test_remove_no_watcher() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let (_, exist_watcher) = client.check_and_watch_stat("/a").await.unwrap(); let create = client.create("/a", &vec![], PERSISTENT_OPEN); @@ -1449,13 +1515,8 @@ async fn test_remove_no_watcher() { #[test_log::test(tokio::test)] async fn test_session_event() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let (_, oneshot_watcher1) = client.check_and_watch_stat("/").await.unwrap(); let (_, _, oneshot_watcher2) = client.get_and_watch_data("/").await.unwrap(); @@ -1464,7 +1525,7 @@ async fn test_session_event() { let mut persistent_watcher = client.watch("/", zk::AddWatchMode::PersistentRecursive).await.unwrap(); - zookeeper.stop(); + server.stop(); let event = persistent_watcher.changed().await; assert_eq!(event.event_type, zk::EventType::Session); @@ -1485,13 +1546,8 @@ async fn test_session_event() { #[test_log::test(tokio::test)] async fn test_state_watcher() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let mut state_watcher = client.state_watcher(); select! { biased; @@ -1510,34 +1566,27 @@ async fn test_state_watcher() { #[test_log::test(tokio::test)] async fn test_client_drop() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::builder().connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.client(None).await; let mut state_watcher = client.state_watcher(); + tokio::time::sleep(Duration::from_secs(20)).await; let (id, password) = client.into_session(); assert_eq!(zk::SessionState::Closed, state_watcher.changed().await); - zk::Client::builder().with_session(id, password).connect(&cluster).await.unwrap_err(); + server.custom_client(None, |builder| builder.with_session(id, password)).await.unwrap_err(); } #[test_log::test(tokio::test)] async fn test_client_detach() { - let docker = DockerCli::default(); - let zookeeper = docker.run(zookeeper_image()); - let zk_port = zookeeper.get_host_port(2181); - let cluster = format!("127.0.0.1:{}", zk_port); - - let client = zk::Client::builder().detach().connect(&cluster).await.unwrap(); + let server = Server::new(); + let client = server.custom_client(None, |builder| builder.detach()).await.unwrap(); let mut state_watcher = client.state_watcher(); let (id, password) = client.into_session(); assert_eq!(zk::SessionState::Closed, state_watcher.changed().await); - zk::Client::builder().with_session(id, password).connect(&cluster).await.unwrap(); + server.custom_client(None, |builder| builder.with_session(id, password)).await.unwrap(); } fn generate_ca_cert() -> (Certificate, String) { @@ -1567,64 +1616,8 @@ fn generate_client_cert() -> Certificate { #[test_log::test(tokio::test)] async fn test_tls() { - let dir = tempdir().unwrap(); - - let (ca_cert, ca_cert_pem) = generate_ca_cert(); - let server_cert = generate_server_cert(); - let signed_server_cert = server_cert.serialize_pem_with_signer(&ca_cert).unwrap(); - - // ZooKeeper needs a keystore with both key and signed cert. - let server_pem = server_cert.serialize_private_key_pem() + &signed_server_cert; - - let ca_cert_file = dir.path().join("ca.cert.pem"); - fs::write(&ca_cert_file, &ca_cert_pem).unwrap(); - - let server_pem_file = dir.path().join("server.pem"); - fs::write(&server_pem_file, &server_pem).unwrap(); - - let config = r" -dataDir=/data -dataLogDir=/datalog -tickTime=2000 -initLimit=5 -syncLimit=2 -autopurge.snapRetainCount=3 -autopurge.purgeInterval=0 -maxClientCnxns=60 - -# for healthcheck -clientPort=2181 -secureClientPort=2182 -ssl.clientAuth=need -ssl.keyStore.location=/certs/server.pem -ssl.trustStore.location=/certs/ca.cert.pem -serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory -"; - let config_file = dir.path().join("zoo.cfg"); - fs::write(&config_file, &config).unwrap(); - - let docker = DockerCli::default(); - let image = zookeeper_image_with_port_and_volumes(2182, hashmap! { - "/conf/zoo.cfg" => config_file.as_path(), - "/certs/server.pem" => server_pem_file.as_path(), - "/certs/ca.cert.pem" => ca_cert_file.as_path(), - }); - let zookeeper = docker.run(image); - let zk_port = zookeeper.get_host_port(2182); - - let client_cert = generate_client_cert(); - let signed_client_cert = client_cert.serialize_pem_with_signer(&ca_cert).unwrap(); - let client_key = client_cert.serialize_private_key_pem(); - - let client = zk::Client::builder() - .assume_tls() - .trust_ca_pem_certs(&ca_cert_pem) - .unwrap() - .use_client_pem_cert(&signed_client_cert, &client_key) - .unwrap() - .connect(&format!("tcp+tls://127.0.0.1:{}", zk_port)) - .await - .unwrap(); + let server = Server::with_options(ServerOptions { tls: Some(true), ..Default::default() }); + let client = server.client(None).await; let children = client.list_children("/").await.unwrap(); assert_that!(children).contains("zookeeper".to_owned()); } @@ -1646,7 +1639,7 @@ admin.enableServer=true"; let myid_path = dir.path().join(format!("zoo{server_id}.myid")); fs::write(&cfg_path, format!("{options}\n{}\n", servers.join("\n"))).unwrap(); fs::write(&myid_path, format!("{server_id}\n")).unwrap(); - RunnableImage::from(zookeeper_image()) + RunnableImage::from(zookeeper_image(Default::default())) .with_network("host") .with_volume((cfg_path.as_path().to_str().unwrap(), "/conf/zoo.cfg")) .with_volume((myid_path.as_path().to_str().unwrap(), "/data/myid"))