diff --git a/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp b/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp index a5130682191d9..959c49ae60421 100644 --- a/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp +++ b/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp @@ -831,7 +831,14 @@ Response: { "repr": "u64" } }, - "value": null + "value": { + "contents": { + "json": { + "id": "0x844185d7b145f503838a1d509845a40ee249534f723dd2f003d9efdfc581000d", + "count": "1" + } + } + } } } ] @@ -892,7 +899,14 @@ Response: { "repr": "u64" } }, - "value": null + "value": { + "contents": { + "json": { + "id": "0x844185d7b145f503838a1d509845a40ee249534f723dd2f003d9efdfc581000d", + "count": "1" + } + } + } } } ] @@ -928,7 +942,17 @@ task 34 'run-graphql'. lines 497-528: Response: { "data": { "parent_version_4": { - "dfAtParentVersion4_outside_range": null + "dfAtParentVersion4_outside_range": { + "name": { + "bcs": "A2RmMQ==", + "type": { + "repr": "0x0000000000000000000000000000000000000000000000000000000000000001::string::String" + } + }, + "value": { + "json": "df1" + } + } }, "parent_version_6": { "dfAtParentVersion6": null diff --git a/crates/sui-graphql-rpc/src/types/dynamic_field.rs b/crates/sui-graphql-rpc/src/types/dynamic_field.rs index 46b2cee59051c..df94455e0dd4b 100644 --- a/crates/sui-graphql-rpc/src/types/dynamic_field.rs +++ b/crates/sui-graphql-rpc/src/types/dynamic_field.rs @@ -10,7 +10,7 @@ use sui_types::dynamic_field::{derive_dynamic_field_id, DynamicFieldInfo, Dynami use super::available_range::AvailableRange; use super::cursor::{Page, Target}; -use super::object::{self, deserialize_move_struct, Object, ObjectKind, ObjectLookup}; +use super::object::{self, deserialize_move_struct, Object, ObjectKind}; use super::type_filter::ExactTypeFilter; use super::{ base64::Base64, move_object::MoveObject, move_value::MoveValue, sui_address::SuiAddress, @@ -174,9 +174,10 @@ impl DynamicField { let super_ = MoveObject::query( ctx, SuiAddress::from(field_id), - ObjectLookup::LatestAt { - parent_version, - checkpoint_viewed_at, + if let Some(parent_version) = parent_version { + Object::under_parent(parent_version, checkpoint_viewed_at) + } else { + Object::latest_at(checkpoint_viewed_at) }, ) .await?; diff --git a/crates/sui-graphql-rpc/src/types/object.rs b/crates/sui-graphql-rpc/src/types/object.rs index 7aa1734bdb044..6217580585af3 100644 --- a/crates/sui-graphql-rpc/src/types/object.rs +++ b/crates/sui-graphql-rpc/src/types/object.rs @@ -33,12 +33,12 @@ use crate::{filter, or_filter}; use async_graphql::connection::{CursorType, Edge}; use async_graphql::dataloader::Loader; use async_graphql::{connection::Connection, *}; -use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl}; +use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper}; use move_core_types::annotated_value::{MoveStruct, MoveTypeLayout}; use move_core_types::language_storage::StructTag; use serde::{Deserialize, Serialize}; use sui_indexer::models::objects::{StoredDeletedHistoryObject, StoredHistoryObject}; -use sui_indexer::schema::objects_history; +use sui_indexer::schema::{objects_history, objects_version}; use sui_indexer::types::ObjectStatus as NativeObjectStatus; use sui_indexer::types::OwnerType; use sui_types::object::bounded_visitor::BoundedVisitor; @@ -170,9 +170,14 @@ pub(crate) struct AddressOwner { pub(crate) enum ObjectLookup { LatestAt { - /// The parent version to be used as an optional upper bound for the query. Look for the - /// latest version of a child object that is less than or equal to this upper bound. - parent_version: Option, + /// The checkpoint sequence number at which this was viewed at + checkpoint_viewed_at: u64, + }, + + UnderParent { + /// The parent version to be used as an upper bound for the query. Look for the latest + /// version of a child object whose version is less than or equal to this upper bound. + parent_version: u64, /// The checkpoint sequence number at which this was viewed at checkpoint_viewed_at: u64, }, @@ -270,13 +275,21 @@ struct HistoricalKey { checkpoint_viewed_at: u64, } -/// DataLoader key for fetching the latest version of an `Object` as of a consistency cursor. The -/// query can optionally be bounded by a `parent_version` which imposes an additional requirement -/// that the object's version is bounded above by the parent version. +/// DataLoader key for fetching the latest version of an object whose parent object has version +/// `parent_version`, as of `checkpoint_viewed_at`. This look-up can fail to find a valid object if +/// the key is not self-consistent, for example if the `parent_version` is set to a higher version +/// than the object's actual parent as of `checkpoint_viewed_at`. +#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] +struct ParentVersionKey { + id: SuiAddress, + parent_version: u64, + checkpoint_viewed_at: u64, +} + +/// DataLoader key for fetching the latest version of an `Object` as of a consistency cursor. #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] struct LatestAtKey { id: SuiAddress, - parent_version: Option, checkpoint_viewed_at: u64, } @@ -776,7 +789,6 @@ impl Object { /// Look-up the latest version of the object as of a given checkpoint. pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup { ObjectLookup::LatestAt { - parent_version: None, checkpoint_viewed_at, } } @@ -784,8 +796,8 @@ impl Object { /// Look-up the latest version of an object whose version is less than or equal to its parent's /// version, as of a given checkpoint. pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup { - ObjectLookup::LatestAt { - parent_version: Some(parent_version), + ObjectLookup::UnderParent { + parent_version, checkpoint_viewed_at, } } @@ -818,18 +830,30 @@ impl Object { }) .await } - ObjectLookup::LatestAt { + + ObjectLookup::UnderParent { parent_version, checkpoint_viewed_at, } => { loader - .load_one(LatestAtKey { + .load_one(ParentVersionKey { id, parent_version, checkpoint_viewed_at, }) .await } + + ObjectLookup::LatestAt { + checkpoint_viewed_at, + } => { + loader + .load_one(LatestAtKey { + id, + checkpoint_viewed_at, + }) + .await + } } } @@ -1124,7 +1148,8 @@ impl Loader for Db { type Error = Error; async fn load(&self, keys: &[HistoricalKey]) -> Result, Error> { - use objects_history::dsl; + use objects_history::dsl as h; + use objects_version::dsl as v; let id_versions: BTreeSet<_> = keys .iter() @@ -1134,12 +1159,19 @@ impl Loader for Db { let objects: Vec = self .execute(move |conn| { conn.results(move || { - let mut query = dsl::objects_history.into_boxed(); + let mut query = h::objects_history + .inner_join( + v::objects_version.on(v::cp_sequence_number + .eq(h::checkpoint_sequence_number) + .and(v::object_id.eq(h::object_id)) + .and(v::object_version.eq(h::object_version))), + ) + .select(StoredHistoryObject::as_select()) + .into_boxed(); - // TODO: Speed up using an `obj_version` table. for (id, version) in id_versions.iter().cloned() { - query = query - .or_filter(dsl::object_id.eq(id).and(dsl::object_version.eq(version))); + query = + query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version))); } query @@ -1177,17 +1209,20 @@ impl Loader for Db { } #[async_trait::async_trait] -impl Loader for Db { +impl Loader for Db { type Value = Object; type Error = Error; - async fn load(&self, keys: &[LatestAtKey]) -> Result, Error> { + async fn load( + &self, + keys: &[ParentVersionKey], + ) -> Result, Error> { // Group keys by checkpoint viewed at and parent version -- we'll issue a separate query for // each group. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] struct GroupKey { checkpoint_viewed_at: u64, - parent_version: Option, + parent_version: u64, } let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new(); @@ -1200,50 +1235,40 @@ impl Loader for Db { keys_by_cursor_and_parent_version .entry(group_key) .or_default() - .insert(key.id); + .insert(key.id.into_vec()); } // Issue concurrent reads for each group of keys. let futures = keys_by_cursor_and_parent_version .into_iter() .map(|(group_key, ids)| { - self.execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, group_key.checkpoint_viewed_at)? - else { - return Ok::, diesel::result::Error>( - vec![], - ); - }; - - let filter = ObjectFilter { - object_ids: Some(ids.iter().cloned().collect()), - ..Default::default() - }; - - // TODO: Implement queries that use a parent version bound using an - // `obj_version` table. - let apply_parent_bound = |q: RawQuery| { - if let Some(parent_version) = group_key.parent_version { - filter!(q, format!("object_version <= {parent_version}")) - } else { - q - } - }; - - Ok(conn - .results(move || { - build_objects_query( - View::Consistent, - range, - &Page::bounded(ids.len() as u64), - |q| apply_parent_bound(filter.apply(q)), - apply_parent_bound, + self.execute(move |conn| { + let stored: Vec = conn.results(move || { + use objects_history::dsl as h; + use objects_version::dsl as v; + + h::objects_history + .inner_join( + v::objects_version.on(v::cp_sequence_number + .eq(h::checkpoint_sequence_number) + .and(v::object_id.eq(h::object_id)) + .and(v::object_version.eq(h::object_version))), ) + .select(StoredHistoryObject::as_select()) + .filter(v::object_id.eq_any(ids.iter().cloned())) + .filter(v::object_version.le(group_key.parent_version as i64)) + .distinct_on(v::object_id) + .order_by(v::object_id) + .then_order_by(v::object_version.desc()) .into_boxed() - })? - .into_iter() - .map(|r| (group_key, r)) - .collect()) + })?; + + Ok::<_, diesel::result::Error>( + stored + .into_iter() + .map(|stored| (group_key, stored)) + .collect::>(), + ) }) }); @@ -1255,10 +1280,16 @@ impl Loader for Db { for (group_key, stored) in group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))? { + // This particular object is invalid -- it didn't exist at the checkpoint we are + // viewing at. + if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 { + continue; + } + let object = Object::try_from_stored_history_object(stored, group_key.checkpoint_viewed_at)?; - let key = LatestAtKey { + let key = ParentVersionKey { id: object.address, checkpoint_viewed_at: group_key.checkpoint_viewed_at, parent_version: group_key.parent_version, @@ -1272,6 +1303,80 @@ impl Loader for Db { } } +#[async_trait::async_trait] +impl Loader for Db { + type Value = Object; + type Error = Error; + + async fn load(&self, keys: &[LatestAtKey]) -> Result, Error> { + // Group keys by checkpoint viewed at -- we'll issue a separate query for each group. + let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new(); + + for key in keys { + keys_by_cursor_and_parent_version + .entry(key.checkpoint_viewed_at) + .or_default() + .insert(key.id); + } + + // Issue concurrent reads for each group of keys. + let futures = + keys_by_cursor_and_parent_version + .into_iter() + .map(|(checkpoint_viewed_at, ids)| { + self.execute_repeatable(move |conn| { + let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? + else { + return Ok::, diesel::result::Error>( + vec![], + ); + }; + + let filter = ObjectFilter { + object_ids: Some(ids.iter().cloned().collect()), + ..Default::default() + }; + + Ok(conn + .results(move || { + build_objects_query( + View::Consistent, + range, + &Page::bounded(ids.len() as u64), + |q| filter.apply(q), + |q| q, + ) + .into_boxed() + })? + .into_iter() + .map(|r| (checkpoint_viewed_at, r)) + .collect()) + }) + }); + + // Wait for the reads to all finish, and gather them into the result map. + let groups = futures::future::join_all(futures).await; + + let mut results = HashMap::new(); + for group in groups { + for (checkpoint_viewed_at, stored) in + group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))? + { + let object = Object::try_from_stored_history_object(stored, checkpoint_viewed_at)?; + + let key = LatestAtKey { + id: object.address, + checkpoint_viewed_at, + }; + + results.insert(key, object); + } + } + + Ok(results) + } +} + impl From<&ObjectKind> for ObjectStatus { fn from(kind: &ObjectKind) -> Self { match kind { diff --git a/crates/sui-indexer/src/models/objects.rs b/crates/sui-indexer/src/models/objects.rs index b7663163731d1..b6295534acc90 100644 --- a/crates/sui-indexer/src/models/objects.rs +++ b/crates/sui-indexer/src/models/objects.rs @@ -132,7 +132,7 @@ impl From for StoredObjectSnapshot { } } -#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)] +#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName, Selectable)] #[diesel(table_name = objects_history, primary_key(object_id, object_version, checkpoint_sequence_number))] pub struct StoredHistoryObject { pub object_id: Vec,