diff --git a/src/main.rs b/src/main.rs
index 65297ad..f219e59 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,6 @@
 use std::fs::read_to_string;
-use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 
 use anyhow::Result;
 use databend_driver::new_connection;
@@ -11,12 +11,13 @@ use log::info;
 #[tokio::main]
 async fn main() -> Result<()> {
     env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
+
     let dsn = std::env::var("DATABEND_DSN")
         .map_err(|_| {
             "DATABEND_DSN is empty, please EXPORT DATABEND_DSN=".to_string()
         })
         .unwrap();
-
+    println!("DATABEND_DSN:{}", dsn);
     let iterations = if let Some(num_of_iteration) = std::env::args().nth(1) {
         num_of_iteration.parse::<u32>().expect("invalid number")
     } else {
@@ -130,16 +131,21 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result<bool> {
         .collect::<Vec<_>>()
         .join(" or ");
     sub_query.push_str(&filter);
+    let truncate_sql = "truncate table restore_source";
+    let store_sql = format!("insert into restore_source {sub_query}");
+    conn.exec(&truncate_sql).await?;
+    conn.exec(&store_sql).await?;
 
     // replace these history data into the table (itself). while table being compacted and re-clustered
     // this may lead to partial and total block update.
-    let sql = format!("merge into test_order as t
-        using ({sub_query}) as s
+    let sql = format!(
+        "merge into test_order as t
+        using (select * from restore_source) as s
         on t.id = s.id and t.insert_time = s.insert_time
         when matched then update *
         when not matched then insert *
-        ");
-
+        "
+    );
 
     match conn.exec(&sql).await {
         Ok(_) => {
@@ -148,6 +154,12 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result<bool> {
         }
         Err(e) => {
             // replace may be failed due to concurrent mutations (compact, purge, recluster)
+            if e.to_string()
+                .contains("block_idx < segment_info.blocks.len()")
+            {
+                info!("Err. merge-into batch (with conflict) : [{}]. {e}", ids);
+                panic!()
+            }
             info!("Err. merge-into batch (with conflict) : [{}]. {e}", ids);
             Ok(false)
         }
@@ -176,7 +188,7 @@ async fn exec_replace(dsn: &str, batch_id: u32) -> Result<bool> {
             insert_time2,
             insert_time3,
             i
-            from random_source limit 1000
+            from random_source limit 100
         ) as s
         on t.id = s.id and t.insert_time = s.insert_time
         when matched then update *
@@ -249,7 +261,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
 
     let mut rows = conn.query_iter("select count() from test_order").await?;
     let r = rows.next().await.unwrap().unwrap();
-    let count: (u32, ) = r.try_into()?;
+    let count: (u32,) = r.try_into()?;
     info!(
         "CHECK: value of successfully executed merge-into statements: client {}, server {}",
         success_replace_stmts * 1000,
@@ -264,11 +276,11 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
         "
         select count() from
           (select count() a, id1 from test_order group by id1)
-          where a != 1000",
+          where a != 100",
     )
     .await?;
     let r = rows.next().await.unwrap().unwrap();
-    let count: (u32, ) = r.try_into()?;
+    let count: (u32,) = r.try_into()?;
     assert_eq!(0, count.0);
 
     // show the number of distinct value of id2
@@ -277,7 +289,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
         .query_iter("select count(distinct(id2)) from test_order")
         .await?;
     let r = rows.next().await.unwrap().unwrap();
-    let count: (u32, ) = r.try_into()?;
+    let count: (u32,) = r.try_into()?;
     assert_eq!(success_replace_stmts, count.0);
 
     info!(
@@ -293,7 +305,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
         .query_iter("select count() from test_order where id2 != id1 * 7")
         .await?;
     let r = rows.next().await.unwrap().unwrap();
-    let count: (i64, ) = r.try_into()?;
+    let count: (i64,) = r.try_into()?;
 
     info!("CHECK: value of correlated column");
 
diff --git a/tests/sql/setup.sql b/tests/sql/setup.sql
index a901a24..4546c3a 100644
--- a/tests/sql/setup.sql
+++ b/tests/sql/setup.sql
@@ -1,5 +1,6 @@
 drop table if exists test_order;
 drop table if exists random_source;
+drop table if exists restore_source;
 create table test_order (
     id bigint,
     id1 bigint,
@@ -90,7 +91,49 @@ create table random_source(
     insert_time2 datetime,
     insert_time3 datetime,
 
     i int
 ) Engine = Random;
 
+create table restore_source(
+    id bigint not null,
+    id1 bigint,
+    id2 bigint,
+    id3 bigint,
+    id4 bigint,
+    id5 bigint,
+    id6 bigint,
+    id7 bigint,
+
+    s1 varchar,
+    s2 varchar,
+    s3 varchar,
+    s4 varchar,
+    s5 varchar,
+    s6 varchar,
+    s7 varchar,
+    s8 varchar,
+    s9 varchar,
+    s10 varchar,
+    s11 varchar,
+    s12 varchar,
+    s13 varchar,
+
+    d1 DECIMAL(20, 8),
+    d2 DECIMAL(20, 8),
+    d3 DECIMAL(20, 8),
+    d4 DECIMAL(20, 8),
+    d5 DECIMAL(20, 8),
+    d6 DECIMAL(30, 8),
+    d7 DECIMAL(30, 8),
+    d8 DECIMAL(30, 8),
+    d9 DECIMAL(30, 8),
+    d10 DECIMAL(30, 8),
+
+    insert_time datetime not null,
+    insert_time1 datetime,
+    insert_time2 datetime,
+    insert_time3 datetime,
+
+    i int
+);
 truncate table system.metrics;