Skip to content

Commit

Permalink
change pkey on splits (#5426)
Browse files Browse the repository at this point in the history
* change pkey on splits
  • Loading branch information
trinity-1686a authored Sep 13, 2024
1 parent 54473bb commit 2d4f5a6
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Adds a hash index on splits.index_uid (no-op if an index with this name already exists).
-- PostgreSQL hash indexes only serve equality lookups; they cannot satisfy an ORDER BY.
CREATE INDEX IF NOT EXISTS splits_index_uid_idx ON splits USING HASH(index_uid);
-- Swaps the primary key to split_id alone. The drop and the add are issued in a single
-- ALTER TABLE statement, so the table is never left without a primary key.
-- DROP CONSTRAINT has no IF EXISTS here: the statement fails if no constraint named
-- splits_pkey exists.
-- NOTE(review): the file name is not visible here; this looks like the "down" side of the
-- migration pair (its sibling script drops this same index) -- confirm.
ALTER TABLE splits DROP CONSTRAINT splits_pkey, ADD PRIMARY KEY (split_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Replaces the primary key with the composite (index_uid, split_id). The drop and the add
-- are issued in a single ALTER TABLE statement, so the table is never left without a
-- primary key.
-- DROP CONSTRAINT has no IF EXISTS here: the statement fails if no constraint named
-- splits_pkey exists.
-- NOTE(review): PostgreSQL backs a primary key with a unique btree index, so index_uid
-- becomes the leading column of a btree; presumably that is what makes the dedicated
-- index dropped below unnecessary -- confirm.
ALTER TABLE splits DROP CONSTRAINT splits_pkey, ADD PRIMARY KEY (index_uid, split_id);
-- Removes the splits_index_uid_idx index if it is present (no-op otherwise).
DROP INDEX IF EXISTS splits_index_uid_idx;
44 changes: 20 additions & 24 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -870,9 +870,7 @@ impl MetastoreService for PostgresqlMetastore {
) -> MetastoreResult<MetastoreServiceStream<ListSplitsResponse>> {
let list_splits_query = request.deserialize_list_splits_query()?;
let mut sql_query_builder = Query::select();
sql_query_builder
.column((Splits::Table, Asterisk))
.from(Splits::Table);
sql_query_builder.column(Asterisk).from(Splits::Table);
append_query_filters(&mut sql_query_builder, &list_splits_query);

let (sql_query, values) = sql_query_builder.build_sqlx(PostgresQueryBuilder);
Expand Down Expand Up @@ -1929,7 +1927,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Staged')"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Staged')"#
)
);

Expand All @@ -1943,7 +1941,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Published')"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published')"#
)
);

Expand All @@ -1956,7 +1954,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Published', 'MarkedForDeletion')"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published', 'MarkedForDeletion')"#
)
);

Expand All @@ -1968,7 +1966,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51)"#
)
);

Expand All @@ -1980,7 +1978,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "create_timestamp" <= TO_TIMESTAMP(55)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "create_timestamp" <= TO_TIMESTAMP(55)"#
)
);

Expand All @@ -1995,7 +1993,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"#
)
);

Expand All @@ -2008,7 +2006,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "maturity_timestamp" > TO_TIMESTAMP(55)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "maturity_timestamp" > TO_TIMESTAMP(55)"#
)
);

Expand All @@ -2020,7 +2018,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "delete_opstamp" >= 4"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "delete_opstamp" >= 4"#
)
);

Expand All @@ -2032,7 +2030,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"#
)
);

Expand All @@ -2044,7 +2042,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"#
)
);

Expand All @@ -2061,7 +2059,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))"#
)
);

Expand All @@ -2074,22 +2072,20 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "split_id" ASC OFFSET 4"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "split_id" ASC OFFSET 4"#
)
);

let mut select_statement = Query::select();
let sql = select_statement
.column((Splits::Table, Asterisk))
.from(Splits::Table);
let sql = select_statement.column(Asterisk).from(Splits::Table);

let query = ListSplitsQuery::for_index(index_uid.clone()).sort_by_index_uid();
append_query_filters(sql, &query);

assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT "splits".* FROM "splits" JOIN "indexes" ON "splits"."index_uid" = "indexes"."index_uid" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"#
)
);
}
Expand All @@ -2107,7 +2103,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"#
)
);

Expand All @@ -2121,7 +2117,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"#
)
);

Expand All @@ -2135,7 +2131,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"#
)
);

Expand All @@ -2152,7 +2148,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"#
)
);

Expand All @@ -2167,7 +2163,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}', '{index_uid_2}')"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}', '{index_uid_2}')"#
)
);
}
Expand Down
28 changes: 4 additions & 24 deletions quickwit/quickwit-metastore/src/metastore/postgres/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ use std::time::Duration;

use quickwit_common::uri::Uri;
use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
use sea_query::{any, Expr, Func, JoinType, Order, SelectStatement};
use sea_query::{any, Expr, Func, Order, SelectStatement};
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use sqlx::{ConnectOptions, Postgres};
use tracing::error;
use tracing::log::LevelFilter;

use super::model::{Indexes, Splits, ToTimestampFunc};
use super::model::{Splits, ToTimestampFunc};
use super::pool::TrackedPool;
use super::tags::generate_sql_condition;
use crate::metastore::{FilterRange, SortBy};
Expand Down Expand Up @@ -104,7 +104,7 @@ pub(super) fn append_range_filters<V: Display>(
pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplitsQuery) {
// Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list.

sql.cond_where(Expr::col((Splits::Table, Splits::IndexUid)).is_in(&query.index_uids));
sql.cond_where(Expr::col(Splits::IndexUid).is_in(&query.index_uids));

if let Some(node_id) = &query.node_id {
sql.cond_where(Expr::col(Splits::NodeId).eq(node_id));
Expand Down Expand Up @@ -195,27 +195,7 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits
);
}
SortBy::IndexUid => {
// This ORDER BY can be fairly costly.
// In testing, adding a join here was much faster, because we do an index-only scan on
// indexes.index_uid, nested-loop merged with a bitmap index scan on splits.index_uid,
// filter for our conditions, and just take the first N results. This is guaranteed to
// return correct results because indexes.index_uid is a non-null foreign key.
//
// The caller also needs to use .column((Splits::Table, Asterisk)) so that the join does
// not return unexpected columns.
//
// Without the join, on the other hand, we do a seq scan on splits, sort everything, and
// truncate.
//
// Alternatively, we could just add a btree index on splits.index_uid. That might be the
// better long-term solution.
sql.join(
JoinType::Join,
Indexes::Table,
Expr::col((Splits::Table, Splits::IndexUid))
.equals((Indexes::Table, Indexes::IndexUid)),
)
.order_by(Splits::IndexUid, Order::Asc);
sql.order_by(Splits::IndexUid, Order::Asc);
}
SortBy::None => (),
}
Expand Down

0 comments on commit 2d4f5a6

Please sign in to comment.