test: add metric table migration test

WenyXu committed Jun 20, 2024
1 parent bf95b07 commit efdb6a8
Showing 1 changed file with 160 additions and 5 deletions.
165 changes: 160 additions & 5 deletions tests-integration/tests/region_migration.rs
@@ -85,6 +85,7 @@ macro_rules! region_migration_tests {
test_region_migration_all_regions,
test_region_migration_incorrect_from_peer,
test_region_migration_incorrect_region_id,
test_metric_table_region_migration_by_sql,
);
)*
};
@@ -129,7 +130,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();

// Prepares test table.
let table_id = prepare_testing_table(&cluster).await;
let table_id = prepare_testing_metric_table(&cluster).await;

// Inserts data
let results = insert_values(&cluster.frontend, logical_timer).await;
@@ -216,6 +217,131 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
assert!(procedure.is_none());
}

/// A naive metric table region migration test by SQL function
pub async fn test_metric_table_region_migration_by_sql(
store_type: StorageType,
endpoints: Vec<String>,
) {
let cluster_name = "test_region_migration";
let peer_factory = |id| Peer {
id,
addr: PEER_PLACEHOLDER_ADDR.to_string(),
};

// Prepares test cluster.
let (store_config, _guard) = get_test_store_config(&store_type);
let home_dir = create_temp_dir("test_migration_data_home");
let datanodes = 5u64;
let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
let const_selector = Arc::new(ConstNodeSelector::new(vec![
peer_factory(1),
peer_factory(2),
peer_factory(3),
]));
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
broker_endpoints: endpoints.clone(),
..Default::default()
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
.build()
.await;
// Prepares test metric tables.
let table_id = prepare_testing_metric_table(&cluster).await;
let query_ctx = QueryContext::arc();

// Inserts values
run_sql(
&cluster.frontend,
r#"INSERT INTO t1 VALUES ('host1',0, 0), ('host2', 1, 1);"#,
query_ctx.clone(),
)
.await
.unwrap();

run_sql(
&cluster.frontend,
r#"INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);"#,
query_ctx.clone(),
)
.await
.unwrap();

// The region distribution
let mut distribution = find_region_distribution_by_sql(&cluster, "phy").await;
// Selects the migration source (from peer) and its regions.
let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
info!(
"Selected from peer: {from_peer_id}, and its first region: {:?}",
from_regions[0]
);
let to_peer_id = (from_peer_id + 1) % 3;
let region_id = RegionId::new(table_id, from_regions[0]);
// Trigger region migration.
let procedure_id =
trigger_migration_by_sql(&cluster, region_id.as_u64(), from_peer_id, to_peer_id).await;

info!("Started region procedure: {}!", procedure_id);

// Waits for the migration to finish by polling the procedure state
let frontend = cluster.frontend.clone();
wait_condition(
Duration::from_secs(10),
Box::pin(async move {
loop {
let state = query_procedure_by_sql(&frontend, &procedure_id).await;
if state == "{\"status\":\"Done\"}" {
info!("Migration done: {state}");
break;
} else {
info!("Migration not finished: {state}");
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
}),
)
.await;

let result = cluster
.frontend
.do_query(&format!("select * from t1"), query_ctx.clone())
.await
.remove(0);

let expected = "\
+-------+-------------------------+-----+
| host | ts | val |
+-------+-------------------------+-----+
| host2 | 1970-01-01T00:00:00.001 | 1.0 |
| host1 | 1970-01-01T00:00:00 | 0.0 |
+-------+-------------------------+-----+";
check_output_stream(result.unwrap().data, expected).await;

let result = cluster
.frontend
.do_query(&format!("select * from t2"), query_ctx)
.await
.remove(0);

let expected = "\
+------+-------------------------+-----+
| job | ts | val |
+------+-------------------------+-----+
| job2 | 1970-01-01T00:00:00.001 | 1.0 |
| job1 | 1970-01-01T00:00:00 | 0.0 |
+------+-------------------------+-----+";
check_output_stream(result.unwrap().data, expected).await;
}
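
// For context: `trigger_migration_by_sql`, `query_procedure_by_sql`, and
// `wait_condition` used above are existing helpers defined further down in
// this file and are not part of this hunk. As a hedged illustration only, the
// trigger is assumed to wrap GreptimeDB's `migrate_region` SQL function and
// the poll to wrap `procedure_state` (whose JSON output the loop above
// compares against `{"status":"Done"}`); the `*_sketch` items below are
// hypothetical and are not the file's actual definitions.

/// Hypothetical sketch: the SQL statements the helpers are assumed to issue.
fn migration_sql_sketch(
    region_id: u64,
    from_peer_id: u64,
    to_peer_id: u64,
    procedure_id: &str,
) -> (String, String) {
    // Starts the migration and returns a procedure id.
    let trigger = format!("select migrate_region({region_id}, {from_peer_id}, {to_peer_id})");
    // Reports the state of that procedure as a JSON string.
    let poll = format!("select procedure_state('{procedure_id}')");
    (trigger, poll)
}

/// Hypothetical sketch of the wait helper: bound the boxed condition future
/// with a timeout so a stuck migration fails the test instead of hanging.
async fn wait_condition_sketch(
    timeout: Duration,
    condition: futures::future::BoxFuture<'static, ()>,
) {
    tokio::time::timeout(timeout, condition).await.unwrap();
}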

/// A naive region migration test by SQL function
pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Vec<String>) {
let cluster_name = "test_region_migration";
@@ -264,7 +390,7 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
}

// The region distribution
let mut distribution = find_region_distribution_by_sql(&cluster).await;
let mut distribution = find_region_distribution_by_sql(&cluster, TEST_TABLE_NAME).await;

let old_distribution = distribution.clone();

@@ -329,7 +455,7 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
.unwrap();
assert!(procedure.is_none());

let new_distribution = find_region_distribution_by_sql(&cluster).await;
let new_distribution = find_region_distribution_by_sql(&cluster, TEST_TABLE_NAME).await;

assert_ne!(old_distribution, new_distribution);
}
@@ -804,6 +930,32 @@ async fn assert_values(instance: &Arc<Instance>) {
check_output_stream(result.unwrap().data, expected).await;
}

async fn prepare_testing_metric_table(cluster: &GreptimeDbCluster) -> TableId {
let sql = r#"CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");"#;
let mut result = cluster.frontend.do_query(&sql, QueryContext::arc()).await;
let output = result.remove(0).unwrap();
assert!(matches!(output.data, OutputData::AffectedRows(0)));

let sql = r#"CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");"#;
let mut result = cluster.frontend.do_query(&sql, QueryContext::arc()).await;
let output = result.remove(0).unwrap();
assert!(matches!(output.data, OutputData::AffectedRows(0)));

let sql = r#"CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");"#;
let mut result = cluster.frontend.do_query(&sql, QueryContext::arc()).await;
let output = result.remove(0).unwrap();
assert!(matches!(output.data, OutputData::AffectedRows(0)));

let table = cluster
.frontend
.catalog_manager()
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy")
.await
.unwrap()
.unwrap();
table.table_info().table_id()
}
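
// The metric engine stores the logical tables `t1` and `t2` inside the
// physical table `phy`'s regions, which is why this helper returns the
// *physical* table's id and the metric test above migrates a region of `phy`
// while asserting that rows of both logical tables survive. The constant below
// is an illustration only (it mirrors the information_schema join used by
// `find_region_distribution_by_sql` further down, with the table name fixed to
// `phy`); it is hypothetical and not used by the tests.
#[allow(dead_code)]
const PHY_REGION_DISTRIBUTION_SQL_SKETCH: &str = r#"select b.peer_id as datanode_id,
a.greptime_partition_id as region_id
from information_schema.partitions a left join information_schema.region_peers b
on a.greptime_partition_id = b.region_id
where a.table_name='phy' order by datanode_id asc"#;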

async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId {
let sql = format!(
r"
@@ -843,7 +995,10 @@ async fn find_region_distribution(
}

/// Find region distribution by SQL query
async fn find_region_distribution_by_sql(cluster: &GreptimeDbCluster) -> RegionDistribution {
async fn find_region_distribution_by_sql(
cluster: &GreptimeDbCluster,
table: &str,
) -> RegionDistribution {
let query_ctx = QueryContext::arc();

let OutputData::Stream(stream) = run_sql(
@@ -853,7 +1008,7 @@ async fn find_region_distribution_by_sql(cluster: &GreptimeDbCluster) -> RegionD
a.greptime_partition_id as region_id
from information_schema.partitions a left join information_schema.region_peers b
on a.greptime_partition_id = b.region_id
where a.table_name='{TEST_TABLE_NAME}' order by datanode_id asc"#
where a.table_name='{table}' order by datanode_id asc"#
),
query_ctx.clone(),
)
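
// Illustration only: an assumed shape for turning rows of the query above (a
// datanode id paired with a region id) into a map from datanode id to region
// numbers, which is what `RegionDistribution` represents in these tests. This
// is not the helper's actual body; `region_distribution_sketch` is
// hypothetical and unused.
#[allow(dead_code)]
fn region_distribution_sketch(
    rows: Vec<(u64, u64)>,
) -> std::collections::BTreeMap<u64, Vec<u32>> {
    let mut distribution = std::collections::BTreeMap::<u64, Vec<u32>>::new();
    for (datanode_id, region_id) in rows {
        // A RegionId packs the table id and region number into one u64;
        // only the region number is kept per datanode here.
        distribution
            .entry(datanode_id)
            .or_default()
            .push(RegionId::from_u64(region_id).region_number());
    }
    distribution
}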
