Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Cluster Sandbox #3764

Merged
merged 2 commits into from
Aug 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,53 +139,10 @@ pub async fn ingest_with_retry(
}

impl ClusterSandbox {
// Starts one node that runs all the services.
pub async fn start_standalone_node() -> anyhow::Result<Self> {
let temp_dir = tempfile::tempdir()?;
let services = QuickwitService::supported_services();
let node_configs = build_node_configs(temp_dir.path().to_path_buf(), &[services]);
let storage_resolver = StorageResolver::unconfigured();
let metastore_resolver = MetastoreResolver::unconfigured();
// There is exactly one node.
let node_config = node_configs[0].clone();
let node_config_clone = node_config.clone();
let runtimes_config = RuntimesConfig::light_for_tests();
let shutdown_trigger = ClusterShutdownTrigger::new();
let shutdown_signal = shutdown_trigger.shutdown_signal();
let join_handles = vec![tokio::spawn(async move {
let result = serve_quickwit(
node_config_clone.node_config,
runtimes_config,
storage_resolver,
metastore_resolver,
shutdown_signal,
)
.await?;
Result::<_, anyhow::Error>::Ok(result)
})];
wait_for_server_ready(node_config.node_config.grpc_listen_addr).await?;
Ok(Self {
node_configs,
indexer_rest_client: QuickwitClientBuilder::new(transport_url(
node_config.node_config.rest_listen_addr,
))
.build(),
searcher_rest_client: QuickwitClientBuilder::new(transport_url(
node_config.node_config.rest_listen_addr,
))
.build(),
_temp_dir: temp_dir,
join_handles,
shutdown_trigger,
})
}

// Starts nodes with corresponding services given by `nodes_services`.
pub async fn start_cluster_nodes(
nodes_services: &[HashSet<QuickwitService>],
pub async fn start_cluster_with_configs(
temp_dir: TempDir,
node_configs: Vec<TestNodeConfig>,
) -> anyhow::Result<Self> {
let temp_dir = tempfile::tempdir()?;
let node_configs = build_node_configs(temp_dir.path().to_path_buf(), nodes_services);
let runtimes_config = RuntimesConfig::light_for_tests();
let storage_resolver = StorageResolver::unconfigured();
let metastore_resolver = MetastoreResolver::unconfigured();
Expand Down Expand Up @@ -220,9 +177,14 @@ impl ClusterSandbox {
.find(|node_config| node_config.services.contains(&QuickwitService::Indexer))
.cloned()
.unwrap();
// Wait for a duration greater than chitchat GOSSIP_INTERVAL (50ms) so that the cluster is
// formed.
tokio::time::sleep(Duration::from_millis(100)).await;
if node_configs.len() == 1 {
// We have only one node, so we can just wait for it to get started
wait_for_server_ready(node_configs[0].node_config.grpc_listen_addr).await?;
} else {
// Wait for a duration greater than chitchat GOSSIP_INTERVAL (50ms) so that the cluster
// is formed.
tokio::time::sleep(Duration::from_millis(100)).await;
}
Ok(Self {
node_configs,
searcher_rest_client: QuickwitClientBuilder::new(transport_url(
Expand All @@ -239,6 +201,23 @@ impl ClusterSandbox {
})
}

// Starts one node that runs all the services.
pub async fn start_standalone_node() -> anyhow::Result<Self> {
let temp_dir = tempfile::tempdir()?;
let services = QuickwitService::supported_services();
let node_configs = build_node_configs(temp_dir.path().to_path_buf(), &[services]);
Self::start_cluster_with_configs(temp_dir, node_configs).await
}

// Starts nodes with corresponding services given by `nodes_services`.
pub async fn start_cluster_nodes(
nodes_services: &[HashSet<QuickwitService>],
) -> anyhow::Result<Self> {
let temp_dir = tempfile::tempdir()?;
let node_configs = build_node_configs(temp_dir.path().to_path_buf(), nodes_services);
Self::start_cluster_with_configs(temp_dir, node_configs).await
}

pub async fn wait_for_cluster_num_ready_nodes(
&self,
expected_num_ready_nodes: usize,
Expand Down