Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a quickwit-integration-tests crate #3200

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move is_live and is_ready methods to rest client
  • Loading branch information
imotov committed Apr 20, 2023
commit 1e8c7ea3cea4ab976c748dcc7b93e3e4b666f6c3
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,11 @@ use quickwit_config::QuickwitConfig;
use quickwit_rest_client::rest_client::{QuickwitClient, Transport, DEFAULT_BASE_URL};
use quickwit_search::{create_search_service_client, SearchServiceClient};
use quickwit_serve::serve_quickwit;
use rand::seq::IteratorRandom;
use reqwest::Url;
use tempfile::TempDir;
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::task::JoinHandle;

use super::TestClient;

/// Configuration of a node made of a [`QuickwitConfig`] and a
/// set of services.
#[derive(Clone)]
Expand Down Expand Up @@ -83,11 +80,8 @@ impl ClusterShutdownTrigger {
/// dropped by the first running test and the other tests will fail.
pub struct ClusterSandbox {
pub node_configs: Vec<NodeConfig>,
pub grpc_search_clients: HashMap<SocketAddr, SearchServiceClient>,
pub searcher_rest_client: QuickwitClient,
pub indexer_rest_client: QuickwitClient,
pub searcher_rest_test_client: TestClient,
pub indexer_rest_test_client: TestClient,
_temp_dir: TempDir,
join_handles: Vec<JoinHandle<Result<HashMap<String, ActorExitStatus>, anyhow::Error>>>,
shutdown_trigger: ClusterShutdownTrigger,
Expand Down Expand Up @@ -116,23 +110,14 @@ impl ClusterSandbox {
Result::<_, anyhow::Error>::Ok(result)
})];
wait_for_server_ready(node_config.quickwit_config.grpc_listen_addr).await?;
let mut grpc_search_clients = HashMap::new();
let search_client =
create_search_service_client(node_config.quickwit_config.grpc_listen_addr).await?;
grpc_search_clients.insert(node_config.quickwit_config.grpc_listen_addr, search_client);
Ok(Self {
node_configs,
grpc_search_clients,
indexer_rest_client: QuickwitClient::new(Transport::new(transport_url(
node_config.quickwit_config.rest_listen_addr,
))),
searcher_rest_client: QuickwitClient::new(Transport::new(transport_url(
node_config.quickwit_config.rest_listen_addr,
))),
indexer_rest_test_client: TestClient::new(node_config.quickwit_config.rest_listen_addr),
searcher_rest_test_client: TestClient::new(
node_config.quickwit_config.rest_listen_addr,
),
_temp_dir: temp_dir,
join_handles,
shutdown_trigger,
Expand Down Expand Up @@ -166,33 +151,17 @@ impl ClusterSandbox {
.find(|node_config| node_config.services.contains(&QuickwitService::Indexer))
.cloned()
.unwrap();
let mut grpc_search_clients = HashMap::new();
for node_config in node_configs.iter() {
if !node_config.services.contains(&QuickwitService::Searcher) {
continue;
}
let search_client =
create_search_service_client(node_config.quickwit_config.grpc_listen_addr).await?;
grpc_search_clients.insert(search_client.grpc_addr(), search_client);
}
// Wait for a duration greater than chitchat GOSSIP_INTERVAL (50ms) so that the cluster is
// formed.
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(Self {
node_configs,
grpc_search_clients,
searcher_rest_client: QuickwitClient::new(Transport::new(transport_url(
searcher_config.quickwit_config.rest_listen_addr,
))),
indexer_rest_client: QuickwitClient::new(Transport::new(transport_url(
indexer_config.quickwit_config.rest_listen_addr,
))),
searcher_rest_test_client: TestClient::new(
searcher_config.quickwit_config.rest_listen_addr,
),
indexer_rest_test_client: TestClient::new(
indexer_config.quickwit_config.rest_listen_addr,
),
_temp_dir: temp_dir,
join_handles,
shutdown_trigger,
Expand All @@ -219,17 +188,10 @@ impl ClusterSandbox {
Ok(())
}

pub fn get_random_search_client(&self) -> SearchServiceClient {
let mut rng = rand::thread_rng();
let selected_addr = self.grpc_search_clients.keys().choose(&mut rng).unwrap();
self.grpc_search_clients.get(selected_addr).unwrap().clone()
}

pub async fn join(self) -> Result<Vec<HashMap<String, ActorExitStatus>>, anyhow::Error> {
pub async fn shutdown(self) -> Result<Vec<HashMap<String, ActorExitStatus>>, anyhow::Error> {
self.shutdown_trigger.shutdown();
let result = future::join_all(self.join_handles).await;
let mut statuses = Vec::new();

for node in result {
statuses.push(node??);
}
Expand Down
2 changes: 0 additions & 2 deletions quickwit/quickwit-integration-tests/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,5 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

mod cluster_sandbox;
mod test_client;

pub use cluster_sandbox::{build_node_configs, ClusterSandbox};
pub use test_client::TestClient;
71 changes: 0 additions & 71 deletions quickwit/quickwit-integration-tests/src/test_utils/test_client.rs

This file was deleted.

127 changes: 66 additions & 61 deletions quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use std::time::Duration;

use hyper::{Body, Method, Request, StatusCode};
use quickwit_config::service::QuickwitService;
use quickwit_proto::SearchRequest;
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::rest_client::CommitType;
use quickwit_serve::SearchRequestQueryString;

use crate::test_utils::ClusterSandbox;

Expand All @@ -42,9 +42,19 @@ fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String {
async fn test_ui_redirect_on_get() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandbox::start_standalone_node().await.unwrap();
let client = sandbox.indexer_rest_test_client.client();
assert!(sandbox.indexer_rest_test_client.is_ready().await.unwrap());
let root_uri = format!("{}/", sandbox.indexer_rest_test_client.root_url())
assert!(sandbox
.indexer_rest_client
.node_health()
.is_ready()
.await
.unwrap());

let node_config = sandbox.node_configs.first().unwrap();
let client = hyper::Client::builder()
.pool_idle_timeout(Duration::from_secs(30))
.http2_only(true)
.build_http();
let root_uri = format!("http://{}/", node_config.quickwit_config.rest_listen_addr)
.parse::<hyper::Uri>()
.unwrap();
let response = client.get(root_uri.clone()).await.unwrap();
Expand All @@ -56,38 +66,30 @@ async fn test_ui_redirect_on_get() {
.unwrap();
let response = client.request(post_request).await.unwrap();
assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
sandbox.join().await.unwrap();
sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_standalone_server() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandbox::start_standalone_node().await.unwrap();
let state = sandbox
assert!(sandbox
.indexer_rest_client
.cluster()
.snapshot()
.node_health()
.is_ready()
.await
.unwrap();
assert_eq!(
sandbox
.node_configs
.get(0)
.unwrap()
.quickwit_config
.cluster_id,
state.cluster_id
);
.unwrap());
{
// The indexing service should be running.
let counters = sandbox
.indexer_rest_client
.stats()
.node_stats()
.indexing()
.await
.unwrap();
assert_eq!(counters.num_running_pipelines, 0);
}

{
// Create an dynamic index.
sandbox
Expand All @@ -110,33 +112,32 @@ async fn test_standalone_server() {
.unwrap();

// Index should be searchable
let mut search_client = sandbox.get_random_search_client();
search_client
.root_search(SearchRequest {
index_id: "my-new-index".to_string(),
query: "body:test".to_string(),
search_fields: Vec::new(),
snippet_fields: Vec::new(),
start_timestamp: None,
end_timestamp: None,
aggregation_request: None,
max_hits: 10,
sort_by_field: None,
sort_order: None,
start_offset: 0,
})
.await
.unwrap();
assert_eq!(
sandbox
.indexer_rest_client
.search(
"my-new-index",
SearchRequestQueryString {
query: "body:test".to_string(),
max_hits: 10,
..Default::default()
},
)
.await
.unwrap()
.num_hits,
0
);
tokio::time::sleep(Duration::from_millis(100)).await;
let counters = sandbox
.indexer_rest_client
.stats()
.node_stats()
.indexing()
.await
.unwrap();
assert_eq!(counters.num_running_pipelines, 1);
}
sandbox.join().await.unwrap();
sandbox.shutdown().await.unwrap();
}

#[tokio::test]
Expand All @@ -160,7 +161,7 @@ async fn test_multi_nodes_cluster() {
tokio::time::sleep(Duration::from_secs(3)).await;
let indexing_service_counters = sandbox
.indexer_rest_client
.stats()
.node_stats()
.indexing()
.await
.unwrap();
Expand Down Expand Up @@ -188,35 +189,33 @@ async fn test_multi_nodes_cluster() {
)
.await
.unwrap();
assert!(sandbox.indexer_rest_test_client.is_live().await.unwrap());
assert!(sandbox
.indexer_rest_client
.node_health()
.is_live()
.await
.unwrap());

// Wait until indexing pipelines are started.
tokio::time::sleep(Duration::from_millis(100)).await;
let indexing_service_counters = sandbox
.indexer_rest_client
.stats()
.node_stats()
.indexing()
.await
.unwrap();
assert_eq!(indexing_service_counters.num_running_pipelines, 1);

// Check search is working.
let mut search_client = sandbox.get_random_search_client();
let search_request = SearchRequest {
index_id: "my-new-multi-node-index".to_string(),
query: "body:bar".to_string(),
search_fields: Vec::new(),
start_timestamp: None,
end_timestamp: None,
aggregation_request: None,
max_hits: 10,
sort_by_field: None,
sort_order: None,
start_offset: 0,
snippet_fields: Vec::new(),
};
let search_response_empty = search_client
.root_search(search_request.clone())
let search_response_empty = sandbox
.searcher_rest_client
.search(
"my-new-multi-node-index",
SearchRequestQueryString {
query: "body:bar".to_string(),
..Default::default()
},
)
.await
.unwrap();
assert_eq!(search_response_empty.num_hits, 0);
Expand All @@ -236,11 +235,17 @@ async fn test_multi_nodes_cluster() {
.unwrap();
// Wait until split is commited and search.
tokio::time::sleep(Duration::from_secs(4)).await;
let mut search_client = sandbox.get_random_search_client();
let search_response_one_hit = search_client
.root_search(search_request.clone())
let search_response_one_hit = sandbox
.searcher_rest_client
.search(
"my-new-multi-node-index",
SearchRequestQueryString {
query: "body:bar".to_string(),
..Default::default()
},
)
.await
.unwrap();
assert_eq!(search_response_one_hit.num_hits, 1);
sandbox.join().await.unwrap();
sandbox.shutdown().await.unwrap();
}
Loading