diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 0b39d44d85e..8c180909c46 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -4614,6 +4614,36 @@ dependencies = [ "utoipa", ] +[[package]] +name = "quickwit-integration-tests" +version = "0.5.0" +dependencies = [ + "anyhow", + "bytes", + "chitchat", + "futures-util", + "hyper", + "itertools", + "quickwit-actors", + "quickwit-cluster", + "quickwit-common", + "quickwit-config", + "quickwit-indexing", + "quickwit-metastore", + "quickwit-proto", + "quickwit-rest-client", + "quickwit-search", + "quickwit-serve", + "rand 0.8.5", + "reqwest", + "serde", + "serde_json", + "tempfile", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "quickwit-jaeger" version = "0.5.0" @@ -4790,6 +4820,7 @@ dependencies = [ "anyhow", "bytes", "quickwit-actors", + "quickwit-cluster", "quickwit-common", "quickwit-config", "quickwit-indexing", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index f3faf7fbf68..9228c0f4629 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -17,6 +17,7 @@ members = [ "quickwit-grpc-clients", "quickwit-indexing", "quickwit-ingest", + "quickwit-integration-tests", "quickwit-jaeger", "quickwit-janitor", "quickwit-macros", @@ -210,6 +211,7 @@ quickwit-doc-mapper = { version = "0.5.0", path = "./quickwit-doc-mapper" } quickwit-grpc-clients = { version = "0.5.0", path = "./quickwit-grpc-clients" } quickwit-indexing = { version = "0.5.0", path = "./quickwit-indexing" } quickwit-ingest = { version = "0.5.0", path = "./quickwit-ingest" } +quickwit-integration-tests = { version = "0.5.0", path = "./quickwit-integration-tests" } quickwit-jaeger = { version = "0.5.0", path = "./quickwit-jaeger" } quickwit-janitor = { version = "0.5.0", path = "./quickwit-janitor" } quickwit-macros = { version = "0.5.0", path = "./quickwit-macros" } diff --git a/quickwit/quickwit-actors/src/registry.rs b/quickwit/quickwit-actors/src/registry.rs index 4d8fd7c387a..cb647fb2504 100644 --- a/quickwit/quickwit-actors/src/registry.rs +++ b/quickwit/quickwit-actors/src/registry.rs @@ -181,16 +181,18 @@ impl ActorRegistry { } } - pub async fn quit(&self) -> Vec { + pub async fn quit(&self) -> HashMap { let mut obs_futures = Vec::new(); + let mut actor_ids = Vec::new(); for registry_for_type in self.actors.read().unwrap().values() { for obs in ®istry_for_type.observables { let obs_clone = obs.clone(); obs_futures.push(async move { obs_clone.quit().await }); + actor_ids.push(obs.actor_instance_id().to_string()); } } let res = future::join_all(obs_futures).await; - res.into_iter().collect() + actor_ids.into_iter().zip(res).collect() } pub fn is_empty(&self) -> bool { diff --git a/quickwit/quickwit-actors/src/universe.rs b/quickwit/quickwit-actors/src/universe.rs index d5317c448b1..40b1737c3bf 100644 --- a/quickwit/quickwit-actors/src/universe.rs +++ b/quickwit/quickwit-actors/src/universe.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::HashMap; use std::thread; use std::time::Duration; @@ -120,7 +121,7 @@ impl Universe { } /// Gracefully quits all registered actors. - pub async fn quit(&self) -> Vec { + pub async fn quit(&self) -> HashMap { self.spawn_ctx.registry.quit().await } @@ -132,7 +133,7 @@ impl Universe { assert!(!self .quit() .await - .into_iter() + .values() .any(|status| matches!(status, ActorExitStatus::Panicked))); } } @@ -233,7 +234,10 @@ mod tests { universe.sleep(Duration::from_secs(200)).await; let res = universe.quit().await; assert_eq!(res.len(), 1); - assert!(matches!(res.first().unwrap(), ActorExitStatus::Quit)); + assert!(matches!( + res.values().next().unwrap(), + ActorExitStatus::Quit + )); assert!(matches!(handler.quit().await, (ActorExitStatus::Quit, 4))); } @@ -246,7 +250,7 @@ mod tests { assert!(!universe .quit() .await - .into_iter() + .values() .any(|status| matches!(status, ActorExitStatus::Panicked))); } @@ -260,7 +264,7 @@ mod tests { assert!(universe .quit() .await - .into_iter() + .values() .any(|status| matches!(status, ActorExitStatus::Panicked))); } diff --git a/quickwit/quickwit-cli/src/service.rs b/quickwit/quickwit-cli/src/service.rs index 67840e5bcc6..5fe026ffef7 100644 --- a/quickwit/quickwit-cli/src/service.rs +++ b/quickwit/quickwit-cli/src/service.rs @@ -26,6 +26,7 @@ use quickwit_common::uri::Uri; use quickwit_config::service::QuickwitService; use quickwit_serve::serve_quickwit; use quickwit_telemetry::payload::TelemetryEvent; +use tokio::signal; use tracing::debug; use crate::{config_cli_arg, load_quickwit_config, start_actor_runtimes}; @@ -80,7 +81,12 @@ impl RunCliCommand { quickwit_telemetry::send_telemetry_event(telemetry_event).await; // TODO move in serve quickwit? start_actor_runtimes(&config.enabled_services)?; - serve_quickwit(config).await?; + let _ = serve_quickwit(config, async move { + signal::ctrl_c() + .await + .expect("Failure listening for CTRL+C signal") + }) + .await?; Ok(()) } } diff --git a/quickwit/quickwit-integration-tests/Cargo.toml b/quickwit/quickwit-integration-tests/Cargo.toml new file mode 100644 index 00000000000..b85bdb8f071 --- /dev/null +++ b/quickwit/quickwit-integration-tests/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "quickwit-integration-tests" +version = "0.5.0" +authors = ["Quickwit, Inc. "] +edition = "2021" +license = "AGPL-3.0-or-later" # For a commercial, license, contact hello@quickwit.io +description = "Quickwit's integration tests" +repository = "https://github.com/quickwit-oss/quickwit" +homepage = "https://quickwit.io/" +documentation = "https://quickwit.io/docs/" + +[dependencies] + +[dev-dependencies] +anyhow = { workspace = true } +bytes = { workspace = true } +chitchat = { workspace = true } +futures-util = { workspace = true } +hyper = { workspace = true } +itertools = { workspace = true } +rand = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true } +tokio-stream = { workspace = true } +tracing = { workspace = true } + +quickwit-actors = { workspace = true, features = ["testsuite"] } +quickwit-cluster = { workspace = true, features = ["testsuite"] } +quickwit-common = { workspace = true, features = ["testsuite"] } +quickwit-config = { workspace = true, features = ["testsuite"] } +quickwit-indexing = { workspace = true, features = ["testsuite"] } +quickwit-metastore = { workspace = true, features = ["testsuite"] } +quickwit-search = { workspace = true, features = ["testsuite"] } +quickwit-rest-client = { workspace = true } +quickwit-serve = { workspace = true } +quickwit-proto = { workspace = true } diff --git a/quickwit/quickwit-integration-tests/resources/tests/documents_to_ingest.json b/quickwit/quickwit-integration-tests/resources/tests/documents_to_ingest.json new file mode 100644 index 00000000000..4411bc8f61e --- /dev/null +++ b/quickwit/quickwit-integration-tests/resources/tests/documents_to_ingest.json @@ -0,0 +1,3 @@ +{"body":"foo"} +{"body":"bar"} +{"body":"baz"} diff --git a/quickwit/quickwit-integration-tests/src/lib.rs b/quickwit/quickwit-integration-tests/src/lib.rs new file mode 100644 index 00000000000..78367f7bcd8 --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/lib.rs @@ -0,0 +1,23 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#[cfg(test)] +mod test_utils; +#[cfg(test)] +mod tests; diff --git a/quickwit/quickwit-serve/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs similarity index 70% rename from quickwit/quickwit-serve/src/test_utils/cluster_sandbox.rs rename to quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 1aedf2b0d33..57bffc6b4e1 100644 --- a/quickwit/quickwit-serve/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -23,19 +23,20 @@ use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; +use futures_util::{future, Future}; use itertools::Itertools; +use quickwit_actors::ActorExitStatus; use quickwit_common::new_coolid; use quickwit_common::test_utils::wait_for_server_ready; use quickwit_common::uri::Uri as QuickwitUri; use quickwit_config::service::QuickwitService; use quickwit_config::QuickwitConfig; -use quickwit_search::{create_search_service_client, SearchServiceClient}; -use rand::seq::IteratorRandom; +use quickwit_rest_client::rest_client::{QuickwitClient, Transport, DEFAULT_BASE_URL}; +use quickwit_serve::serve_quickwit; +use reqwest::Url; use tempfile::TempDir; -use tracing::info; - -use super::rest_client::QuickwitRestClient; -use crate::serve_quickwit; +use tokio::sync::watch::{self, Receiver, Sender}; +use tokio::task::JoinHandle; /// Configuration of a node made of a [`QuickwitConfig`] and a /// set of services. @@ -45,6 +46,29 @@ pub struct NodeConfig { pub services: HashSet, } +struct ClusterShutdownTrigger { + sender: Sender, + receiver: Receiver, +} + +impl ClusterShutdownTrigger { + fn new() -> Self { + let (sender, receiver) = watch::channel(false); + Self { sender, receiver } + } + + fn shutdown_signal(&self) -> impl Future { + let mut receiver = self.receiver.clone(); + async move { + receiver.changed().await.unwrap(); + } + } + + fn shutdown(self) { + self.sender.send(true).unwrap(); + } +} + /// Creates a Cluster Test environment. /// /// The goal is to start several nodes and use the gRPC or REST clients to @@ -55,10 +79,18 @@ pub struct NodeConfig { /// dropped by the first running test and the other tests will fail. pub struct ClusterSandbox { pub node_configs: Vec, - pub grpc_search_clients: HashMap, - pub searcher_rest_client: QuickwitRestClient, - pub indexer_rest_client: QuickwitRestClient, + pub searcher_rest_client: QuickwitClient, + pub indexer_rest_client: QuickwitClient, _temp_dir: TempDir, + join_handles: Vec, anyhow::Error>>>, + shutdown_trigger: ClusterShutdownTrigger, +} + +fn transport_url(addr: SocketAddr) -> Url { + let mut url = Url::parse(DEFAULT_BASE_URL).unwrap(); + url.set_ip_host(addr.ip()).unwrap(); + url.set_port(Some(addr.port())).unwrap(); + url } impl ClusterSandbox { @@ -70,28 +102,24 @@ impl ClusterSandbox { // There is exactly one node. let node_config = node_configs[0].clone(); let node_config_clone = node_config.clone(); - // Creates an index before starting nodes as currently Quickwit does not support - // dynamic creation/deletion of indexes/sources. - tokio::spawn(async move { - let result = serve_quickwit(node_config_clone.quickwit_config).await; - println!("Quickwit server terminated: {result:?}"); - Result::<_, anyhow::Error>::Ok(()) - }); + let shutdown_trigger = ClusterShutdownTrigger::new(); + let shutdown_signal = shutdown_trigger.shutdown_signal(); + let join_handles = vec![tokio::spawn(async move { + let result = serve_quickwit(node_config_clone.quickwit_config, shutdown_signal).await?; + Result::<_, anyhow::Error>::Ok(result) + })]; wait_for_server_ready(node_config.quickwit_config.grpc_listen_addr).await?; - let mut grpc_search_clients = HashMap::new(); - let search_client = - create_search_service_client(node_config.quickwit_config.grpc_listen_addr).await?; - grpc_search_clients.insert(node_config.quickwit_config.grpc_listen_addr, search_client); Ok(Self { node_configs, - grpc_search_clients, - indexer_rest_client: QuickwitRestClient::new( + indexer_rest_client: QuickwitClient::new(Transport::new(transport_url( node_config.quickwit_config.rest_listen_addr, - ), - searcher_rest_client: QuickwitRestClient::new( + ))), + searcher_rest_client: QuickwitClient::new(Transport::new(transport_url( node_config.quickwit_config.rest_listen_addr, - ), + ))), _temp_dir: temp_dir, + join_handles, + shutdown_trigger, }) } @@ -101,13 +129,16 @@ impl ClusterSandbox { ) -> anyhow::Result { let temp_dir = tempfile::tempdir()?; let node_configs = build_node_configs(temp_dir.path().to_path_buf(), nodes_services); + let mut join_handles = Vec::new(); + let shutdown_trigger = ClusterShutdownTrigger::new(); for node_config in node_configs.iter() { let node_config_clone = node_config.clone(); - tokio::spawn(async move { - let result = serve_quickwit(node_config_clone.quickwit_config).await; - info!("Quickwit server terminated: {:?}", result); - Result::<_, anyhow::Error>::Ok(()) - }); + let shutdown_signal = shutdown_trigger.shutdown_signal(); + join_handles.push(tokio::spawn(async move { + let result = + serve_quickwit(node_config_clone.quickwit_config, shutdown_signal).await?; + Result::<_, anyhow::Error>::Ok(result) + })); } let searcher_config = node_configs .iter() @@ -119,28 +150,20 @@ impl ClusterSandbox { .find(|node_config| node_config.services.contains(&QuickwitService::Indexer)) .cloned() .unwrap(); - let mut grpc_search_clients = HashMap::new(); - for node_config in node_configs.iter() { - if !node_config.services.contains(&QuickwitService::Searcher) { - continue; - } - let search_client = - create_search_service_client(node_config.quickwit_config.grpc_listen_addr).await?; - grpc_search_clients.insert(search_client.grpc_addr(), search_client); - } // Wait for a duration greater than chitchat GOSSIP_INTERVAL (50ms) so that the cluster is // formed. tokio::time::sleep(Duration::from_millis(100)).await; Ok(Self { node_configs, - grpc_search_clients, - searcher_rest_client: QuickwitRestClient::new( + searcher_rest_client: QuickwitClient::new(Transport::new(transport_url( searcher_config.quickwit_config.rest_listen_addr, - ), - indexer_rest_client: QuickwitRestClient::new( + ))), + indexer_rest_client: QuickwitClient::new(Transport::new(transport_url( indexer_config.quickwit_config.rest_listen_addr, - ), + ))), _temp_dir: temp_dir, + join_handles, + shutdown_trigger, }) } @@ -152,7 +175,7 @@ impl ClusterSandbox { let max_num_attempts = 3; while num_attempts < max_num_attempts { tokio::time::sleep(Duration::from_millis(100 * (num_attempts + 1))).await; - let cluster_snapshot = self.indexer_rest_client.cluster_snapshot().await?; + let cluster_snapshot = self.indexer_rest_client.cluster().snapshot().await?; if cluster_snapshot.ready_nodes.len() == expected_num_alive_nodes { return Ok(()); } @@ -164,10 +187,14 @@ impl ClusterSandbox { Ok(()) } - pub fn get_random_search_client(&self) -> SearchServiceClient { - let mut rng = rand::thread_rng(); - let selected_addr = self.grpc_search_clients.keys().choose(&mut rng).unwrap(); - self.grpc_search_clients.get(selected_addr).unwrap().clone() + pub async fn shutdown(self) -> Result>, anyhow::Error> { + self.shutdown_trigger.shutdown(); + let result = future::join_all(self.join_handles).await; + let mut statuses = Vec::new(); + for node in result { + statuses.push(node??); + } + Ok(statuses) } } diff --git a/quickwit/quickwit-serve/src/test_utils/mod.rs b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs similarity index 94% rename from quickwit/quickwit-serve/src/test_utils/mod.rs rename to quickwit/quickwit-integration-tests/src/test_utils/mod.rs index 50e3af8abc1..c7b326cdb0e 100644 --- a/quickwit/quickwit-serve/src/test_utils/mod.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs @@ -18,7 +18,5 @@ // along with this program. If not, see . mod cluster_sandbox; -mod rest_client; pub use cluster_sandbox::{build_node_configs, ClusterSandbox}; -pub use rest_client::QuickwitRestClient; diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs new file mode 100644 index 00000000000..eea701efb3b --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -0,0 +1,251 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashSet; +use std::path::PathBuf; +use std::str::FromStr; +use std::time::Duration; + +use hyper::{Body, Method, Request, StatusCode}; +use quickwit_config::service::QuickwitService; +use quickwit_rest_client::models::IngestSource; +use quickwit_rest_client::rest_client::CommitType; +use quickwit_serve::SearchRequestQueryString; + +use crate::test_utils::ClusterSandbox; + +fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String { + format!( + "{}/resources/tests/{}", + env!("CARGO_MANIFEST_DIR"), + ndjson_dataset_filename + ) +} + +#[tokio::test] +async fn test_ui_redirect_on_get() { + quickwit_common::setup_logging_for_tests(); + let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + assert!(sandbox + .indexer_rest_client + .node_health() + .is_ready() + .await + .unwrap()); + + let node_config = sandbox.node_configs.first().unwrap(); + let client = hyper::Client::builder() + .pool_idle_timeout(Duration::from_secs(30)) + .http2_only(true) + .build_http(); + let root_uri = format!("http://{}/", node_config.quickwit_config.rest_listen_addr) + .parse::() + .unwrap(); + let response = client.get(root_uri.clone()).await.unwrap(); + assert_eq!(response.status(), StatusCode::MOVED_PERMANENTLY); + let post_request = Request::builder() + .uri(root_uri) + .method(Method::POST) + .body(Body::from("{}")) + .unwrap(); + let response = client.request(post_request).await.unwrap(); + assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); + sandbox.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_standalone_server() { + quickwit_common::setup_logging_for_tests(); + let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + assert!(sandbox + .indexer_rest_client + .node_health() + .is_ready() + .await + .unwrap()); + { + // The indexing service should be running. + let counters = sandbox + .indexer_rest_client + .node_stats() + .indexing() + .await + .unwrap(); + assert_eq!(counters.num_running_pipelines, 0); + } + + { + // Create an dynamic index. + sandbox + .indexer_rest_client + .indexes() + .create( + r#" + version: 0.5 + index_id: my-new-index + doc_mapping: + field_mappings: + - name: body + type: text + "# + .into(), + quickwit_config::ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + + // Index should be searchable + assert_eq!( + sandbox + .indexer_rest_client + .search( + "my-new-index", + SearchRequestQueryString { + query: "body:test".to_string(), + max_hits: 10, + ..Default::default() + }, + ) + .await + .unwrap() + .num_hits, + 0 + ); + tokio::time::sleep(Duration::from_millis(100)).await; + let counters = sandbox + .indexer_rest_client + .node_stats() + .indexing() + .await + .unwrap(); + assert_eq!(counters.num_running_pipelines, 1); + } + sandbox.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_multi_nodes_cluster() { + quickwit_common::setup_logging_for_tests(); + let nodes_services = vec![ + HashSet::from_iter([QuickwitService::Searcher]), + HashSet::from_iter([QuickwitService::Metastore]), + HashSet::from_iter([QuickwitService::Indexer]), + HashSet::from_iter([QuickwitService::ControlPlane]), + HashSet::from_iter([QuickwitService::Janitor]), + ]; + let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) + .await + .unwrap(); + sandbox.wait_for_cluster_num_ready_nodes(4).await.unwrap(); + + { + // Wait for indexer to fully start. + // The starting time is a bit long for a cluster. + tokio::time::sleep(Duration::from_secs(3)).await; + let indexing_service_counters = sandbox + .indexer_rest_client + .node_stats() + .indexing() + .await + .unwrap(); + assert_eq!(indexing_service_counters.num_running_pipelines, 0); + } + + // Create index + sandbox + .indexer_rest_client + .indexes() + .create( + r#" + version: 0.5 + index_id: my-new-multi-node-index + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + "# + .into(), + quickwit_config::ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + assert!(sandbox + .indexer_rest_client + .node_health() + .is_live() + .await + .unwrap()); + + // Wait until indexing pipelines are started. + tokio::time::sleep(Duration::from_millis(100)).await; + let indexing_service_counters = sandbox + .indexer_rest_client + .node_stats() + .indexing() + .await + .unwrap(); + assert_eq!(indexing_service_counters.num_running_pipelines, 1); + + // Check search is working. + let search_response_empty = sandbox + .searcher_rest_client + .search( + "my-new-multi-node-index", + SearchRequestQueryString { + query: "body:bar".to_string(), + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(search_response_empty.num_hits, 0); + + // Check that ingest request send to searcher is forwarded to indexer and thus indexed. + let ndjson_filepath = get_ndjson_filepath("documents_to_ingest.json"); + let ingest_source = IngestSource::File(PathBuf::from_str(&ndjson_filepath).unwrap()); + sandbox + .searcher_rest_client + .ingest( + "my-new-multi-node-index", + ingest_source, + None, + CommitType::Auto, + ) + .await + .unwrap(); + // Wait until split is commited and search. + tokio::time::sleep(Duration::from_secs(4)).await; + let search_response_one_hit = sandbox + .searcher_rest_client + .search( + "my-new-multi-node-index", + SearchRequestQueryString { + query: "body:bar".to_string(), + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(search_response_one_hit.num_hits, 1); + sandbox.shutdown().await.unwrap(); +} diff --git a/quickwit/quickwit-integration-tests/src/tests/mod.rs b/quickwit/quickwit-integration-tests/src/tests/mod.rs new file mode 100644 index 00000000000..d39ebdc72f2 --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/tests/mod.rs @@ -0,0 +1,20 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +mod basic_tests; diff --git a/quickwit/quickwit-rest-client/Cargo.toml b/quickwit/quickwit-rest-client/Cargo.toml index c150f978b4b..d5a63ca314d 100644 --- a/quickwit/quickwit-rest-client/Cargo.toml +++ b/quickwit/quickwit-rest-client/Cargo.toml @@ -19,6 +19,8 @@ thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +quickwit-indexing = { workspace = true } +quickwit-cluster = { workspace = true } quickwit-common = { workspace = true } quickwit-config = { workspace = true } quickwit-ingest = { workspace = true } diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index 689ada37e1a..ac53f3ab1b9 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -20,8 +20,10 @@ use std::time::Duration; use bytes::Bytes; +use quickwit_cluster::ClusterSnapshot; use quickwit_common::FileEntry; use quickwit_config::{ConfigFormat, SourceConfig}; +use quickwit_indexing::actors::IndexingServiceCounters; pub use quickwit_ingest::CommitType; use quickwit_metastore::{IndexMetadata, Split}; use quickwit_search::SearchResponseRest; @@ -41,6 +43,7 @@ pub const INGEST_CONTENT_LENGTH_LIMIT: usize = 10 * 1024 * 1024; // 10MiB pub struct Transport { base_url: Url, + api_url: Url, client: Client, } @@ -53,11 +56,13 @@ impl Default for Transport { impl Transport { pub fn new(endpoint: Url) -> Self { - let base_url = endpoint + let base_url = endpoint; + let api_url = base_url .join("api/v1/") .expect("Endpoint should not be malformed."); Self { base_url, + api_url, client: Client::new(), } } @@ -66,6 +71,10 @@ impl Transport { &self.base_url } + pub fn api_url(&self) -> &Url { + &self.api_url + } + /// Creates an asynchronous request that can be awaited pub async fn send( &self, @@ -75,10 +84,12 @@ impl Transport { query_string: Option<&Q>, body: Option, ) -> Result { - let url = self - .base_url - .join(path.trim_start_matches('/')) - .map_err(|error| Error::UrlParse(error.to_string()))?; + let url = if path.starts_with('/') { + self.base_url.join(path) + } else { + self.api_url.join(path) + } + .map_err(|error| Error::UrlParse(error.to_string()))?; let mut request_builder = self.client.request(method, url); request_builder = request_builder.timeout(Duration::from_secs(10)); let mut request_headers = HeaderMap::new(); @@ -140,6 +151,18 @@ impl QuickwitClient { SourceClient::new(&self.transport, index_id) } + pub fn cluster(&self) -> ClusterClient { + ClusterClient::new(&self.transport) + } + + pub fn node_stats(&self) -> NodeStatsClient { + NodeStatsClient::new(&self.transport) + } + + pub fn node_health(&self) -> NodeHealthClient { + NodeHealthClient::new(&self.transport) + } + pub async fn ingest( &self, index_id: &str, @@ -187,6 +210,7 @@ impl QuickwitClient { event_fn(IngestEvent::IngestedDocBatch(batch.len())) } } + Ok(()) } } @@ -418,6 +442,77 @@ impl<'a, 'b> SourceClient<'a, 'b> { } } +/// Client for Cluster APIs. +pub struct ClusterClient<'a> { + transport: &'a Transport, +} + +impl<'a> ClusterClient<'a> { + pub fn new(transport: &'a Transport) -> Self { + Self { transport } + } + + pub async fn snapshot(&self) -> Result { + let response = self + .transport + .send::<()>(Method::GET, "cluster", None, None, None) + .await?; + let cluster_snapshot = response.deserialize().await?; + Ok(cluster_snapshot) + } +} + +/// Client for Node-level Stats APIs. +pub struct NodeStatsClient<'a> { + transport: &'a Transport, +} + +impl<'a> NodeStatsClient<'a> { + pub fn new(transport: &'a Transport) -> Self { + Self { transport } + } + + pub async fn indexing(&self) -> Result { + let response = self + .transport + .send::<()>(Method::GET, "indexing", None, None, None) + .await?; + let indexing_stats = response.deserialize().await?; + Ok(indexing_stats) + } +} + +/// Client for Node-level Health APIs. +pub struct NodeHealthClient<'a> { + transport: &'a Transport, +} + +impl<'a> NodeHealthClient<'a> { + pub fn new(transport: &'a Transport) -> Self { + Self { transport } + } + + /// Returns true if the node is healthy, returns false or an error otherwise. + pub async fn is_live(&self) -> Result { + let response = self + .transport + .send::<()>(Method::GET, "/health/livez", None, None, None) + .await?; + let result: bool = response.deserialize().await?; + Ok(result) + } + + /// Returns true if the node is ready, returns false or an error otherwise. + pub async fn is_ready(&self) -> Result { + let response = self + .transport + .send::<()>(Method::GET, "/health/readyz", None, None, None) + .await?; + let result: bool = response.deserialize().await?; + Ok(result) + } +} + fn header_from_config_format(config_format: ConfigFormat) -> HeaderMap { let mut header_map = HeaderMap::new(); let content_type_value = format!("application/{}", config_format.as_str()); @@ -459,8 +554,12 @@ mod test { let transport = Transport::default(); assert_eq!( transport.base_url(), + &Url::parse("http://127.0.0.1:7280/").unwrap() + ); + assert_eq!( + transport.api_url(), &Url::parse("http://127.0.0.1:7280/api/v1/").unwrap() - ) + ); } #[tokio::test] @@ -925,4 +1024,32 @@ mod test { .await .unwrap_err(); } + + #[tokio::test] + async fn test_health_endpoints() { + let mock_server = MockServer::start().await; + let server_url = Url::parse(&mock_server.uri()).unwrap(); + let qw_client = QuickwitClient::new(Transport::new(server_url)); + + assert!(qw_client.node_health().is_live().await.is_err()); + assert!(qw_client.node_health().is_ready().await.is_err()); + + // GET /health/livez + Mock::given(method("GET")) + .and(path("/health/livez")) + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_json(true)) + .expect(1) + .mount(&mock_server) + .await; + assert!(qw_client.node_health().is_live().await.unwrap()); + + // GET /health/readyz + Mock::given(method("GET")) + .and(path("/health/readyz")) + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_json(true)) + .expect(1) + .mount(&mock_server) + .await; + assert!(qw_client.node_health().is_ready().await.unwrap()); + } } diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index 83431b03f73..e00bb006e7a 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -20,6 +20,7 @@ use std::collections::BTreeSet; use std::net::SocketAddr; +use futures::Future; use quickwit_config::service::QuickwitService; use quickwit_control_plane::control_plane_service_grpc_server::ControlPlaneServiceGrpcServer; use quickwit_control_plane::ControlPlaneServiceGrpcServerAdapter; @@ -44,10 +45,14 @@ use crate::search_api::GrpcSearchAdapter; use crate::QuickwitServices; /// Starts gRPC services given a gRPC address. -pub(crate) async fn start_grpc_server( +pub(crate) async fn start_grpc_server( grpc_listen_addr: SocketAddr, services: &QuickwitServices, -) -> anyhow::Result<()> { + shutdown_signal: F, +) -> anyhow::Result<()> +where + F: Future, +{ let mut enabled_grpc_services = BTreeSet::new(); let mut server = Server::builder(); @@ -150,6 +155,8 @@ pub(crate) async fn start_grpc_server( .add_optional_service(jaeger_grpc_service); info!(enabled_grpc_services=?enabled_grpc_services, grpc_listen_addr=?grpc_listen_addr, "Starting gRPC server."); - server_router.serve(grpc_listen_addr).await?; + server_router + .serve_with_shutdown(grpc_listen_addr, shutdown_signal) + .await?; Ok(()) } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 0111de13bfc..f259b90e067 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -36,12 +36,10 @@ mod node_info_handler; mod openapi; mod search_api; #[cfg(test)] -mod test_utils; -#[cfg(test)] mod tests; mod ui_handler; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::convert::Infallible; use std::num::NonZeroUsize; use std::sync::Arc; @@ -50,9 +48,10 @@ use std::time::Duration; use anyhow::anyhow; use byte_unit::n_mib_bytes; use format::BodyFormat; +use futures::Future; use itertools::Itertools; use once_cell::sync::OnceCell; -use quickwit_actors::{Mailbox, Universe}; +use quickwit_actors::{ActorExitStatus, Mailbox, Universe}; use quickwit_cluster::{Cluster, ClusterMember}; use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle}; use quickwit_common::tower::{ @@ -78,6 +77,7 @@ use quickwit_opentelemetry::otlp::{OTEL_LOGS_INDEX_CONFIG, OTEL_TRACE_INDEX_CONF use quickwit_search::{start_searcher_service, SearchJobPlacer, SearchService}; use quickwit_storage::quickwit_storage_uri_resolver; use serde::{Deserialize, Serialize}; +use tokio::sync::oneshot; use tower::ServiceBuilder; use tracing::{debug, error, warn}; use warp::{Filter, Rejection}; @@ -123,7 +123,13 @@ fn has_node_with_metastore_service(members: &[ClusterMember]) -> bool { }) } -pub async fn serve_quickwit(config: QuickwitConfig) -> anyhow::Result<()> { +pub async fn serve_quickwit( + config: QuickwitConfig, + shutdown_signal: F, +) -> anyhow::Result> +where + F: Future + Send + 'static, +{ let universe = Universe::new(); let event_broker = EventBroker::default(); let storage_resolver = quickwit_storage_uri_resolver().clone(); @@ -314,15 +320,41 @@ pub async fn serve_quickwit(config: QuickwitConfig) -> anyhow::Result<()> { index_service, services, }; - let grpc_server = grpc::start_grpc_server(grpc_listen_addr, &quickwit_services); - let rest_server = rest::start_rest_server(rest_listen_addr, &quickwit_services); + let (grpc_shutdown_trigger, grpc_shutdown_signal) = oneshot::channel::<()>(); + let grpc_server = grpc::start_grpc_server(grpc_listen_addr, &quickwit_services, async move { + grpc_shutdown_signal + .await + .expect("Failure to shutdown grpc sevice"); + }); + let (rest_shutdown_trigger, rest_shutdown_signal) = oneshot::channel::<()>(); + let rest_server = rest::start_rest_server(rest_listen_addr, &quickwit_services, async move { + rest_shutdown_signal + .await + .expect("Failure to shutdown rest service"); + }); // Node readiness indicates that the server is ready to receive requests. // Thus readiness task is started once gRPC and REST servers are started. tokio::spawn(node_readiness_reporting_task(cluster, metastore)); + let shutdown_handle = tokio::spawn(async move { + shutdown_signal.await; + + grpc_shutdown_trigger + .send(()) + .expect("Failure to send shutdown signal to grpc seservicerver"); + rest_shutdown_trigger + .send(()) + .expect("Failure to send shutdown signal to rest service"); + + universe.quit().await + }); + tokio::try_join!(grpc_server, rest_server)?; - Ok(()) + + let result = shutdown_handle.await?; + + Ok(result) } #[derive(Clone)] diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 37b7739f844..8887b1c724b 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -19,6 +19,7 @@ use std::net::SocketAddr; +use futures::Future; use hyper::http::HeaderValue; use hyper::{http, Method}; use quickwit_common::metrics; @@ -49,10 +50,14 @@ use crate::{BodyFormat, QuickwitServices}; const MINIMUM_RESPONSE_COMPRESSION_SIZE: u16 = 10 << 10; /// Starts REST services. -pub(crate) async fn start_rest_server( +pub(crate) async fn start_rest_server( rest_listen_addr: SocketAddr, quickwit_services: &QuickwitServices, -) -> anyhow::Result<()> { + shutdown_signal: F, +) -> anyhow::Result<()> +where + F: Future, +{ info!(rest_listen_addr = %rest_listen_addr, "Starting REST server."); let request_counter = warp::log::custom(|_| { crate::SERVE_METRICS.http_requests_total.inc(); @@ -140,6 +145,7 @@ pub(crate) async fn start_rest_server( hyper::Server::bind(&rest_listen_addr) .serve(Shared::new(service)) + .with_graceful_shutdown(shutdown_signal) .await?; Ok(()) } diff --git a/quickwit/quickwit-serve/src/test_utils/rest_client.rs b/quickwit/quickwit-serve/src/test_utils/rest_client.rs deleted file mode 100644 index b5258a5a1f1..00000000000 --- a/quickwit/quickwit-serve/src/test_utils/rest_client.rs +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright (C) 2023 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::net::SocketAddr; -use std::time::Duration; - -use anyhow::{bail, Context}; -use hyper::client::HttpConnector; -use hyper::{Body, Request, Response, StatusCode}; -use quickwit_cluster::ClusterSnapshot; -use quickwit_indexing::actors::IndexingServiceCounters; -use serde::de::DeserializeOwned; -use tokio_stream::StreamExt; - -pub struct QuickwitRestClient { - root_url: String, - client: hyper::Client, -} - -impl QuickwitRestClient { - pub fn new(addr: SocketAddr) -> Self { - let client = hyper::Client::builder() - .pool_idle_timeout(Duration::from_secs(30)) - .http2_only(true) - .build_http(); - let root_url = format!("http://{addr}"); - Self { root_url, client } - } - - pub async fn ingest_data(&self, index_id: &str, ndjson_doc: &str) -> anyhow::Result<()> { - let uri = format!("{}/api/v1/{index_id}/ingest", self.root_url) - .parse::() - .unwrap(); - let request = Request::builder() - .uri(uri) - .method("POST") - .header("content-type", "application/json") - .body(Body::from(ndjson_doc.to_string())) - .unwrap(); - let response = self - .client - .request(request) - .await - .context("Failed to emit request")?; - if response.status() != StatusCode::OK { - let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap(); - let body_str = String::from_utf8_lossy(&body_bytes); - bail!("error when creating index: {body_str}"); - } - Ok(()) - } - - pub async fn create_index(&self, index_config_yaml: &str) -> anyhow::Result<()> { - let uri = format!("{}/api/v1/indexes", self.root_url) - .parse::() - .unwrap(); - let request = Request::builder() - .uri(uri) - .method("POST") - .header("content-type", "application/yaml") - .body(Body::from(index_config_yaml.to_string())) - .unwrap(); - let response = self.client.request(request).await.unwrap(); - if response.status() == StatusCode::OK { - return Ok(()); - } - let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap(); - let body_string = String::from_utf8(body_bytes.to_vec()).unwrap(); - Err(anyhow::anyhow!("error when creating index: {body_string}")) - } - - pub async fn cluster_snapshot(&self) -> anyhow::Result { - let uri = format!("{}/api/v1/cluster", self.root_url) - .parse::() - .unwrap(); - let response = self.client.get(uri).await?; - let cluster_state = parse_body(response).await?; - Ok(cluster_state) - } - - pub async fn indexing_service_counters(&self) -> anyhow::Result { - let uri = format!("{}/api/v1/indexing", self.root_url) - .parse::() - .unwrap(); - let response = self.client.get(uri).await?; - let indexing_service_counters = parse_body(response).await?; - Ok(indexing_service_counters) - } - - pub async fn is_live(&self) -> anyhow::Result { - let uri = format!("{}/health/livez", self.root_url) - .parse::() - .unwrap(); - let response = self.client.get(uri).await?; - if response.status() == StatusCode::OK { - return Ok(true); - } - Ok(false) - } - - pub async fn is_ready(&self) -> anyhow::Result { - let uri = format!("{}/health/readyz", self.root_url) - .parse::() - .unwrap(); - let response = self.client.get(uri).await?; - if response.status() == StatusCode::OK { - return Ok(true); - } - Ok(false) - } - - pub fn client(&self) -> hyper::Client { - self.client.clone() - } - - pub fn root_url(&self) -> String { - self.root_url.clone() - } -} - -async fn parse_body(mut response: Response) -> anyhow::Result { - if response.status() != StatusCode::OK { - anyhow::bail!("Unexpected status {}", response.status()); - } - let mut body = Vec::new(); - while let Some(chunk) = response.body_mut().next().await { - body.extend_from_slice(&chunk?); - } - let deserialized_element: T = serde_json::from_slice(&body)?; - Ok(deserialized_element) -} diff --git a/quickwit/quickwit-serve/src/tests.rs b/quickwit/quickwit-serve/src/tests.rs index 30d742b9ba6..326216dc023 100644 --- a/quickwit/quickwit-serve/src/tests.rs +++ b/quickwit/quickwit-serve/src/tests.rs @@ -22,14 +22,11 @@ use std::sync::Arc; use std::time::Duration; use chitchat::transport::ChannelTransport; -use hyper::{Body, Method, Request, StatusCode}; use quickwit_cluster::create_cluster_for_test; use quickwit_common::uri::Uri; use quickwit_config::service::QuickwitService; use quickwit_metastore::{IndexMetadata, MockMetastore}; -use quickwit_proto::SearchRequest; -use crate::test_utils::ClusterSandbox; use crate::{check_cluster_configuration, node_readiness_reporting_task}; #[tokio::test] @@ -54,178 +51,6 @@ async fn test_check_cluster_configuration() { .unwrap(); } -#[tokio::test] -async fn test_standalone_server() { - quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); - sandbox - .indexer_rest_client - .cluster_snapshot() - .await - .unwrap(); - assert!(sandbox.indexer_rest_client.is_ready().await.unwrap()); - - { - let client = sandbox.indexer_rest_client.client(); - let root_uri = format!("{}/", sandbox.indexer_rest_client.root_url()) - .parse::() - .unwrap(); - let response = client.get(root_uri.clone()).await.unwrap(); - assert_eq!(response.status(), StatusCode::MOVED_PERMANENTLY); - let post_request = Request::builder() - .uri(root_uri) - .method(Method::POST) - .body(Body::from("{}")) - .unwrap(); - let response = client.request(post_request).await.unwrap(); - assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); - } - { - // The indexing service should bd running. - let counters = sandbox - .indexer_rest_client - .indexing_service_counters() - .await - .unwrap(); - assert_eq!(counters.num_running_pipelines, 0); - } - { - // Create an dynamic index. - sandbox - .indexer_rest_client - .create_index( - r#" - version: 0.5 - index_id: my-new-index - doc_mapping: - field_mappings: - - name: body - type: text - "#, - ) - .await - .unwrap(); - - // Index should be searchable - let mut search_client = sandbox.get_random_search_client(); - search_client - .root_search(SearchRequest { - index_id: "my-new-index".to_string(), - query: "body:test".to_string(), - search_fields: Vec::new(), - snippet_fields: Vec::new(), - start_timestamp: None, - end_timestamp: None, - aggregation_request: None, - max_hits: 10, - sort_by_field: None, - sort_order: None, - start_offset: 0, - }) - .await - .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - let counters = sandbox - .indexer_rest_client - .indexing_service_counters() - .await - .unwrap(); - assert_eq!(counters.num_running_pipelines, 1); - } -} - -#[tokio::test] -async fn test_multi_nodes_cluster() { - quickwit_common::setup_logging_for_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) - .await - .unwrap(); - sandbox.wait_for_cluster_num_ready_nodes(4).await.unwrap(); - - { - // Wait for indexer to fully start. - // The starting time is a bit long for a cluster. - tokio::time::sleep(Duration::from_secs(3)).await; - let indexing_service_counters = sandbox - .indexer_rest_client - .indexing_service_counters() - .await - .unwrap(); - assert_eq!(indexing_service_counters.num_running_pipelines, 0); - } - - // Create index - sandbox - .indexer_rest_client - .create_index( - r#" - version: 0.5 - index_id: my-new-multi-node-index - doc_mapping: - field_mappings: - - name: body - type: text - indexing_settings: - commit_timeout_secs: 1 - "#, - ) - .await - .unwrap(); - assert!(sandbox.indexer_rest_client.is_live().await.unwrap()); - - // Wait until indexing pipelines are started. - tokio::time::sleep(Duration::from_millis(100)).await; - let indexing_service_counters = sandbox - .indexer_rest_client - .indexing_service_counters() - .await - .unwrap(); - assert_eq!(indexing_service_counters.num_running_pipelines, 1); - - // Check search is working. - let mut search_client = sandbox.get_random_search_client(); - let search_request = SearchRequest { - index_id: "my-new-multi-node-index".to_string(), - query: "body:test".to_string(), - search_fields: Vec::new(), - start_timestamp: None, - end_timestamp: None, - aggregation_request: None, - max_hits: 10, - sort_by_field: None, - sort_order: None, - start_offset: 0, - snippet_fields: Vec::new(), - }; - let search_response_empty = search_client - .root_search(search_request.clone()) - .await - .unwrap(); - assert_eq!(search_response_empty.num_hits, 0); - - // Check that ingest request send to searcher is forwarded to indexer and thus indexed. - sandbox - .searcher_rest_client - .ingest_data("my-new-multi-node-index", "{\"body\": \"test\"}") - .await - .unwrap(); - // Wait until split is committed and search. - tokio::time::sleep(Duration::from_secs(4)).await; - let mut search_client = sandbox.get_random_search_client(); - let search_response_one_hit = search_client - .root_search(search_request.clone()) - .await - .unwrap(); - assert_eq!(search_response_one_hit.num_hits, 1); -} - #[tokio::test] async fn test_readiness_updates() -> anyhow::Result<()> { let transport = ChannelTransport::default();