feat: implement naive fuzz test for region migration (#4252)
* fix(fuzz): adapt for new partition rules

* feat: implement naive fuzz test for region migration

* chore(ci): add ci cfg

* chore: apply suggestions from CR

* chore: apply suggestions from CR
WenyXu authored Jul 3, 2024
1 parent 705b224 commit 76fac35
Showing 13 changed files with 670 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/develop.yml
@@ -419,7 +419,7 @@ jobs:
    timeout-minutes: 60
    strategy:
      matrix:
        target: ["fuzz_failover_mito_regions"]
        target: ["fuzz_migrate_mito_regions", "fuzz_failover_mito_regions"]
        mode:
          - name: "Remote WAL"
            minio: true
7 changes: 7 additions & 0 deletions tests-fuzz/Cargo.toml
@@ -127,3 +127,10 @@ path = "targets/failover/fuzz_failover_mito_regions.rs"
test = false
bench = false
doc = false

[[bin]]
name = "fuzz_migrate_mito_regions"
path = "targets/migration/fuzz_migrate_mito_regions.rs"
test = false
bench = false
doc = false
5 changes: 5 additions & 0 deletions tests-fuzz/src/context.rs
@@ -57,6 +57,11 @@ impl From<&CreateTableExpr> for TableContext {
}

impl TableContext {
    /// Returns the timestamp column
    pub fn timestamp_column(&self) -> Option<Column> {
        self.columns.iter().find(|c| c.is_time_index()).cloned()
    }

    /// Applies the [AlterTableExpr].
    pub fn alter(mut self, expr: AlterTableExpr) -> Result<TableContext> {
        match expr.alter_options {
64 changes: 48 additions & 16 deletions tests-fuzz/src/generator/create_expr.rs
@@ -17,6 +17,7 @@ use std::collections::HashMap;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use derive_builder::Builder;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::partition::{PartitionBound, PartitionDef};
use rand::seq::SliceRandom;
use rand::Rng;
@@ -29,7 +30,7 @@ use crate::fake::{random_capitalize_map, MappedGenerator, WordGenerator};
use crate::generator::{ColumnOptionGenerator, ConcreteDataTypeGenerator, Random};
use crate::ir::create_expr::{ColumnOption, CreateDatabaseExprBuilder, CreateTableExprBuilder};
use crate::ir::{
    column_options_generator, generate_columns, generate_random_value,
    column_options_generator, generate_columns, generate_partition_bounds, generate_random_value,
    partible_column_options_generator, primary_key_options_generator, ts_column_options_generator,
    Column, ColumnTypeGenerator, CreateDatabaseExpr, CreateTableExpr, Ident,
    PartibleColumnTypeGenerator, StringColumnTypeGenerator, TsColumnTypeGenerator,
@@ -141,20 +142,12 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateTableExprGenerato
            .remove(0);

            // Generates partition bounds.
            let mut partition_bounds = Vec::with_capacity(self.partition);
            for _ in 0..self.partition - 1 {
                partition_bounds.push(PartitionBound::Value(generate_random_value(
                    rng,
                    &column.column_type,
                    None,
                )));
                partition_bounds.sort();
            }
            partition_bounds.push(PartitionBound::MaxValue);
            builder.partition(PartitionDef::new(
                vec![name.value.to_string()],
                partition_bounds,
            ));
            let partition_def = generate_partition_def(
                self.partition,
                column.column_type.clone(),
                name.clone(),
            );
            builder.partition(partition_def);
            columns.push(column);
        }
        // Generates the ts column.
@@ -203,6 +196,45 @@
    }
}

fn generate_partition_def(
    partitions: usize,
    column_type: ConcreteDataType,
    column_name: Ident,
) -> PartitionDef {
    let bounds = generate_partition_bounds(&column_type, partitions - 1);
    let mut partition_bounds = Vec::with_capacity(partitions);

    let first_bound = bounds[0].clone();
    partition_bounds.push(PartitionBound::Expr(PartitionExpr::new(
        Operand::Column(column_name.to_string()),
        RestrictedOp::Lt,
        Operand::Value(first_bound),
    )));
    for bound_idx in 1..bounds.len() {
        partition_bounds.push(PartitionBound::Expr(PartitionExpr::new(
            Operand::Expr(PartitionExpr::new(
                Operand::Column(column_name.to_string()),
                RestrictedOp::GtEq,
                Operand::Value(bounds[bound_idx - 1].clone()),
            )),
            RestrictedOp::And,
            Operand::Expr(PartitionExpr::new(
                Operand::Column(column_name.to_string()),
                RestrictedOp::Lt,
                Operand::Value(bounds[bound_idx].clone()),
            )),
        )));
    }
    let last_bound = bounds.last().unwrap().clone();
    partition_bounds.push(PartitionBound::Expr(PartitionExpr::new(
        Operand::Column(column_name.to_string()),
        RestrictedOp::GtEq,
        Operand::Value(last_bound),
    )));

    PartitionDef::new(vec![column_name.to_string()], partition_bounds)
}
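The three bound shapes built here — `col < b0`, `b0 <= col < b1`, and `col >= b_last` — split the whole value domain into contiguous, non-overlapping ranges, so every generated row routes to exactly one region. A minimal standalone sketch of that property (plain Rust with hypothetical cut points, not code from this commit):

```rust
/// Returns the partition a value falls into, given ascending cut points.
/// Mirrors the rule shape above: `< cuts[0]`, `>= cuts[i-1] AND < cuts[i]`, `>= cuts[last]`.
fn partition_index(value: i64, cuts: &[i64]) -> usize {
    cuts.iter().position(|&b| value < b).unwrap_or(cuts.len())
}

fn main() {
    let cuts = [100, 200]; // hypothetical cut points for 3 partitions
    assert_eq!(partition_index(-5, &cuts), 0); // value < 100
    assert_eq!(partition_index(150, &cuts), 1); // 100 <= value < 200
    assert_eq!(partition_index(999, &cuts), 2); // value >= 200
}
```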

/// Generate a physical table with 2 columns: ts of TimestampType::Millisecond as time index and val of Float64Type.
#[derive(Builder)]
#[builder(pattern = "owned")]
@@ -400,7 +432,7 @@ mod tests {
.unwrap();

let serialized = serde_json::to_string(&expr).unwrap();
let expected = r#"{"table_name":{"value":"tEmporIbUS","quote_style":null},"columns":[{"name":{"value":"IMpEdIT","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey","NotNull"]},{"name":{"value":"natuS","quote_style":null},"column_type":{"Timestamp":{"Nanosecond":null}},"options":["TimeIndex"]},{"name":{"value":"ADIPisCI","quote_style":null},"column_type":{"Int16":{}},"options":[{"DefaultValue":{"Int16":4864}}]},{"name":{"value":"EXpEdita","quote_style":null},"column_type":{"Int64":{}},"options":["PrimaryKey"]},{"name":{"value":"cUlpA","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"MOLeStIAs","quote_style":null},"column_type":{"Boolean":null},"options":["Null"]},{"name":{"value":"cUmquE","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.21569687}}]},{"name":{"value":"toTAm","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"deBitIs","quote_style":null},"column_type":{"Float32":{}},"options":["Null"]},{"name":{"value":"QUi","quote_style":null},"column_type":{"Int64":{}},"options":["Null"]}],"if_not_exists":true,"partition":{"partition_columns":["IMpEdIT"],"partition_bounds":[{"Value":{"String":"򟘲"}},{"Value":{"String":"򴥫"}},"MaxValue"]},"engine":"mito2","options":{},"primary_keys":[0,3]}"#;
let expected = r#"{"table_name":{"value":"animI","quote_style":null},"columns":[{"name":{"value":"IMpEdIT","quote_style":null},"column_type":{"Float64":{}},"options":["PrimaryKey","NotNull"]},{"name":{"value":"natuS","quote_style":null},"column_type":{"Timestamp":{"Millisecond":null}},"options":["TimeIndex"]},{"name":{"value":"ADIPisCI","quote_style":null},"column_type":{"Float64":{}},"options":["Null"]},{"name":{"value":"EXpEdita","quote_style":null},"column_type":{"Int16":{}},"options":[{"DefaultValue":{"Int16":4864}}]},{"name":{"value":"cUlpA","quote_style":null},"column_type":{"Int64":{}},"options":["PrimaryKey"]},{"name":{"value":"MOLeStIAs","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"cUmquE","quote_style":null},"column_type":{"Boolean":null},"options":["Null"]},{"name":{"value":"toTAm","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.21569687}}]},{"name":{"value":"deBitIs","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"QUi","quote_style":null},"column_type":{"Float32":{}},"options":["Null"]}],"if_not_exists":true,"partition":{"partition_columns":["IMpEdIT"],"partition_bounds":[{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"Lt","rhs":{"Value":{"Float64":5.992310449541053e307}}}},{"Expr":{"lhs":{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"GtEq","rhs":{"Value":{"Float64":5.992310449541053e307}}}},"op":"And","rhs":{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"Lt","rhs":{"Value":{"Float64":1.1984620899082105e308}}}}}},{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"GtEq","rhs":{"Value":{"Float64":1.1984620899082105e308}}}}]},"engine":"mito2","options":{},"primary_keys":[0,4]}"#;
assert_eq!(expected, serialized);
}

85 changes: 75 additions & 10 deletions tests-fuzz/src/ir.rs
@@ -21,10 +21,11 @@ pub(crate) mod select_expr;

use core::fmt;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use alter_expr::AlterTableExpr;
use common_time::timestamp::TimeUnit;
use common_time::{Date, DateTime, Timestamp};
pub use create_expr::{CreateDatabaseExpr, CreateTableExpr};
use datatypes::data_type::ConcreteDataType;
@@ -65,8 +66,6 @@ lazy_static! {
        ConcreteDataType::float32_datatype(),
        ConcreteDataType::float64_datatype(),
        ConcreteDataType::string_datatype(),
        ConcreteDataType::date_datatype(),
        ConcreteDataType::datetime_datatype(),
    ];
    pub static ref STRING_DATA_TYPES: Vec<ConcreteDataType> =
        vec![ConcreteDataType::string_datatype()];
@@ -102,6 +101,43 @@ pub struct MySQLTsColumnTypeGenerator;
pub struct PartibleColumnTypeGenerator;
pub struct StringColumnTypeGenerator;

/// FIXME(weny): Waits for https://github.com/GreptimeTeam/greptimedb/issues/4247
macro_rules! generate_values {
    ($data_type:ty, $bounds:expr) => {{
        let base = 0 as $data_type;
        let step = <$data_type>::MAX / ($bounds as $data_type + 1 as $data_type) as $data_type;
        (1..=$bounds)
            .map(|i| Value::from(base + step * i as $data_type as $data_type))
            .collect::<Vec<Value>>()
    }};
}

/// Generates partition bounds.
pub fn generate_partition_bounds(datatype: &ConcreteDataType, bounds: usize) -> Vec<Value> {
    match datatype {
        ConcreteDataType::Int16(_) => generate_values!(i16, bounds),
        ConcreteDataType::Int32(_) => generate_values!(i32, bounds),
        ConcreteDataType::Int64(_) => generate_values!(i64, bounds),
        ConcreteDataType::Float32(_) => generate_values!(f32, bounds),
        ConcreteDataType::Float64(_) => generate_values!(f64, bounds),
        ConcreteDataType::String(_) => {
            let base = b'A';
            let range = b'z' - b'A';
            let step = range / (bounds as u8 + 1);
            (1..=bounds)
                .map(|i| {
                    Value::from(
                        char::from(base + step * i as u8)
                            .escape_default()
                            .to_string(),
                    )
                })
                .collect()
        }
        _ => unimplemented!("unsupported type: {datatype}"),
    }
}
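For the numeric types, `generate_values!` spaces the cut points evenly between 0 and the type's maximum value. A standalone sketch of the same computation specialized to `i64` (an assumed simplification; the macro also covers the other numeric widths, and strings use the ASCII range `b'A'..=b'z'` instead):

```rust
// Cut points for `bounds + 1` partitions: base 0, step = MAX / (bounds + 1),
// yielding step * 1, step * 2, ..., step * bounds.
fn i64_partition_bounds(bounds: usize) -> Vec<i64> {
    let step = i64::MAX / (bounds as i64 + 1);
    (1..=bounds as i64).map(|i| step * i).collect()
}

fn main() {
    // Two cut points at roughly 1/3 and 2/3 of the i64 range give three partitions.
    println!("{:?}", i64_partition_bounds(2));
}
```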

/// Generates a random [Value].
pub fn generate_random_value<R: Rng>(
    rng: &mut R,
@@ -128,15 +164,19 @@ pub fn generate_random_value<R: Rng>(

/// Generate monotonically increasing timestamps for MySQL.
pub fn generate_unique_timestamp_for_mysql<R: Rng>(base: i64) -> TsValueGenerator<R> {
    let base = Arc::new(AtomicI64::new(base));
    let base = Timestamp::new_millisecond(base);
    let clock = Arc::new(Mutex::new(base));

    Box::new(move |_rng, ts_type| -> Value {
        let value = base.fetch_add(1, Ordering::Relaxed);
        let mut clock = clock.lock().unwrap();
        let ts = clock.add_duration(Duration::from_secs(1)).unwrap();
        *clock = ts;

        let v = match ts_type {
            TimestampType::Second(_) => Timestamp::new_second(1 + value),
            TimestampType::Millisecond(_) => Timestamp::new_millisecond(1000 + value),
            TimestampType::Microsecond(_) => Timestamp::new_microsecond(1_000_000 + value),
            TimestampType::Nanosecond(_) => Timestamp::new_nanosecond(1_000_000_000 + value),
            TimestampType::Second(_) => ts.convert_to(TimeUnit::Second).unwrap(),
            TimestampType::Millisecond(_) => ts.convert_to(TimeUnit::Millisecond).unwrap(),
            TimestampType::Microsecond(_) => ts.convert_to(TimeUnit::Microsecond).unwrap(),
            TimestampType::Nanosecond(_) => ts.convert_to(TimeUnit::Nanosecond).unwrap(),
        };
        Value::from(v)
    })
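Replacing the `AtomicI64` counter with a `Mutex`-guarded `Timestamp` clock makes each generated row exactly one second later than the previous one, converted to whatever unit the column's timestamp type uses. A standalone sketch of the shared-clock pattern (plain `i64` milliseconds instead of `common_time::Timestamp`; names are hypothetical, not code from this commit):

```rust
use std::sync::{Arc, Mutex};

// Shared monotonic clock: every call advances the stored time by one second
// and returns the new value, so generated rows never share a timestamp.
fn unique_ts_generator(base_millis: i64) -> impl Fn() -> i64 {
    let clock = Arc::new(Mutex::new(base_millis));
    move || {
        let mut guard = clock.lock().unwrap();
        *guard += 1_000; // one second per generated row, in milliseconds
        *guard
    }
}

fn main() {
    let next = unique_ts_generator(0);
    assert_eq!(next(), 1_000);
    assert_eq!(next(), 2_000);
}
```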
@@ -496,6 +536,31 @@ pub fn replace_default(
    new_rows
}

/// Sorts a vector of rows based on the values in the specified primary key columns.
pub fn sort_by_primary_keys(rows: &mut [RowValues], primary_keys_idx: Vec<usize>) {
    rows.sort_by(|a, b| {
        let a_keys: Vec<_> = primary_keys_idx.iter().map(|&i| &a[i]).collect();
        let b_keys: Vec<_> = primary_keys_idx.iter().map(|&i| &b[i]).collect();
        for (a_key, b_key) in a_keys.iter().zip(b_keys.iter()) {
            match a_key.cmp(b_key) {
                Some(std::cmp::Ordering::Equal) => continue,
                non_eq => return non_eq.unwrap(),
            }
        }
        std::cmp::Ordering::Equal
    });
}

/// Formats a slice of columns into a comma-separated string of column names.
pub fn format_columns(columns: &[Column]) -> String {
    columns
        .iter()
        .map(|c| c.name.to_string())
        .collect::<Vec<_>>()
        .join(", ")
        .to_string()
}

#[cfg(test)]
mod tests {
use super::*;
39 changes: 39 additions & 0 deletions tests-fuzz/src/ir/insert_expr.rs
@@ -25,6 +25,45 @@ pub struct InsertIntoExpr {
    pub values_list: Vec<RowValues>,
}

impl InsertIntoExpr {
    /// Returns the timestamp column
    pub fn timestamp_column(&self) -> Option<Column> {
        self.columns.iter().find(|c| c.is_time_index()).cloned()
    }

    /// Returns index of the timestamp column
    pub fn timestamp_column_idx(&self) -> Option<usize> {
        self.columns
            .iter()
            .enumerate()
            .find_map(|(idx, c)| if c.is_time_index() { Some(idx) } else { None })
    }

    /// Returns a vector of columns that are primary keys or time indices.
    pub fn primary_key_columns(&self) -> Vec<Column> {
        self.columns
            .iter()
            .filter(|c| c.is_primary_key() || c.is_time_index())
            .cloned()
            .collect::<Vec<_>>()
    }

    /// Returns the indices of columns that are primary keys or time indices.
    pub fn primary_key_column_idx(&self) -> Vec<usize> {
        self.columns
            .iter()
            .enumerate()
            .filter_map(|(i, c)| {
                if c.is_primary_key() || c.is_time_index() {
                    Some(i)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }
}
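A typical pairing of these helpers with `sort_by_primary_keys` from `tests-fuzz/src/ir.rs` above — a sketch of a validation step a fuzz target might perform, assuming an `insert_expr: InsertIntoExpr` produced by the generator and both items in scope (not code from this commit):

```rust
// Normalize the generated rows so they can be compared row-for-row against a
// `SELECT ... ORDER BY <primary keys, time index>` result fetched after migration.
fn normalize_rows(insert_expr: &mut InsertIntoExpr) {
    let primary_keys_idx = insert_expr.primary_key_column_idx();
    sort_by_primary_keys(&mut insert_expr.values_list, primary_keys_idx);
}
```

Sorting both the inserted rows and the queried rows by the same key columns makes the comparison independent of scan order.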

pub type RowValues = Vec<RowValue>;

#[derive(PartialEq, PartialOrd, Clone)]