Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

modify #1

Draft
wants to merge 1 commit into
base: test-merge-into
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fs::read_to_string;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use anyhow::Result;
use databend_driver::new_connection;
Expand All @@ -11,12 +11,13 @@ use log::info;
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

let dsn = std::env::var("DATABEND_DSN")
.map_err(|_| {
"DATABEND_DSN is empty, please EXPORT DATABEND_DSN=<your-databend-dsn>".to_string()
})
.unwrap();

println!("DATABEND_DSN:{}", dsn);
let iterations = if let Some(num_of_iteration) = std::env::args().nth(1) {
num_of_iteration.parse::<u32>().expect("invalid number")
} else {
Expand Down Expand Up @@ -130,16 +131,21 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result<bool> {
.collect::<Vec<_>>()
.join(" or ");
sub_query.push_str(&filter);
let truncate_sql = "truncate table restore_source";
let store_sql = format!("insert into restore_source {sub_query}");

conn.exec(&truncate_sql).await?;
conn.exec(&store_sql).await?;
// replace these history data into the table (itself). while table being compacted and re-clustered
// this may lead to partial and total block update.
let sql = format!("merge into test_order as t
using ({sub_query}) as s
let sql = format!(
"merge into test_order as t
using (select * from restore_source) as s
on t.id = s.id and t.insert_time = s.insert_time
when matched then update *
when not matched then insert *
");

"
);

match conn.exec(&sql).await {
Ok(_) => {
Expand All @@ -148,6 +154,12 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result<bool> {
}
Err(e) => {
// replace may be failed due to concurrent mutations (compact, purge, recluster)
if e.to_string()
.contains("block_idx < segment_info.blocks.len()")
{
info!("Err. merge-into batch (with conflict) : [{}]. {e}", ids);
panic!()
}
info!("Err. merge-into batch (with conflict) : [{}]. {e}", ids);
Ok(false)
}
Expand Down Expand Up @@ -176,7 +188,7 @@ async fn exec_replace(dsn: &str, batch_id: u32) -> Result<bool> {
insert_time2,
insert_time3,
i
from random_source limit 1000
from random_source limit 100
) as s

on t.id = s.id and t.insert_time = s.insert_time
Expand Down Expand Up @@ -249,7 +261,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {

let mut rows = conn.query_iter("select count() from test_order").await?;
let r = rows.next().await.unwrap().unwrap();
let count: (u32, ) = r.try_into()?;
let count: (u32,) = r.try_into()?;
info!(
"CHECK: value of successfully executed merge-into statements: client {}, server {}",
success_replace_stmts * 1000,
Expand All @@ -264,11 +276,11 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
"
select count() from
(select count() a, id1 from test_order group by id1)
where a != 1000",
where a != 100",
)
.await?;
let r = rows.next().await.unwrap().unwrap();
let count: (u32, ) = r.try_into()?;
let count: (u32,) = r.try_into()?;
assert_eq!(0, count.0);

// show the number of distinct value of id2
Expand All @@ -277,7 +289,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
.query_iter("select count(distinct(id2)) from test_order")
.await?;
let r = rows.next().await.unwrap().unwrap();
let count: (u32, ) = r.try_into()?;
let count: (u32,) = r.try_into()?;

assert_eq!(success_replace_stmts, count.0);
info!(
Expand All @@ -293,7 +305,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
.query_iter("select count() from test_order where id2 != id1 * 7")
.await?;
let r = rows.next().await.unwrap().unwrap();
let count: (i64, ) = r.try_into()?;
let count: (i64,) = r.try_into()?;

info!("CHECK: value of correlated column");

Expand Down
43 changes: 43 additions & 0 deletions tests/sql/setup.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
drop table if exists test_order;
drop table if exists random_source;
drop table if exists restore_source;
create table test_order (
id bigint,
id1 bigint,
Expand Down Expand Up @@ -90,7 +91,49 @@ create table random_source(
) Engine = Random;


create table restore_source(
id bigint not null,
id1 bigint,
id2 bigint,
id3 bigint,
id4 bigint,
id5 bigint,
id6 bigint,
id7 bigint,

s1 varchar,
s2 varchar,
s3 varchar,
s4 varchar,
s5 varchar,
s6 varchar,
s7 varchar,
s8 varchar,
s9 varchar,
s10 varchar,
s11 varchar,
s12 varchar,
s13 varchar,

d1 DECIMAL(20, 8),
d2 DECIMAL(20, 8),
d3 DECIMAL(20, 8),
d4 DECIMAL(20, 8),
d5 DECIMAL(20, 8),
d6 DECIMAL(30, 8),
d7 DECIMAL(30, 8),
d8 DECIMAL(30, 8),
d9 DECIMAL(30, 8),
d10 DECIMAL(30, 8),

insert_time datetime not null,
insert_time1 datetime,
insert_time2 datetime,
insert_time3 datetime,

i int

);

truncate table system.metrics;