Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Oracle Service #21

Merged
merged 8 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,48 @@ brew install protobuf
### Environment Variables

Create an `.env` file following the example file and fill the keys.

## Usage

See:

```bash
Usage: vesu-liquidator [OPTIONS] --account-address <LIQUIDATOR ACCOUNT ADDRESS> --network <NETWORK NAME> --rpc-url <RPC URL> --starting-block <BLOCK NUMBER> --pragma-api-base-url <PRAGMA API BASE URL>

Options:
--account-address <LIQUIDATOR ACCOUNT ADDRESS>
Account address of the liquidator account

--private-key <LIQUIDATOR PRIVATE KEY>
Private key of the liquidator account

--keystore-path <LIQUIDATOR KEYSTORE>
Keystore path for the liquidator account

--keystore-password <LIQUIDATOR KEYSTORE PASSWORD>
Keystore password for the liquidator account

-n, --network <NETWORK NAME>
The network chain configuration [possible values: mainnet, sepolia]

--rpc-url <RPC URL>
The rpc endpoint url

--config-path <VESU CONFIG PATH>
Configuration file path [default: config.yaml]

-s, --starting-block <BLOCK NUMBER>
The block you want to start syncing from

--pragma-api-base-url <PRAGMA API BASE URL>
Pragma API Key for indexing

--apibara-api-key <APIBARA API KEY>
Apibara API Key for indexing

--pragma-api-key <PRAGMA API KEY>
Pragma API Key for indexing

-h, --help
Print help
```
5 changes: 2 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod cli;
pub mod config;
pub mod oracle;
pub mod services;
pub mod types;
pub mod utils;
Expand All @@ -16,7 +15,7 @@ use starknet::{

use cli::{NetworkName, RunCmd};
use config::Config;
use services::start_liquidator_services;
use services::start_all_services;
use types::account::StarknetAccount;

fn print_app_title(account_address: Felt, network: NetworkName, starting_block: u64) {
Expand Down Expand Up @@ -52,5 +51,5 @@ async fn main() -> Result<()> {
let rpc_client = Arc::new(JsonRpcClient::new(HttpTransport::new(rpc_url)));
let account = StarknetAccount::from_cli(rpc_client.clone(), run_cmd.clone())?;

start_liquidator_services(config, rpc_client, account, run_cmd).await
start_all_services(config, rpc_client, account, run_cmd).await
}
19 changes: 15 additions & 4 deletions src/services/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::sync::Arc;

use anyhow::Result;
Expand Down Expand Up @@ -26,6 +27,7 @@ use crate::{
};

const INDEXING_STREAM_CHUNK_SIZE: usize = 1024;
const ETHEREUM_DECIMALS: i64 = 18;

pub struct IndexerService {
config: Config,
Expand All @@ -34,6 +36,7 @@ pub struct IndexerService {
apibara_api_key: String,
stream_config: Configuration<Filter>,
positions_sender: Sender<Position>,
seen_positions: HashSet<u64>,
}

impl IndexerService {
Expand Down Expand Up @@ -70,11 +73,12 @@ impl IndexerService {
apibara_api_key,
stream_config,
positions_sender,
seen_positions: HashSet::default(),
}
}

/// Retrieve all the ModifyPosition events emitted from the Vesu Singleton Contract.
pub async fn start(self) -> Result<()> {
pub async fn start(mut self) -> Result<()> {
let (config_client, config_stream) = configuration::channel(INDEXING_STREAM_CHUNK_SIZE);

config_client
Expand Down Expand Up @@ -132,7 +136,7 @@ impl IndexerService {
}

/// Index the provided event & creates a new position.
async fn create_position_from_event(&self, event: Event) -> Result<()> {
async fn create_position_from_event(&mut self, event: Event) -> Result<()> {
if event.from_address.is_none() {
return Ok(());
}
Expand All @@ -149,7 +153,14 @@ impl IndexerService {
if new_position.is_closed() {
return Ok(());
}
let _ = self.positions_sender.try_send(new_position);
let position_key = new_position.key();
if self.seen_positions.insert(position_key) {
println!("[🔍 Indexer] Found new position 0x{:x}", new_position.key());
}
match self.positions_sender.try_send(new_position) {
Ok(_) => {}
Err(e) => panic!("Could not send position: {}", e),
}
}
Ok(())
}
Expand Down Expand Up @@ -194,7 +205,7 @@ impl IndexerService {
.await?;

// Decimals is always 18 for the ltv_config response
position.lltv = BigDecimal::new(ltv_config[0].to_bigint(), 18);
position.lltv = BigDecimal::new(ltv_config[0].to_bigint(), ETHEREUM_DECIMALS);
Ok(position)
}
}
53 changes: 39 additions & 14 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod indexer;
pub mod monitoring;
pub mod oracle;

use std::{sync::Arc, time::Duration};
use oracle::{LatestOraclePrices, OracleService};
use std::sync::Arc;
use url::Url;

use anyhow::{Context, Result};
Expand All @@ -22,7 +24,7 @@ use crate::{
/// This include:
/// - the indexer service, that indexes blocks & send positions,
/// - the monitoring service, that monitors & liquidates positions.
pub async fn start_liquidator_services(
pub async fn start_all_services(
config: Config,
rpc_client: Arc<JsonRpcClient<HttpTransport>>,
account: StarknetAccount,
Expand All @@ -39,24 +41,32 @@ pub async fn start_liquidator_services(
run_cmd.apibara_api_key.unwrap(),
);

println!("⏳ Waiting a few moment for the indexer to fetch positions...");
tokio::time::sleep(Duration::from_secs(15)).await;
println!("\n⏳ Waiting a few moment for the indexer to fetch positions...\n");

println!("\n🧩 Starting the monitoring service...");
let latest_oracle_prices = LatestOraclePrices::from_config(&config);
println!("\n🧩 Starting the oracle service...");
let oracle_handle = start_oracle_service(
run_cmd.pragma_api_base_url,
run_cmd.pragma_api_key.unwrap(),
latest_oracle_prices.clone(),
);

println!("\n🧩 Starting the monitoring service...\n");
let monitoring_handle = start_monitoring_service(
config.clone(),
rpc_client.clone(),
account,
run_cmd.pragma_api_base_url,
run_cmd.pragma_api_key.unwrap(),
position_receiver,
latest_oracle_prices,
);

// Wait for both tasks to complete, and handle any errors
let (indexer_result, monitoring_result) = tokio::try_join!(indexer_handle, monitoring_handle)?;
// Wait for tasks to complete, and handle any errors
let (indexer_result, oracle_result, monitoring_result) =
tokio::try_join!(indexer_handle, oracle_handle, monitoring_handle)?;

// Handle results from both services
// Handle results
indexer_result?;
oracle_result?;
monitoring_result?;
Ok(())
}
Expand Down Expand Up @@ -85,22 +95,37 @@ fn start_indexer_service(
})
}

/// Starts the oracle service.
fn start_oracle_service(
pragma_api_base_url: Url,
pragma_api_key: String,
latest_oracle_prices: LatestOraclePrices,
) -> JoinHandle<Result<()>> {
let oracle_service =
OracleService::new(pragma_api_base_url, pragma_api_key, latest_oracle_prices);

tokio::spawn(async move {
oracle_service
.start()
.await
.context("😱 Oracle service error")
})
}

/// Starts the monitoring service.
fn start_monitoring_service(
config: Config,
rpc_client: Arc<JsonRpcClient<HttpTransport>>,
account: StarknetAccount,
pragma_api_base_url: Url,
pragma_api_key: String,
position_receiver: Receiver<Position>,
latest_oracle_prices: LatestOraclePrices,
) -> JoinHandle<Result<()>> {
let monitoring_service = MonitoringService::new(
config,
rpc_client,
account,
pragma_api_base_url,
pragma_api_key,
position_receiver,
latest_oracle_prices,
);

tokio::spawn(async move {
Expand Down
30 changes: 14 additions & 16 deletions src/services/monitoring.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{sync::Arc, time::Duration};
use url::Url;

use anyhow::{anyhow, Result};
use bigdecimal::BigDecimal;
Expand All @@ -13,13 +12,14 @@ use tokio::time::interval;

use crate::{
config::Config,
oracle::PragmaOracle,
types::{
account::StarknetAccount,
position::{Position, PositionsMap},
},
};

use super::oracle::LatestOraclePrices;

const CHECK_POSITIONS_INTERVAL: u64 = 10;
const MAX_RETRIES_VERIFY_TX_FINALITY: usize = 10;
const INTERVAL_CHECK_TX_FINALITY: u64 = 3;
Expand All @@ -28,27 +28,26 @@ pub struct MonitoringService {
pub config: Config,
pub rpc_client: Arc<JsonRpcClient<HttpTransport>>,
pub account: StarknetAccount,
pub pragma_oracle: Arc<PragmaOracle>,
pub positions_receiver: Receiver<Position>,
pub positions: PositionsMap,
pub latest_oracle_prices: LatestOraclePrices,
}

impl MonitoringService {
pub fn new(
config: Config,
rpc_client: Arc<JsonRpcClient<HttpTransport>>,
account: StarknetAccount,
pragma_api_base_url: Url,
pragma_api_key: String,
positions_receiver: Receiver<Position>,
latest_oracle_prices: LatestOraclePrices,
) -> MonitoringService {
MonitoringService {
config,
rpc_client,
account,
pragma_oracle: Arc::new(PragmaOracle::new(pragma_api_base_url, pragma_api_key)),
positions_receiver,
positions: PositionsMap::new(),
latest_oracle_prices,
}
}

Expand All @@ -66,10 +65,8 @@ impl MonitoringService {
// Insert the new positions indexed by the IndexerService
maybe_position = self.positions_receiver.recv() => {
match maybe_position {
Some(position) => {
if !position.is_closed() {
self.positions.insert(position).await;
}
Some(new_position) => {
self.positions.0.write().await.insert(new_position.key(), new_position);
}
None => {
return Err(anyhow!("⛔ Monitoring stopped unexpectedly."));
Expand All @@ -83,19 +80,18 @@ impl MonitoringService {
/// Update all monitored positions and check if it's worth to liquidate any.
/// TODO: Check issue for multicall update:
/// https://github.com/astraly-labs/vesu-liquidator/issues/12
/// TODO: Check all positions in parallel
async fn monitor_positions_liquidability(&self) -> Result<()> {
let monitored_positions = self.positions.0.read().await;
if monitored_positions.is_empty() {
return Ok(());
}
println!("\n🔎 Checking if any position is liquidable...");
println!("\n[🔭 Monitoring] Checking if any position is liquidable...");
for (_, position) in monitored_positions.iter() {
if position.is_liquidable(&self.pragma_oracle).await {
if position.is_liquidable(&self.latest_oracle_prices).await {
let _profit_made = self.try_to_liquidate_position(position).await?;
}
}
println!("🤨 They're good.. for now...");
println!("[🔭 Monitoring] 🤨 They're good.. for now...");
Ok(())
}

Expand All @@ -109,7 +105,7 @@ impl MonitoringService {
let tx_hash = tx_hash_felt.to_string();
self.wait_for_tx_to_be_accepted(&tx_hash).await?;
println!(
"✅ Liquidated position #{}! (TX #{})",
"[🔭 Monitoring] ✅ Liquidated position #{}! (TX #{})",
position.key(),
tx_hash
);
Expand All @@ -120,7 +116,9 @@ impl MonitoringService {
/// Simulates the profit generated by liquidating a given position. Returns the profit
/// and the transactions needed to liquidate the position.
async fn compute_profitability(&self, position: &Position) -> Result<(BigDecimal, Vec<Call>)> {
let liquidable_amount = position.liquidable_amount(&self.pragma_oracle).await?;
let liquidable_amount = position
.liquidable_amount(&self.latest_oracle_prices)
.await?;

let liquidation_txs =
position.get_liquidation_txs(self.config.singleton_address, liquidable_amount.clone());
Expand Down
Loading
Loading