Skip to content

Commit

Permalink
fixup: Simplify add_bounds
Browse files Browse the repository at this point in the history
Seeing as we don't support compound filters for events, we don't need to
go to the trouble of using CTEs just yet.
  • Loading branch information
amnn committed Aug 28, 2024
1 parent c0b6fab commit 9cf7d94
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 142 deletions.
21 changes: 1 addition & 20 deletions crates/sui-graphql-rpc/src/raw_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use diesel::{
};

use crate::data::DieselBackend;
use std::fmt::{Display, Write};

pub(crate) type RawSqlQuery = BoxedSqlQuery<'static, DieselBackend, SqlQuery>;

Expand All @@ -25,8 +24,6 @@ pub(crate) type RawSqlQuery = BoxedSqlQuery<'static, DieselBackend, SqlQuery>;
/// increases exposure to SQL injection attacks.
#[derive(Clone)]
pub(crate) struct RawQuery {
/// The `WITH` clause of the query.
with: Option<String>,
/// The `SELECT` and `FROM` clauses of the query.
select: String,
/// The `WHERE` clause of the query.
Expand All @@ -45,7 +42,6 @@ impl RawQuery {
/// Constructs a new `RawQuery` with the given `SELECT` clause and binds.
pub(crate) fn new(select: impl Into<String>, binds: Vec<String>) -> Self {
Self {
with: None,
select: select.into(),
where_: None,
order_by: Vec::new(),
Expand Down Expand Up @@ -76,17 +72,6 @@ impl RawQuery {
self
}

// Adds a `WITH` clause to the query.
pub(crate) fn with<T: Display>(mut self, ctes: T) -> Self {
if let Some(with) = &mut self.with {
// SAFETY: write! to String always succeeds
write!(with, ", {ctes}").unwrap();
} else {
self.with = Some(ctes.to_string())
}
self
}

/// Adds an `ORDER BY` clause to the query.
pub(crate) fn order_by<T: ToString>(mut self, order: T) -> Self {
self.order_by.push(order.to_string());
Expand Down Expand Up @@ -114,11 +99,7 @@ impl RawQuery {
/// function is not intended to be called directly, and instead should be used through the
/// `query!` macro.
pub(crate) fn finish(self) -> (String, Vec<String>) {
let mut select = if let Some(with) = self.with {
"WITH ".to_owned() + &with + " " + &self.select
} else {
self.select
};
let mut select = self.select;

if let Some(where_) = self.where_ {
select.push_str(" WHERE ");
Expand Down
150 changes: 29 additions & 121 deletions crates/sui-graphql-rpc/src/types/event/lookups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,138 +113,46 @@ pub(crate) fn select_emit_module(
}
}

/// Given a `RawQuery` representing a query for events, adds CTEs and corresponding filters to
/// constrain the query. By default, if neither a transaction digest nor an `after` cursor are
/// specified, then the events query will only have an upper bound, determined by the `before`
/// cursor or the current latest `tx_sequence_number`. If a transaction digest is specified, add a
/// `tx_eq` CTE that the lower and upper bounds will compare to. This also means an additional check
/// that the `tx_eq` CTE returns a non-empty result.
///
/// `tx_hi` represents the current latest `tx_sequence_number`.
/// Adds filters to bound an events query from above and below based on cursors and filters. The
/// query will always at least be bounded by `tx_hi`, the current exclusive upperbound on
/// transaction sequence numbers, based on the consistency cursor.
pub(crate) fn add_bounds(
mut query: RawQuery,
tx_digest_filter: &Option<Digest>,
page: &Page<Cursor>,
tx_hi: u64,
tx_hi: i64,
) -> RawQuery {
let mut ctes = vec![];

let mut has_digest_cte = false;
if let Some(digest) = tx_digest_filter {
ctes.push(format!(
r#"
tx_eq AS (
SELECT tx_sequence_number AS eq
FROM tx_digests
WHERE tx_digest = {}
query = filter!(query, format!("tx_sequence_number < {}", tx_hi));

if let Some(after) = page.after() {
query = filter!(
query,
format!(
"ROW(tx_sequence_number, event_sequence_number) >= ({}, {})",
after.tx, after.e
)
"#,
bytea_literal(digest.as_slice())
));
has_digest_cte = true;
};

let mut has_select_lo = false;
let select_lo = match (page.after(), has_digest_cte) {
(Some(after), _) => {
// If both a digest and after cursor are present, then select the larger tx sequence
// number
let select_tx_lo = if has_digest_cte {
format!("SELECT GREATEST({}, (SELECT eq FROM tx_eq))", after.tx)
} else {
format!("SELECT {}", after.tx)
};
// If `tx_lo` matches `after` cursor, then we should use the `after` cursor's event
// sequence number
let select_ev_lo = format!(
"SELECT CASE WHEN (SELECT lo FROM tx_lo) = {after_tx} THEN {after_ev} ELSE 0 END",
after_tx = after.tx,
after_ev = after.e
);
Some(format!(
r#"
tx_lo AS ({select_tx_lo} AS lo),
ev_lo AS ({select_ev_lo} AS lo)
"#
))
}
// No `after` cursor, has digest
(None, true) => Some(
r#"
tx_lo AS (SELECT eq AS lo FROM tx_eq),
ev_lo AS (SELECT 0 AS lo)
"#
.to_string(),
),
// Neither, don't need lower bounds
(None, false) => None,
};

if let Some(select_lo) = select_lo {
ctes.push(select_lo);
has_select_lo = true;
};

let (select_tx_hi, select_ev_hi) = match page.before() {
Some(before) => {
let tx_hi = if has_digest_cte {
// Select the smaller of the digest or before cursor, to exclude txs between the two,
// if any
format!("SELECT LEAST({}, (SELECT eq FROM tx_eq))", before.tx)
} else {
format!("SELECT {}", before.tx)
};
let ev_hi = format!(
// Check for equality so that if the digest and before cursor are the same tx, we
// don't miss the before cursor's event sequence number
"SELECT CASE WHEN (SELECT hi FROM tx_hi) = {before_tx} THEN {before_ev} ELSE {u64_max} END",
before_tx=before.tx,
u64_max=u64::MAX,
before_ev=before.e
);
(tx_hi, ev_hi)
}
None => (
if has_digest_cte {
format!("SELECT LEAST({}, (SELECT eq FROM tx_eq))", tx_hi)
} else {
format!("SELECT {}", tx_hi)
},
format!("SELECT {}", u64::MAX),
),
};

ctes.push(format!(
r#"
tx_hi AS ({select_tx_hi} AS hi),
ev_hi AS ({select_ev_hi} AS hi)
"#,
));

for cte in ctes {
query = query.with(cte);
);
}

// This is needed to make sure that if a transaction digest is specified, the corresponding
// `tx_eq` must yield a non-empty result. Otherwise, the CTE setup will fallback to defaults
// and we will return an unexpected response.
if has_digest_cte {
query = filter!(query, "EXISTS (SELECT 1 FROM tx_eq)");
if let Some(before) = page.before() {
query = filter!(
query,
format!(
"ROW(tx_sequence_number, event_sequence_number) <= ({}, {})",
before.tx, before.e
)
);
}

if has_select_lo {
query = query
.filter("(SELECT lo FROM tx_lo) <= tx_sequence_number")
.filter(
"(ROW(tx_sequence_number, event_sequence_number) >= \
((SELECT lo FROM tx_lo), (SELECT lo FROM ev_lo)))",
);
if let Some(digest) = tx_digest_filter {
query = filter!(
query,
format!(
"tx_sequence_number = (SELECT tx_sequence_number FROM tx_digests WHERE tx_digest = {})",
bytea_literal(digest.as_slice()),
)
);
}

query
.filter("tx_sequence_number <= (SELECT hi FROM tx_hi)")
.filter(
"(ROW(tx_sequence_number, event_sequence_number) <= \
((SELECT hi FROM tx_hi), (SELECT hi FROM ev_hi)))",
)
}
1 change: 0 additions & 1 deletion crates/sui-graphql-rpc/src/types/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ impl Event {
dsl::checkpoints.select(dsl::network_total_transactions)
.filter(dsl::sequence_number.eq(checkpoint_viewed_at as i64))
})?;
let tx_hi = tx_hi as u64 - 1;

let (prev, next, mut events): (bool, bool, Vec<StoredEvent>) =
if let Some(filter_query) = query_constraint {
Expand Down

0 comments on commit 9cf7d94

Please sign in to comment.