Skip to content

Commit

Permalink
fetch contract firsdt
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Oct 4, 2024
1 parent caf38b2 commit 938eabf
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 26 deletions.
12 changes: 4 additions & 8 deletions crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,18 +208,14 @@ impl<'c> Executor<'c> {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;

sqlx::query("UPDATE contracts SET tps = ? WHERE id = ?")
let row = sqlx::query("UPDATE contracts SET tps = ? WHERE id = ? RETURNING *")
.bind(tps as i64)
.bind(format!("{:#x}", set_head.contract_address))
.execute(&mut **tx)
.fetch_one(&mut **tx)
.await?;

self.publish_queue.push(BrokerMessage::SetHead(ContractUpdated {
head: set_head.head,
tps,
last_block_timestamp: set_head.last_block_timestamp,
contract_address: set_head.contract_address,
}));
let contract = ContractUpdated::from_row(&row)?;
self.publish_queue.push(BrokerMessage::SetHead(contract));
}
QueryType::SetEntity(entity) => {
let row = query.fetch_one(&mut **tx).await.with_context(|| {
Expand Down
8 changes: 4 additions & 4 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ pub struct Event {
#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Contract {
pub head: u64,
pub tps: u64,
pub last_block_timestamp: u64,
pub contract_address: Felt,
pub head: i64,
pub tps: i64,
pub last_block_timestamp: i64,
pub contract_address: String,
}
6 changes: 3 additions & 3 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ message SubscribeIndexerRequest {

// A response containing indexer updates.
message SubscribeIndexerResponse {
uint64 head = 1;
uint64 tps = 2;
uint64 last_block_timestamp = 3;
int64 head = 1;
int64 tps = 2;
int64 last_block_timestamp = 3;
bytes contract_address = 4;
}

Expand Down
2 changes: 1 addition & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ impl DojoWorld {
contract_address: Felt,
) -> Result<Receiver<Result<proto::world::SubscribeIndexerResponse, tonic::Status>>, Error>
{
self.indexer_manager.add_subscriber(contract_address).await
self.indexer_manager.add_subscriber(&self.pool, contract_address).await
}

async fn subscribe_models(
Expand Down
26 changes: 19 additions & 7 deletions crates/torii/grpc/src/server/subscriptions/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};
use rand::Rng;
use sqlx::{FromRow, Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use torii_core::error::Error;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
use torii_core::types::Contract as ContractUpdated;
use tracing::{error, trace};
Expand All @@ -35,6 +37,7 @@ pub struct IndexerManager {
impl IndexerManager {
pub async fn add_subscriber(
&self,
pool: &Pool<Sqlite>,
contract_address: Felt,
) -> Result<Receiver<Result<proto::world::SubscribeIndexerResponse, tonic::Status>>, Error>
{
Expand All @@ -44,11 +47,19 @@ impl IndexerManager {
// NOTE: unlock issue with firefox/safari
// initially send empty stream message to return from
// initial subscribe call
let contract = sqlx::query(
"SELECT head, tps, last_block_timestamp, contract_address FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", contract_address))
.fetch_one(pool)
.await?;
let contract = ContractUpdated::from_row(&contract)?;

let _ = sender
.send(Ok(SubscribeIndexerResponse {
head: 0,
tps: 0,
last_block_timestamp: 0,
head: contract.head,
tps: contract.tps,
last_block_timestamp: contract.last_block_timestamp,
contract_address: contract_address.to_bytes_be().to_vec(),
}))
.await;
Expand Down Expand Up @@ -80,18 +91,19 @@ impl Service {
update: &ContractUpdated,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let contract_address =
Felt::from_str(&update.contract_address).map_err(ParseError::FromStr)?;

for (idx, sub) in subs.subscribers.read().await.iter() {
if sub.contract_address != Felt::ZERO && sub.contract_address != update.contract_address
{
if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address {
continue;
}

let resp = SubscribeIndexerResponse {
head: update.head,
tps: update.tps,
last_block_timestamp: update.last_block_timestamp,
contract_address: update.contract_address.to_bytes_be().to_vec(),
contract_address: contract_address.to_bytes_be().to_vec(),
};

if sub.sender.send(Ok(resp)).await.is_err() {
Expand Down
6 changes: 3 additions & 3 deletions crates/torii/grpc/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ pub mod schema;

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct IndexerUpdate {
pub head: u64,
pub tps: u64,
pub last_block_timestamp: u64,
pub head: i64,
pub tps: i64,
pub last_block_timestamp: i64,
pub contract_address: Felt,
}

Expand Down

0 comments on commit 938eabf

Please sign in to comment.