Skip to content

Commit

Permalink
feat: Oracle Service (#21)
Browse files Browse the repository at this point in the history
* feat(oracle_service): oracle service

* feat(oracle_service): Drop

* feat(oracle_service): Oracle acquires the lock for the whole update period

* feat(oracle_service):

* feat(oracle_service): Async calls

* feat(oracle_service): Oracle service OKg

* feat(oracle_service): README

* feat(oracle_service): Lint :)
  • Loading branch information
akhercha authored Aug 21, 2024
1 parent 472b8f9 commit f87d59e
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 72 deletions.
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,48 @@ brew install protobuf
### Environment Variables

Create an `.env` file following the example file and fill the keys.

##Β Usage

See:

```bash
Usage: vesu-liquidator [OPTIONS] --account-address <LIQUIDATOR ACCOUNT ADDRESS> --network <NETWORK NAME> --rpc-url <RPC URL> --starting-block <BLOCK NUMBER> --pragma-api-base-url <PRAGMA API BASE URL>

Options:
--account-address <LIQUIDATOR ACCOUNT ADDRESS>
Account address of the liquidator account

--private-key <LIQUIDATOR PRIVATE KEY>
Private key of the liquidator account

--keystore-path <LIQUIDATOR KEYSTORE>
Keystore path for the liquidator account

--keystore-password <LIQUIDATOR KEYSTORE PASSWORD>
Keystore password for the liquidator account

-n, --network <NETWORK NAME>
The network chain configuration [possible values: mainnet, sepolia]

--rpc-url <RPC URL>
The rpc endpoint url

--config-path <VESU CONFIG PATH>
Configuration file path [default: config.yaml]

-s, --starting-block <BLOCK NUMBER>
The block you want to start syncing from

--pragma-api-base-url <PRAGMA API BASE URL>
Pragma API Key for indexing

--apibara-api-key <APIBARA API KEY>
Apibara API Key for indexing

--pragma-api-key <PRAGMA API KEY>
Pragma API Key for indexing

-h, --help
Print help
```
5 changes: 2 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod cli;
pub mod config;
pub mod oracle;
pub mod services;
pub mod types;
pub mod utils;
Expand All @@ -16,7 +15,7 @@ use starknet::{

use cli::{NetworkName, RunCmd};
use config::Config;
use services::start_liquidator_services;
use services::start_all_services;
use types::account::StarknetAccount;

fn print_app_title(account_address: Felt, network: NetworkName, starting_block: u64) {
Expand Down Expand Up @@ -52,5 +51,5 @@ async fn main() -> Result<()> {
let rpc_client = Arc::new(JsonRpcClient::new(HttpTransport::new(rpc_url)));
let account = StarknetAccount::from_cli(rpc_client.clone(), run_cmd.clone())?;

start_liquidator_services(config, rpc_client, account, run_cmd).await
start_all_services(config, rpc_client, account, run_cmd).await
}
19 changes: 15 additions & 4 deletions src/services/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::sync::Arc;

use anyhow::Result;
Expand Down Expand Up @@ -26,6 +27,7 @@ use crate::{
};

const INDEXING_STREAM_CHUNK_SIZE: usize = 1024;
const ETHEREUM_DECIMALS: i64 = 18;

pub struct IndexerService {
config: Config,
Expand All @@ -34,6 +36,7 @@ pub struct IndexerService {
apibara_api_key: String,
stream_config: Configuration<Filter>,
positions_sender: Sender<Position>,
seen_positions: HashSet<u64>,
}

impl IndexerService {
Expand Down Expand Up @@ -70,11 +73,12 @@ impl IndexerService {
apibara_api_key,
stream_config,
positions_sender,
seen_positions: HashSet::default(),
}
}

/// Retrieve all the ModifyPosition events emitted from the Vesu Singleton Contract.
pub async fn start(self) -> Result<()> {
pub async fn start(mut self) -> Result<()> {
let (config_client, config_stream) = configuration::channel(INDEXING_STREAM_CHUNK_SIZE);

config_client
Expand Down Expand Up @@ -132,7 +136,7 @@ impl IndexerService {
}

/// Index the provided event & creates a new position.
async fn create_position_from_event(&self, event: Event) -> Result<()> {
async fn create_position_from_event(&mut self, event: Event) -> Result<()> {
if event.from_address.is_none() {
return Ok(());
}
Expand All @@ -149,7 +153,14 @@ impl IndexerService {
if new_position.is_closed() {
return Ok(());
}
let _ = self.positions_sender.try_send(new_position);
let position_key = new_position.key();
if self.seen_positions.insert(position_key) {
println!("[πŸ” Indexer] Found new position 0x{:x}", new_position.key());
}
match self.positions_sender.try_send(new_position) {
Ok(_) => {}
Err(e) => panic!("Could not send position: {}", e),
}
}
Ok(())
}
Expand Down Expand Up @@ -194,7 +205,7 @@ impl IndexerService {
.await?;

// Decimals is always 18 for the ltv_config response
position.lltv = BigDecimal::new(ltv_config[0].to_bigint(), 18);
position.lltv = BigDecimal::new(ltv_config[0].to_bigint(), ETHEREUM_DECIMALS);
Ok(position)
}
}
53 changes: 39 additions & 14 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod indexer;
pub mod monitoring;
pub mod oracle;

use std::{sync::Arc, time::Duration};
use oracle::{LatestOraclePrices, OracleService};
use std::sync::Arc;
use url::Url;

use anyhow::{Context, Result};
Expand All @@ -22,7 +24,7 @@ use crate::{
/// This include:
/// - the indexer service, that indexes blocks & send positions,
/// - the monitoring service, that monitors & liquidates positions.
pub async fn start_liquidator_services(
pub async fn start_all_services(
config: Config,
rpc_client: Arc<JsonRpcClient<HttpTransport>>,
account: StarknetAccount,
Expand All @@ -39,24 +41,32 @@ pub async fn start_liquidator_services(
run_cmd.apibara_api_key.unwrap(),
);

println!("⏳ Waiting a few moment for the indexer to fetch positions...");
tokio::time::sleep(Duration::from_secs(15)).await;
println!("\n⏳ Waiting a few moment for the indexer to fetch positions...\n");

println!("\n🧩 Starting the monitoring service...");
let latest_oracle_prices = LatestOraclePrices::from_config(&config);
println!("\n🧩 Starting the oracle service...");
let oracle_handle = start_oracle_service(
run_cmd.pragma_api_base_url,
run_cmd.pragma_api_key.unwrap(),
latest_oracle_prices.clone(),
);

println!("\n🧩 Starting the monitoring service...\n");
let monitoring_handle = start_monitoring_service(
config.clone(),
rpc_client.clone(),
account,
run_cmd.pragma_api_base_url,
run_cmd.pragma_api_key.unwrap(),
position_receiver,
latest_oracle_prices,
);

// Wait for both tasks to complete, and handle any errors
let (indexer_result, monitoring_result) = tokio::try_join!(indexer_handle, monitoring_handle)?;
// Wait for tasks to complete, and handle any errors
let (indexer_result, oracle_result, monitoring_result) =
tokio::try_join!(indexer_handle, oracle_handle, monitoring_handle)?;

// Handle results from both services
// Handle results
indexer_result?;
oracle_result?;
monitoring_result?;
Ok(())
}
Expand Down Expand Up @@ -85,22 +95,37 @@ fn start_indexer_service(
})
}

/// Starts the oracle service.
fn start_oracle_service(
pragma_api_base_url: Url,
pragma_api_key: String,
latest_oracle_prices: LatestOraclePrices,
) -> JoinHandle<Result<()>> {
let oracle_service =
OracleService::new(pragma_api_base_url, pragma_api_key, latest_oracle_prices);

tokio::spawn(async move {
oracle_service
.start()
.await
.context("😱 Oracle service error")
})
}

/// Starts the monitoring service.
fn start_monitoring_service(
config: Config,
rpc_client: Arc<JsonRpcClient<HttpTransport>>,
account: StarknetAccount,
pragma_api_base_url: Url,
pragma_api_key: String,
position_receiver: Receiver<Position>,
latest_oracle_prices: LatestOraclePrices,
) -> JoinHandle<Result<()>> {
let monitoring_service = MonitoringService::new(
config,
rpc_client,
account,
pragma_api_base_url,
pragma_api_key,
position_receiver,
latest_oracle_prices,
);

tokio::spawn(async move {
Expand Down
30 changes: 14 additions & 16 deletions src/services/monitoring.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{sync::Arc, time::Duration};
use url::Url;

use anyhow::{anyhow, Result};
use bigdecimal::BigDecimal;
Expand All @@ -13,13 +12,14 @@ use tokio::time::interval;

use crate::{
config::Config,
oracle::PragmaOracle,
types::{
account::StarknetAccount,
position::{Position, PositionsMap},
},
};

use super::oracle::LatestOraclePrices;

const CHECK_POSITIONS_INTERVAL: u64 = 10;
const MAX_RETRIES_VERIFY_TX_FINALITY: usize = 10;
const INTERVAL_CHECK_TX_FINALITY: u64 = 3;
Expand All @@ -28,27 +28,26 @@ pub struct MonitoringService {
pub config: Config,
pub rpc_client: Arc<JsonRpcClient<HttpTransport>>,
pub account: StarknetAccount,
pub pragma_oracle: Arc<PragmaOracle>,
pub positions_receiver: Receiver<Position>,
pub positions: PositionsMap,
pub latest_oracle_prices: LatestOraclePrices,
}

impl MonitoringService {
pub fn new(
config: Config,
rpc_client: Arc<JsonRpcClient<HttpTransport>>,
account: StarknetAccount,
pragma_api_base_url: Url,
pragma_api_key: String,
positions_receiver: Receiver<Position>,
latest_oracle_prices: LatestOraclePrices,
) -> MonitoringService {
MonitoringService {
config,
rpc_client,
account,
pragma_oracle: Arc::new(PragmaOracle::new(pragma_api_base_url, pragma_api_key)),
positions_receiver,
positions: PositionsMap::new(),
latest_oracle_prices,
}
}

Expand All @@ -66,10 +65,8 @@ impl MonitoringService {
// Insert the new positions indexed by the IndexerService
maybe_position = self.positions_receiver.recv() => {
match maybe_position {
Some(position) => {
if !position.is_closed() {
self.positions.insert(position).await;
}
Some(new_position) => {
self.positions.0.write().await.insert(new_position.key(), new_position);
}
None => {
return Err(anyhow!("β›” Monitoring stopped unexpectedly."));
Expand All @@ -83,19 +80,18 @@ impl MonitoringService {
/// Update all monitored positions and check if it's worth to liquidate any.
/// TODO: Check issue for multicall update:
/// https://github.com/astraly-labs/vesu-liquidator/issues/12
/// TODO: Check all positions in parallel
async fn monitor_positions_liquidability(&self) -> Result<()> {
let monitored_positions = self.positions.0.read().await;
if monitored_positions.is_empty() {
return Ok(());
}
println!("\nπŸ”Ž Checking if any position is liquidable...");
println!("\n[πŸ”­ Monitoring] Checking if any position is liquidable...");
for (_, position) in monitored_positions.iter() {
if position.is_liquidable(&self.pragma_oracle).await {
if position.is_liquidable(&self.latest_oracle_prices).await {
let _profit_made = self.try_to_liquidate_position(position).await?;
}
}
println!("🀨 They're good.. for now...");
println!("[πŸ”­ Monitoring] 🀨 They're good.. for now...");
Ok(())
}

Expand All @@ -109,7 +105,7 @@ impl MonitoringService {
let tx_hash = tx_hash_felt.to_string();
self.wait_for_tx_to_be_accepted(&tx_hash).await?;
println!(
"βœ… Liquidated position #{}! (TX #{})",
"[πŸ”­ Monitoring] βœ… Liquidated position #{}! (TX #{})",
position.key(),
tx_hash
);
Expand All @@ -120,7 +116,9 @@ impl MonitoringService {
/// Simulates the profit generated by liquidating a given position. Returns the profit
/// and the transactions needed to liquidate the position.
async fn compute_profitability(&self, position: &Position) -> Result<(BigDecimal, Vec<Call>)> {
let liquidable_amount = position.liquidable_amount(&self.pragma_oracle).await?;
let liquidable_amount = position
.liquidable_amount(&self.latest_oracle_prices)
.await?;

let liquidation_txs =
position.get_liquidation_txs(self.config.singleton_address, liquidable_amount.clone());
Expand Down
Loading

0 comments on commit f87d59e

Please sign in to comment.