Skip to content

Commit

Permalink
refactor/fix(torii-core): correctly queue entity deletions (#2428)
Browse files Browse the repository at this point in the history
* refactor(torii-core): correctly queue entity deletions

* fmt

* fix: dont use push front???

* remove push front

* clippy

* opt model id

* fix clippy
  • Loading branch information
Larkooo authored Sep 16, 2024
1 parent 6671efa commit f710fa7
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 59 deletions.
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ where
let entity_id = event.data[ENTITY_ID_INDEX];
let entity = model.schema;

db.delete_entity(entity_id, entity, event_id, block_timestamp).await?;
db.delete_entity(entity_id, selector, entity, event_id, block_timestamp).await?;

Ok(())
}
Expand Down
57 changes: 48 additions & 9 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,18 @@ pub struct QueryQueue {
pub publish_queue: VecDeque<BrokerMessage>,
}

#[derive(Debug, Clone)]
pub struct DeleteEntityQuery {
pub entity_id: String,
pub event_id: String,
pub block_timestamp: String,
pub entity: Ty,
}

#[derive(Debug, Clone)]
pub enum QueryType {
SetEntity(Ty),
DeleteEntity(DeleteEntityQuery),
Other,
}

Expand All @@ -57,15 +66,6 @@ impl QueryQueue {
self.queue.push_back((statement.into(), arguments, query_type));
}

pub fn push_front<S: Into<String>>(
&mut self,
statement: S,
arguments: Vec<Argument>,
query_type: QueryType,
) {
self.queue.push_front((statement.into(), arguments, query_type));
}

pub fn push_publish(&mut self, value: BrokerMessage) {
self.publish_queue.push_back(value);
}
Expand Down Expand Up @@ -97,6 +97,45 @@ impl QueryQueue {
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
}
QueryType::DeleteEntity(entity) => {
let delete_model = query.execute(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
if delete_model.rows_affected() == 0 {
continue;
}

let row = sqlx::query(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \
event_id=? WHERE id = ? RETURNING *",
)
.bind(entity.block_timestamp)
.bind(entity.event_id)
.bind(entity.entity_id)
.fetch_one(&mut *tx)
.await?;
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity.entity);

let count = sqlx::query_scalar::<_, i64>(
"SELECT count(*) FROM entity_model WHERE entity_id = ?",
)
.bind(entity_updated.id.clone())
.fetch_one(&mut *tx)
.await?;

// Delete entity if all of its models are deleted
if count == 0 {
sqlx::query("DELETE FROM entities WHERE id = ?")
.bind(entity_updated.id.clone())
.execute(&mut *tx)
.await?;
entity_updated.deleted = true;
}

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
}
QueryType::Other => {
query.execute(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
Expand Down
67 changes: 18 additions & 49 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use chrono::Utc;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{EnumOption, Member, Ty};
use dojo_world::contracts::abi::model::Layout;
use dojo_world::contracts::naming::{compute_selector_from_names, compute_selector_from_tag};
use dojo_world::contracts::naming::compute_selector_from_names;
use dojo_world::metadata::WorldMetadata;
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
Expand All @@ -16,10 +16,9 @@ use starknet_crypto::poseidon_hash_many;
use tracing::debug;

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue, QueryType};
use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType};
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered,
};
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp};

Expand Down Expand Up @@ -281,6 +280,7 @@ impl Sql {
pub async fn delete_entity(
&mut self,
entity_id: Felt,
model_id: Felt,
entity: Ty,
event_id: &str,
block_timestamp: u64,
Expand All @@ -289,49 +289,18 @@ impl Sql {
let path = vec![entity.name()];
// delete entity models data
self.build_delete_entity_queries_recursive(path, &entity_id, &entity);
self.execute().await?;

let deleted_entity_model =
sqlx::query("DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?")
.bind(&entity_id)
.bind(format!("{:#x}", compute_selector_from_tag(&entity.name())))
.execute(&self.pool)
.await?;
if deleted_entity_model.rows_affected() == 0 {
// fail silently. we have no entity-model relation to delete.
// this can happen if a entity model that doesnt exist
// got deleted
return Ok(());
}

let mut update_entity = sqlx::query_as::<_, EntityUpdated>(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id \
= ? RETURNING *",
)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.bind(event_id)
.bind(&entity_id)
.fetch_one(&self.pool)
.await?;
update_entity.updated_model = Some(entity.clone());

let models_count =
sqlx::query_scalar::<_, u32>("SELECT count(*) FROM entity_model WHERE entity_id = ?")
.bind(&entity_id)
.fetch_one(&self.pool)
.await?;

if models_count == 0 {
// delete entity
sqlx::query("DELETE FROM entities WHERE id = ?")
.bind(&entity_id)
.execute(&self.pool)
.await?;

update_entity.deleted = true;
}
self.query_queue.enqueue(
"DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?",
vec![Argument::String(entity_id.clone()), Argument::String(format!("{:#x}", model_id))],
QueryType::DeleteEntity(DeleteEntityQuery {
entity_id: entity_id.clone(),
event_id: event_id.to_string(),
block_timestamp: utc_dt_string_from_timestamp(block_timestamp),
entity: entity.clone(),
}),
);

self.query_queue.push_publish(BrokerMessage::EntityUpdated(update_entity));
Ok(())
}

Expand Down Expand Up @@ -797,7 +766,7 @@ impl Sql {
Ty::Struct(s) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue.push_front(
self.query_queue.enqueue(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
Expand All @@ -818,7 +787,7 @@ impl Sql {

let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue.push_front(
self.query_queue.enqueue(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
Expand All @@ -839,7 +808,7 @@ impl Sql {
Ty::Array(array) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue.push_front(
self.query_queue.enqueue(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
Expand All @@ -854,7 +823,7 @@ impl Sql {
Ty::Tuple(t) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue.push_front(
self.query_queue.enqueue(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
Expand Down

0 comments on commit f710fa7

Please sign in to comment.