Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Indexer service #20

Merged
merged 7 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/cli/account.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
use std::{path::PathBuf, str::FromStr};

use anyhow::{anyhow, Result};
use clap::Args;
use starknet::core::types::Felt;
use std::{path::PathBuf, str::FromStr};

fn parse_str_to_felt(s: &str) -> Result<Felt> {
fn parse_felt(s: &str) -> Result<Felt> {
Felt::from_str(s).map_err(|_| anyhow!("Could not convert {s} to Felt"))
}

#[derive(Clone, Debug, Args)]
pub struct AccountParams {
/// Account address of the liquidator account
#[clap(long, value_parser = parse_str_to_felt, value_name = "LIQUIDATOR ACCOUNT ADDRESS")]
#[clap(long, value_parser = parse_felt, value_name = "LIQUIDATOR ACCOUNT ADDRESS")]
pub account_address: Felt,

/// Private key of the liquidator account
#[clap(long, value_parser = parse_str_to_felt, value_name = "LIQUIDATOR PRIVATE KEY")]
#[clap(long, value_parser = parse_felt, value_name = "LIQUIDATOR PRIVATE KEY")]
pub private_key: Option<Felt>,

/// Keystore path for the liquidator account
Expand Down
28 changes: 21 additions & 7 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
pub mod account;

use std::{env, path::PathBuf};
use url::Url;

use anyhow::{anyhow, Result};
use strum::Display;
use url::Url;

use account::AccountParams;

fn parse_url(s: &str) -> Result<Url, url::ParseError> {
fn parse_url(s: &str) -> Result<Url> {
s.parse()
.map_err(|_| anyhow!("Could not convert {s} to Url"))
}

#[derive(Clone, Debug, clap::Parser)]
Expand Down Expand Up @@ -47,8 +48,11 @@ pub struct RunCmd {
pub pragma_api_key: Option<String>,
}

/// First blocks with Vesu activity. Not necessary to index before.
const FIRST_MAINNET_BLOCK: u64 = 654244;
const FIRST_SEPOLIA_BLOCK: u64 = 77860;

impl RunCmd {
/// Validate CLI arguments
pub fn validate(&mut self) -> Result<()> {
self.account_params.validate()?;
if self.pragma_api_key.is_none() {
Expand All @@ -57,11 +61,21 @@ impl RunCmd {
if self.apibara_api_key.is_none() {
self.apibara_api_key = env::var("APIBARA_API_KEY").ok();
}
if self.pragma_api_key.is_none() && self.apibara_api_key.is_none() {
return Err(anyhow!("Both Pragma API Key and Apibara API Key are missing. Please provide at least one either via command line arguments or environment variables."));
if self.pragma_api_key.is_none() || self.apibara_api_key.is_none() {
return Err(anyhow!("Pragma API Key or Apibara API Key is missing. Please provide at least one via command line arguments or environment variable."));
}
if self.starting_block < 654244 {
self.starting_block = 654244;

match self.network {
NetworkName::Mainnet => {
if self.starting_block <= FIRST_MAINNET_BLOCK {
self.starting_block = FIRST_MAINNET_BLOCK;
}
}
NetworkName::Sepolia => {
if self.starting_block <= FIRST_SEPOLIA_BLOCK {
self.starting_block = FIRST_SEPOLIA_BLOCK;
}
}
}
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use starknet::core::utils::get_selector_from_name;

use crate::cli::{NetworkName, RunCmd};

// Contract selectors
lazy_static! {
pub static ref MODIFY_POSITION_EVENT: Felt = get_selector_from_name("ModifyPosition").unwrap();
pub static ref VESU_POSITION_UNSAFE_SELECTOR: Felt =
Expand Down Expand Up @@ -72,7 +73,7 @@ impl Config {
Ok(config)
}

pub fn get_asset_name_for_address(&self, address: &Felt) -> Option<String> {
pub fn get_asset_ticker_for_address(&self, address: &Felt) -> Option<String> {
self.asset_map
.get(address)
.map(|asset| asset.ticker.clone())
Expand All @@ -83,7 +84,7 @@ impl Config {
}
}

/// Below are the structs that represents the raw config extracted from the yaml file.
// Below are the structs that represents the raw config extracted from the yaml file.

#[derive(Debug, Deserialize, Serialize)]
pub struct RawConfig {
Expand Down
7 changes: 4 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use std::sync::Arc;

use anyhow::Result;
use clap::Parser;
use cli::{NetworkName, RunCmd};
use config::Config;
use services::start_liquidator_services;
use starknet::{
core::types::Felt,
providers::{jsonrpc::HttpTransport, JsonRpcClient},
};

use cli::{NetworkName, RunCmd};
use config::Config;
use services::start_liquidator_services;
use types::account::StarknetAccount;

fn print_app_title(account_address: Felt, network: NetworkName, starting_block: u64) {
Expand Down
8 changes: 5 additions & 3 deletions src/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Result;
use bigdecimal::BigDecimal;
use serde::{Deserialize, Serialize};
use strum::Display;
use url::Url;

use crate::utils::conversions::hexa_price_to_big_decimal;

Expand All @@ -16,20 +17,19 @@ pub struct OracleApiResponse {
#[derive(Debug, Clone)]
pub struct PragmaOracle {
http_client: reqwest::Client,
pub api_url: String,
pub api_url: Url,
pub api_key: String,
pub aggregation_method: AggregationMethod,
pub interval: Interval,
}

impl PragmaOracle {
pub fn new(api_url: String, api_key: String) -> Self {
pub fn new(api_url: Url, api_key: String) -> Self {
Self {
http_client: reqwest::Client::new(),
api_url,
api_key,
aggregation_method: AggregationMethod::Median,
// TODO: Assert that we want OneMinute
interval: Interval::OneMinute,
}
}
Expand All @@ -43,6 +43,8 @@ impl PragmaOracle {
)
}

// TODO: Fix oracle timeout response with a retry
// TODO: cache
pub async fn get_dollar_price(&self, asset_name: String) -> Result<BigDecimal> {
let url = self.fetch_price_url(asset_name.clone(), USD_ASSET.to_owned());
let response = self
Expand Down
68 changes: 35 additions & 33 deletions src/services/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use std::sync::Arc;

use anyhow::Result;
use apibara_core::starknet::v1alpha2::Event;
use apibara_core::{
node::v1alpha2::DataFinality,
starknet::v1alpha2::{Block, Filter, HeaderFilter},
};
use apibara_sdk::{configuration, ClientBuilder, Configuration, Uri};
use bigdecimal::BigDecimal;
use futures_util::TryStreamExt;
use starknet::core::types::Felt;
Expand All @@ -10,19 +16,13 @@ use starknet::{
};
use tokio::sync::mpsc::Sender;

use apibara_core::{
node::v1alpha2::DataFinality,
starknet::v1alpha2::{Block, Filter, HeaderFilter},
};
use apibara_sdk::{configuration, ClientBuilder, Configuration, Uri};

use crate::cli::NetworkName;
use crate::config::{
Config, MODIFY_POSITION_EVENT, VESU_LTV_CONFIG_SELECTOR, VESU_POSITION_UNSAFE_SELECTOR,
};
use crate::{
types::position::Position,
utils::conversions::{apibara_field_element_as_felt, felt_as_apibara_field},
utils::conversions::{apibara_field_as_felt, felt_as_apibara_field},
};

const INDEXING_STREAM_CHUNK_SIZE: usize = 1024;
Expand All @@ -44,15 +44,14 @@ impl IndexerService {
positions_sender: Sender<Position>,
from_block: u64,
) -> IndexerService {
let uri: Uri = match config.network {
let uri = match config.network {
NetworkName::Mainnet => Uri::from_static("https://mainnet.starknet.a5a.ch"),
NetworkName::Sepolia => Uri::from_static("https://sepolia.starknet.a5a.ch"),
};

let stream_config = Configuration::<Filter>::default()
.with_starting_block(from_block)
.with_finality(DataFinality::DataStatusPending)
// TODO: Filter does not seem to do anything. Done manually; investigate
.with_filter(|mut filter| {
filter
.with_header(HeaderFilter::weak())
Expand All @@ -77,6 +76,7 @@ impl IndexerService {
/// Retrieve all the ModifyPosition events emitted from the Vesu Singleton Contract.
pub async fn start(self) -> Result<()> {
let (config_client, config_stream) = configuration::channel(INDEXING_STREAM_CHUNK_SIZE);

config_client
.send(self.stream_config.clone())
.await
Expand All @@ -100,31 +100,10 @@ impl IndexerService {
finality: _,
batch,
} => {
// TODO: Way better filtering :)
for block in batch {
for events_chunk in block.events {
for event in events_chunk.receipt.unwrap().events {
// TODO: Currently hand filtered :)
let from =
apibara_field_element_as_felt(&event.from_address.unwrap());
if from != self.config.singleton_address {
continue;
}
let first = apibara_field_element_as_felt(&event.keys[0]);
if first != MODIFY_POSITION_EVENT.to_owned() {
continue;
}
let third = apibara_field_element_as_felt(&event.keys[3]);
if third == Felt::ZERO {
continue;
}
// Create the new position & update the fields.
if let Some(mut new_position) =
Position::from_event(&self.config, &event.keys)
{
new_position = self.update_position(new_position).await?;
let _ = self.positions_sender.try_send(new_position);
}
for event in block.events {
if let Some(event) = event.event {
self.create_position_from_event(event).await?;
}
}
}
Expand Down Expand Up @@ -152,6 +131,29 @@ impl IndexerService {
}
}

/// Index the provided event & creates a new position.
async fn create_position_from_event(&self, event: Event) -> Result<()> {
if event.from_address.is_none() {
return Ok(());
}

let debt_address = apibara_field_as_felt(&event.keys[3]);
// Corresponds to event associated with the extension contract - we ignore them.
if debt_address == Felt::ZERO {
return Ok(());
}

// Create the new position & update the fields.
if let Some(mut new_position) = Position::from_event(&self.config, &event.keys) {
new_position = self.update_position(new_position).await?;
if new_position.is_closed() {
return Ok(());
}
let _ = self.positions_sender.try_send(new_position);
}
Ok(())
}

/// Update a position given the latest data available.
async fn update_position(&self, mut position: Position) -> Result<Position> {
position = self.update_position_amounts(position).await?;
Expand Down
Loading
Loading