diff --git a/crates/torii/cli/src/args.rs b/crates/torii/cli/src/args.rs index 87d92df01f..60c106401e 100644 --- a/crates/torii/cli/src/args.rs +++ b/crates/torii/cli/src/args.rs @@ -51,6 +51,9 @@ pub struct ToriiArgs { #[command(flatten)] pub erc: ErcOptions, + #[command(flatten)] + pub sql: SqlOptions, + #[cfg(feature = "server")] #[command(flatten)] pub metrics: MetricsOptions, @@ -107,6 +110,10 @@ impl ToriiArgs { self.erc = config.erc.unwrap_or_default(); } + if self.sql == SqlOptions::default() { + self.sql = config.sql.unwrap_or_default(); + } + #[cfg(feature = "server")] { if self.server == ServerOptions::default() { @@ -136,6 +143,7 @@ pub struct ToriiArgsConfig { pub indexing: Option, pub events: Option, pub erc: Option, + pub sql: Option, #[cfg(feature = "server")] pub metrics: Option, #[cfg(feature = "server")] @@ -167,6 +175,7 @@ impl TryFrom for ToriiArgsConfig { config.events = if args.events == EventsOptions::default() { None } else { Some(args.events) }; config.erc = if args.erc == ErcOptions::default() { None } else { Some(args.erc) }; + config.sql = if args.sql == SqlOptions::default() { None } else { Some(args.sql) }; #[cfg(feature = "server")] { @@ -187,7 +196,7 @@ mod test { use std::net::{IpAddr, Ipv4Addr}; use std::str::FromStr; - use torii_sqlite::types::{Contract, ContractType}; + use torii_sqlite::types::{Contract, ContractType, ModelIndices}; use super::*; @@ -208,6 +217,10 @@ mod test { "ns-E", "ns-EH" ] + + [[sql.model_indices]] + model_tag = "ns-Position" + fields = ["vec.x", "vec.y"] "#; let path = std::env::temp_dir().join("torii-config2.json"); std::fs::write(&path, content).unwrap(); @@ -225,6 +238,8 @@ mod test { "--events.historical", "a-A", "--indexing.transactions", + "--sql.model_indices", + "ns-Position:vec.x,vec.y;ns-Moves:player", "--config", path_str.as_str(), ]; @@ -238,6 +253,19 @@ mod test { assert_eq!(torii_args.events.historical, vec!["a-A".to_string()]); assert_eq!(torii_args.server, ServerOptions::default()); assert!(torii_args.indexing.transactions); + assert_eq!( + torii_args.sql.model_indices, + Some(vec![ + ModelIndices { + model_tag: "ns-Position".to_string(), + fields: vec!["vec.x".to_string(), "vec.y".to_string()], + }, + ModelIndices { + model_tag: "ns-Moves".to_string(), + fields: vec!["player".to_string()], + }, + ]) + ); } #[test] @@ -269,6 +297,10 @@ mod test { "erc721:0x5678" ] namespaces = [] + + [[sql.model_indices]] + model_tag = "ns-Position" + fields = ["vec.x", "vec.y"] "#; let path = std::env::temp_dir().join("torii-config.json"); std::fs::write(&path, content).unwrap(); @@ -303,6 +335,13 @@ mod test { } ] ); + assert_eq!( + torii_args.sql.model_indices, + Some(vec![ModelIndices { + model_tag: "ns-Position".to_string(), + fields: vec!["vec.x".to_string(), "vec.y".to_string()], + }]) + ); assert_eq!(torii_args.server.http_addr, IpAddr::V4(Ipv4Addr::LOCALHOST)); assert_eq!(torii_args.server.http_port, 7777); assert_eq!(torii_args.server.http_cors_origins, Some(vec!["*".to_string()])); diff --git a/crates/torii/cli/src/options.rs b/crates/torii/cli/src/options.rs index 152086ec63..d407bb970d 100644 --- a/crates/torii/cli/src/options.rs +++ b/crates/torii/cli/src/options.rs @@ -6,7 +6,7 @@ use camino::Utf8PathBuf; use serde::ser::SerializeSeq; use serde::{Deserialize, Serialize}; use starknet::core::types::Felt; -use torii_sqlite::types::{Contract, ContractType}; +use torii_sqlite::types::{Contract, ContractType, ModelIndices}; pub const DEFAULT_HTTP_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST); pub const DEFAULT_HTTP_PORT: u16 = 8080; @@ -372,6 +372,51 @@ impl Default for ErcOptions { } } +#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)] +#[command(next_help_heading = "SQL options")] +pub struct SqlOptions { + /// Whether model tables should default to having indices on all columns + #[arg( + long = "sql.model_indices_keys", + default_value_t = false, + help = "If true, creates indices on only key fields columns of model tables by default. \ + If false, all model field columns will have indices." + )] + #[serde(default)] + pub model_indices_keys: bool, + + /// Specify which fields should have indices for specific models + /// Format: "model_name:field1,field2;another_model:field3,field4" + #[arg( + long = "sql.model_indices", + value_delimiter = ';', + value_parser = parse_model_indices, + help = "Specify which fields should have indices for specific models. Format: \"model_name:field1,field2;another_model:field3,field4\"" + )] + #[serde(default)] + pub model_indices: Option>, +} + +impl Default for SqlOptions { + fn default() -> Self { + Self { model_indices_keys: false, model_indices: None } + } +} + +// Parses clap cli argument which is expected to be in the format: +// - model-tag:field1,field2;othermodel-tag:field3,field4 +fn parse_model_indices(part: &str) -> anyhow::Result { + let parts = part.split(':').collect::>(); + if parts.len() != 2 { + return Err(anyhow::anyhow!("Invalid model indices format")); + } + + let model_tag = parts[0].to_string(); + let fields = parts[1].split(',').map(|s| s.to_string()).collect::>(); + + Ok(ModelIndices { model_tag, fields }) +} + // Parses clap cli argument which is expected to be in the format: // - erc_type:address:start_block // - address:start_block (erc_type defaults to ERC20) diff --git a/crates/torii/runner/src/lib.rs b/crates/torii/runner/src/lib.rs index 3faa6ec409..9149c4ba9e 100644 --- a/crates/torii/runner/src/lib.rs +++ b/crates/torii/runner/src/lib.rs @@ -38,7 +38,7 @@ use torii_sqlite::cache::ModelCache; use torii_sqlite::executor::Executor; use torii_sqlite::simple_broker::SimpleBroker; use torii_sqlite::types::{Contract, ContractType, Model}; -use torii_sqlite::Sql; +use torii_sqlite::{Sql, SqlConfig}; use tracing::{error, info}; use tracing_subscriber::{fmt, EnvFilter}; use url::form_urlencoded; @@ -148,11 +148,12 @@ impl Runner { let executor_handle = tokio::spawn(async move { executor.run().await }); let model_cache = Arc::new(ModelCache::new(readonly_pool.clone())); - let db = Sql::new( + let db = Sql::new_with_config( pool.clone(), sender.clone(), &self.args.indexing.contracts, model_cache.clone(), + SqlConfig { model_indices: self.args.sql.model_indices.unwrap_or_default() }, ) .await?; diff --git a/crates/torii/sqlite/src/lib.rs b/crates/torii/sqlite/src/lib.rs index b546cb7bfd..9a4aa9e657 100644 --- a/crates/torii/sqlite/src/lib.rs +++ b/crates/torii/sqlite/src/lib.rs @@ -21,7 +21,7 @@ use crate::executor::{ Argument, DeleteEntityQuery, EventMessageQuery, QueryMessage, QueryType, ResetCursorsQuery, SetHeadQuery, UpdateCursorsQuery, }; -use crate::types::Contract; +use crate::types::{Contract, ModelIndices}; use crate::utils::utc_dt_string_from_timestamp; type IsEventMessage = bool; @@ -39,12 +39,18 @@ pub mod utils; use cache::{LocalCache, Model, ModelCache}; +#[derive(Debug, Clone, Default)] +pub struct SqlConfig { + pub model_indices: Vec, +} + #[derive(Debug, Clone)] pub struct Sql { pub pool: Pool, pub executor: UnboundedSender, model_cache: Arc, local_cache: Arc, + config: SqlConfig, } #[derive(Debug, Clone)] @@ -60,6 +66,16 @@ impl Sql { executor: UnboundedSender, contracts: &[Contract], model_cache: Arc, + ) -> Result { + Self::new_with_config(pool, executor, contracts, model_cache, Default::default()).await + } + + pub async fn new_with_config( + pool: Pool, + executor: UnboundedSender, + contracts: &[Contract], + model_cache: Arc, + config: SqlConfig, ) -> Result { for contract in contracts { executor.send(QueryMessage::other( @@ -75,8 +91,13 @@ impl Sql { } let local_cache = LocalCache::new(pool.clone()).await; - let db = - Self { pool: pool.clone(), executor, model_cache, local_cache: Arc::new(local_cache) }; + let db = Self { + pool: pool.clone(), + executor, + model_cache, + local_cache: Arc::new(local_cache), + config, + }; db.execute().await?; @@ -769,7 +790,7 @@ impl Sql { )); // Recursively add columns for all nested type - add_columns_recursive( + self.add_columns_recursive( &path, model, &mut columns, @@ -813,77 +834,43 @@ impl Sql { Ok(()) } - pub async fn execute(&self) -> Result<()> { - let (execute, recv) = QueryMessage::execute_recv(); - self.executor.send(execute)?; - recv.await? - } - - pub async fn flush(&self) -> Result<()> { - let (flush, recv) = QueryMessage::flush_recv(); - self.executor.send(flush)?; - recv.await? - } - - pub async fn rollback(&self) -> Result<()> { - let (rollback, recv) = QueryMessage::rollback_recv(); - self.executor.send(rollback)?; - recv.await? - } - - pub async fn add_controller( - &mut self, - username: &str, - address: &str, - block_timestamp: u64, + #[allow(clippy::too_many_arguments)] + fn add_columns_recursive( + &self, + path: &[String], + ty: &Ty, + columns: &mut Vec, + alter_table_queries: &mut Vec, + indices: &mut Vec, + table_id: &str, + upgrade_diff: Option<&Ty>, ) -> Result<()> { - let insert_controller = " - INSERT INTO controllers (id, username, address, deployed_at) - VALUES (?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - username=EXCLUDED.username, - address=EXCLUDED.address, - deployed_at=EXCLUDED.deployed_at - RETURNING *"; + let column_prefix = if path.len() > 1 { path[1..].join(".") } else { String::new() }; - let arguments = vec![ - Argument::String(username.to_string()), - Argument::String(username.to_string()), - Argument::String(address.to_string()), - Argument::String(utc_dt_string_from_timestamp(block_timestamp)), - ]; - - self.executor.send(QueryMessage::other(insert_controller.to_string(), arguments))?; - - Ok(()) - } -} - -fn add_columns_recursive( - path: &[String], - ty: &Ty, - columns: &mut Vec, - alter_table_queries: &mut Vec, - indices: &mut Vec, - table_id: &str, - upgrade_diff: Option<&Ty>, -) -> Result<()> { - let column_prefix = if path.len() > 1 { path[1..].join(".") } else { String::new() }; - - let mut add_column = |name: &str, sql_type: &str| { - if upgrade_diff.is_some() { - alter_table_queries - .push(format!("ALTER TABLE [{table_id}] ADD COLUMN [{name}] {sql_type}")); - } else { - columns.push(format!("[{name}] {sql_type}")); - } - indices.push(format!( - "CREATE INDEX IF NOT EXISTS [idx_{table_id}_{name}] ON [{table_id}] ([{name}]);" - )); - }; + let mut add_column = |name: &str, sql_type: &str| { + if upgrade_diff.is_some() { + alter_table_queries + .push(format!("ALTER TABLE [{table_id}] ADD COLUMN [{name}] {sql_type}")); + } else { + columns.push(format!("[{name}] {sql_type}")); + } + if self + .config + .model_indices + .iter() + .any(|m| m.model_tag == table_id && m.fields.contains(&name.to_string())) + { + indices.push(format!( + "CREATE INDEX IF NOT EXISTS [idx_{table_id}_{name}] ON [{table_id}] \ + ([{name}]);" + )); + } + }; - let modify_column = - |alter_table_queries: &mut Vec, name: &str, sql_type: &str, sql_value: &str| { + let modify_column = |alter_table_queries: &mut Vec, + name: &str, + sql_type: &str, + sql_value: &str| { // SQLite doesn't support ALTER COLUMN directly, so we need to: // 1. Create a temporary table to store the current values // 2. Drop the old column & index @@ -907,170 +894,229 @@ fn add_columns_recursive( )); }; - match ty { - Ty::Struct(s) => { - let struct_diff = - if let Some(upgrade_diff) = upgrade_diff { upgrade_diff.as_struct() } else { None }; - - for member in &s.children { - let member_diff = if let Some(diff) = struct_diff { - if let Some(m) = diff.children.iter().find(|m| m.name == member.name) { - Some(&m.ty) - } else { - // If the member is not in the diff, skip it - continue; - } + match ty { + Ty::Struct(s) => { + let struct_diff = if let Some(upgrade_diff) = upgrade_diff { + upgrade_diff.as_struct() } else { None }; - let mut new_path = path.to_vec(); - new_path.push(member.name.clone()); - - add_columns_recursive( - &new_path, - &member.ty, - columns, - alter_table_queries, - indices, - table_id, - member_diff, - )?; - } - } - Ty::Tuple(tuple) => { - let elements_to_process = if let Some(diff) = upgrade_diff.and_then(|d| d.as_tuple()) { - // Only process elements from the diff - diff.iter() - .filter_map(|m| { - tuple.iter().position(|member| member == m).map(|idx| (idx, m, Some(m))) - }) - .collect() - } else { - // Process all elements - tuple - .iter() - .enumerate() - .map(|(idx, member)| (idx, member, None)) - .collect::>() - }; + for member in &s.children { + let member_diff = if let Some(diff) = struct_diff { + if let Some(m) = diff.children.iter().find(|m| m.name == member.name) { + Some(&m.ty) + } else { + // If the member is not in the diff, skip it + continue; + } + } else { + None + }; - for (idx, member, member_diff) in elements_to_process { - let mut new_path = path.to_vec(); - new_path.push(idx.to_string()); - add_columns_recursive( - &new_path, - member, - columns, - alter_table_queries, - indices, - table_id, - member_diff, - )?; - } - } - Ty::Array(_) => { - let column_name = - if column_prefix.is_empty() { "value".to_string() } else { column_prefix }; + let mut new_path = path.to_vec(); + new_path.push(member.name.clone()); - add_column(&column_name, "TEXT"); - } - Ty::Enum(e) => { - let enum_diff = - if let Some(upgrade_diff) = upgrade_diff { upgrade_diff.as_enum() } else { None }; - - let column_name = - if column_prefix.is_empty() { "option".to_string() } else { column_prefix }; - - let all_options = - e.options.iter().map(|c| format!("'{}'", c.name)).collect::>().join(", "); - - let sql_type = format!( - "TEXT CONSTRAINT [{column_name}_check] CHECK([{column_name}] IN ({all_options}))" - ); - if enum_diff.is_some_and(|diff| diff != e) { - // For upgrades, modify the existing option column to add the new options to the - // CHECK constraint We need to drop the old column and create a new - // one with the new CHECK constraint - modify_column( - alter_table_queries, - &column_name, - &sql_type, - &format!("[{column_name}]"), - ); - } else { - // For new tables, create the column directly - add_column(&column_name, &sql_type); + self.add_columns_recursive( + &new_path, + &member.ty, + columns, + alter_table_queries, + indices, + table_id, + member_diff, + )?; + } } - - for child in &e.options { - // If we have a diff, only process new variants that aren't in the original enum - let variant_diff = if let Some(diff) = enum_diff { - if let Some(v) = diff.options.iter().find(|v| v.name == child.name) { - Some(&v.ty) - } else { - continue; - } + Ty::Tuple(tuple) => { + let elements_to_process = if let Some(diff) = + upgrade_diff.and_then(|d| d.as_tuple()) + { + // Only process elements from the diff + diff.iter() + .filter_map(|m| { + tuple.iter().position(|member| member == m).map(|idx| (idx, m, Some(m))) + }) + .collect() } else { - None + // Process all elements + tuple + .iter() + .enumerate() + .map(|(idx, member)| (idx, member, None)) + .collect::>() }; - if let Ty::Tuple(tuple) = &child.ty { - if tuple.is_empty() { - continue; - } + for (idx, member, member_diff) in elements_to_process { + let mut new_path = path.to_vec(); + new_path.push(idx.to_string()); + self.add_columns_recursive( + &new_path, + member, + columns, + alter_table_queries, + indices, + table_id, + member_diff, + )?; } + } + Ty::Array(_) => { + let column_name = + if column_prefix.is_empty() { "value".to_string() } else { column_prefix }; - let mut new_path = path.to_vec(); - new_path.push(child.name.clone()); - - add_columns_recursive( - &new_path, - &child.ty, - columns, - alter_table_queries, - indices, - table_id, - variant_diff, - )?; + add_column(&column_name, "TEXT"); } - } - Ty::ByteArray(_) => { - let column_name = - if column_prefix.is_empty() { "value".to_string() } else { column_prefix }; + Ty::Enum(e) => { + let enum_diff = if let Some(upgrade_diff) = upgrade_diff { + upgrade_diff.as_enum() + } else { + None + }; - add_column(&column_name, "TEXT"); - } - Ty::Primitive(p) => { - let column_name = - if column_prefix.is_empty() { "value".to_string() } else { column_prefix }; - - if let Some(upgrade_diff) = upgrade_diff { - if let Some(old_primitive) = upgrade_diff.as_primitive() { - // For upgrades to larger numeric types, convert to hex string padded to 64 - // chars - let sql_value = if old_primitive.to_sql_type() == SqlType::Integer - && p.to_sql_type() == SqlType::Text - { - // Convert integer to hex string with '0x' prefix and proper padding - format!("printf('%064x', [{column_name}])") - } else { - format!("[{column_name}]") - }; + let column_name = + if column_prefix.is_empty() { "option".to_string() } else { column_prefix }; + + let all_options = e + .options + .iter() + .map(|c| format!("'{}'", c.name)) + .collect::>() + .join(", "); + let sql_type = format!( + "TEXT CONSTRAINT [{column_name}_check] CHECK([{column_name}] IN \ + ({all_options}))" + ); + if enum_diff.is_some_and(|diff| diff != e) { + // For upgrades, modify the existing option column to add the new options to the + // CHECK constraint We need to drop the old column and create a new + // one with the new CHECK constraint modify_column( alter_table_queries, &column_name, - p.to_sql_type().as_ref(), - &sql_value, + &sql_type, + &format!("[{column_name}]"), ); + } else { + // For new tables, create the column directly + add_column(&column_name, &sql_type); + } + + for child in &e.options { + // If we have a diff, only process new variants that aren't in the original enum + let variant_diff = if let Some(diff) = enum_diff { + if let Some(v) = diff.options.iter().find(|v| v.name == child.name) { + Some(&v.ty) + } else { + continue; + } + } else { + None + }; + + if let Ty::Tuple(tuple) = &child.ty { + if tuple.is_empty() { + continue; + } + } + + let mut new_path = path.to_vec(); + new_path.push(child.name.clone()); + + self.add_columns_recursive( + &new_path, + &child.ty, + columns, + alter_table_queries, + indices, + table_id, + variant_diff, + )?; + } + } + Ty::ByteArray(_) => { + let column_name = + if column_prefix.is_empty() { "value".to_string() } else { column_prefix }; + + add_column(&column_name, "TEXT"); + } + Ty::Primitive(p) => { + let column_name = + if column_prefix.is_empty() { "value".to_string() } else { column_prefix }; + + if let Some(upgrade_diff) = upgrade_diff { + if let Some(old_primitive) = upgrade_diff.as_primitive() { + // For upgrades to larger numeric types, convert to hex string padded to 64 + // chars + let sql_value = if old_primitive.to_sql_type() == SqlType::Integer + && p.to_sql_type() == SqlType::Text + { + // Convert integer to hex string with '0x' prefix and proper padding + format!("printf('%064x', [{column_name}])") + } else { + format!("[{column_name}]") + }; + + modify_column( + alter_table_queries, + &column_name, + p.to_sql_type().as_ref(), + &sql_value, + ); + } + } else { + // New column + add_column(&column_name, p.to_sql_type().as_ref()); } - } else { - // New column - add_column(&column_name, p.to_sql_type().as_ref()); } } + + Ok(()) } - Ok(()) + pub async fn execute(&self) -> Result<()> { + let (execute, recv) = QueryMessage::execute_recv(); + self.executor.send(execute)?; + recv.await? + } + + pub async fn flush(&self) -> Result<()> { + let (flush, recv) = QueryMessage::flush_recv(); + self.executor.send(flush)?; + recv.await? + } + + pub async fn rollback(&self) -> Result<()> { + let (rollback, recv) = QueryMessage::rollback_recv(); + self.executor.send(rollback)?; + recv.await? + } + + pub async fn add_controller( + &mut self, + username: &str, + address: &str, + block_timestamp: u64, + ) -> Result<()> { + let insert_controller = " + INSERT INTO controllers (id, username, address, deployed_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + username=EXCLUDED.username, + address=EXCLUDED.address, + deployed_at=EXCLUDED.deployed_at + RETURNING *"; + + let arguments = vec![ + Argument::String(username.to_string()), + Argument::String(username.to_string()), + Argument::String(address.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ]; + + self.executor.send(QueryMessage::other(insert_controller.to_string(), arguments))?; + + Ok(()) + } } diff --git a/crates/torii/sqlite/src/types.rs b/crates/torii/sqlite/src/types.rs index 3ffa31ed5c..6fea2115eb 100644 --- a/crates/torii/sqlite/src/types.rs +++ b/crates/torii/sqlite/src/types.rs @@ -226,3 +226,9 @@ pub struct ContractCursor { pub last_pending_block_tx: Option, pub last_pending_block_contract_tx: Option, } + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ModelIndices { + pub model_tag: String, + pub fields: Vec, +}