Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pallas stake snapshots integration #1513

Merged
merged 12 commits into from
Feb 22, 2024
8 changes: 4 additions & 4 deletions Cargo.lock
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@falcucci, actually this file should not be modified, except for the version of the mithril-common that is bumped.
Can you roll it back completely and just bump the mithril-common version?

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

208 changes: 192 additions & 16 deletions mithril-common/src/chain_observer/pallas_observer.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use bech32::{self, ToBase32, Variant};
use pallas_addresses::Address;
use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
use pallas_network::{
facades::NodeClient,
miniprotocols::localstate::{
queries_v16::{
self, Addr, Addrs, PostAlonsoTransactionOutput, TransactionOutput, UTxOByAddress,
self, Addr, Addrs, PostAlonsoTransactionOutput, StakeSnapshot, TransactionOutput,
UTxOByAddress,
},
Client,
},
};

use pallas_primitives::ToCanonicalJson;
use std::path::{Path, PathBuf};
use std::{
collections::BTreeSet,
path::{Path, PathBuf},
};

use crate::{
chain_observer::{interface::*, ChainAddress, TxDatum},
Expand Down Expand Up @@ -175,6 +181,58 @@ impl PallasChainObserver {
Ok(utxo)
}

/// Fetches the current stake distribution using the provided `statequery` client.
async fn do_stake_snapshots_state_query(
&self,
statequery: &mut Client,
) -> StdResult<StakeSnapshot> {
statequery
.acquire(None)
.await
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to acquire statequery")?;

let era = queries_v16::get_current_era(statequery)
.await
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to get current era")?;

let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
.await
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to get stake snapshot")?;

Ok(state_snapshot)
}

fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
let pool_hash = bech32::encode("pool", key.to_base32(), Variant::Bech32)
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to encode stake pool hash")?;

Ok(pool_hash)
}

async fn get_stake_distribution_snapshot(
falcucci marked this conversation as resolved.
Show resolved Hide resolved
&self,
client: &mut NodeClient,
) -> Result<Option<StakeDistribution>, ChainObserverError> {
let statequery = client.statequery();

let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;

let mut stake_distribution = StakeDistribution::new();

for (key, stakes) in stake_snapshot.snapshots.stake_snapshots.iter() {
if stakes.snapshot_mark_pool > 0 {
let pool_hash = self.get_stake_pool_hash(key)?;
stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
}
}
falcucci marked this conversation as resolved.
Show resolved Hide resolved

Ok(Some(stake_distribution))
}

/// Processes a state query with the `NodeClient`, releasing the state query.
async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
let statequery = client.statequery();
Expand Down Expand Up @@ -245,8 +303,15 @@ impl ChainObserver for PallasChainObserver {
async fn get_current_stake_distribution(
&self,
) -> Result<Option<StakeDistribution>, ChainObserverError> {
let fallback = self.get_fallback();
fallback.get_current_stake_distribution().await
let mut client = self.get_client().await?;

let stake_pools = self.get_stake_distribution_snapshot(&mut client).await?;
falcucci marked this conversation as resolved.
Show resolved Hide resolved

self.post_process_statequery(&mut client).await?;

client.abort().await;

Ok(stake_pools)
}

async fn get_current_kes_period(
Expand All @@ -264,7 +329,7 @@ mod tests {

use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
use pallas_crypto::hash::Hash;
use pallas_network::miniprotocols::localstate::{queries_v16::Value, ClientQueryRequest};
use pallas_network::miniprotocols::localstate::{self, queries_v16::Value, ClientQueryRequest};
use tokio::net::UnixListener;

use super::*;
Expand Down Expand Up @@ -302,6 +367,64 @@ mod tests {
UTxOByAddress { utxo }
}

fn get_fake_stake_snapshot() -> StakeSnapshot {
let stake_snapshots = KeyValuePairs::from(vec![
(
Bytes::from(
hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
.unwrap(),
),
localstate::queries_v16::Stakes {
falcucci marked this conversation as resolved.
Show resolved Hide resolved
snapshot_mark_pool: 300000000001,
snapshot_set_pool: 300000000002,
snapshot_go_pool: 300000000000,
},
),
(
Bytes::from(
hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
.unwrap(),
),
localstate::queries_v16::Stakes {
snapshot_mark_pool: 600000000001,
snapshot_set_pool: 600000000002,
snapshot_go_pool: 600000000000,
},
),
(
Bytes::from(
hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
.unwrap(),
),
localstate::queries_v16::Stakes {
snapshot_mark_pool: 1200000000001,
snapshot_set_pool: 1200000000002,
snapshot_go_pool: 1200000000000,
},
),
(
Bytes::from(
hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
.unwrap(),
),
localstate::queries_v16::Stakes {
snapshot_mark_pool: 0,
snapshot_set_pool: 1300000000002,
snapshot_go_pool: 0,
},
),
]);

localstate::queries_v16::StakeSnapshot {
snapshots: localstate::queries_v16::Snapshots {
stake_snapshots,
snapshot_stake_mark_total: 2100000000003,
snapshot_stake_set_total: 2100000000006,
snapshot_stake_go_total: 2100000000000,
},
}
}

/// pallas responses mock server.
async fn mock_server(server: &mut pallas_network::facades::NodeServer) -> AnyCbor {
let query: queries_v16::Request =
Expand All @@ -311,27 +434,45 @@ mod tests {
};

match query {
queries_v16::Request::LedgerQuery(queries_v16::LedgerQuery::HardForkQuery(
queries_v16::HardForkQuery::GetCurrentEra,
)) => AnyCbor::from_encode(4),
queries_v16::Request::LedgerQuery(queries_v16::LedgerQuery::BlockQuery(
_,
queries_v16::BlockQuery::GetEpochNo,
)) => AnyCbor::from_encode([8]),
queries_v16::Request::LedgerQuery(queries_v16::LedgerQuery::BlockQuery(
_,
queries_v16::BlockQuery::GetUTxOByAddress(_),
)) => AnyCbor::from_encode(get_fake_utxo_by_address()),
localstate::queries_v16::Request::LedgerQuery(
localstate::queries_v16::LedgerQuery::HardForkQuery(
localstate::queries_v16::HardForkQuery::GetCurrentEra,
),
) => AnyCbor::from_encode(4),
localstate::queries_v16::Request::LedgerQuery(
localstate::queries_v16::LedgerQuery::BlockQuery(
_,
localstate::queries_v16::BlockQuery::GetEpochNo,
),
) => AnyCbor::from_encode([8]),
localstate::queries_v16::Request::LedgerQuery(
localstate::queries_v16::LedgerQuery::BlockQuery(
_,
localstate::queries_v16::BlockQuery::GetUTxOByAddress(_),
),
) => AnyCbor::from_encode(get_fake_utxo_by_address()),
localstate::queries_v16::Request::LedgerQuery(
localstate::queries_v16::LedgerQuery::BlockQuery(
_,
localstate::queries_v16::BlockQuery::GetStakeSnapshots(_),
),
) => AnyCbor::from_encode(get_fake_stake_snapshot()),
_ => panic!("unexpected query from client: {query:?}"),
}
}

/// Creates a new work directory in the system's temporary folder.
fn create_temp_dir(folder_name: &str) -> PathBuf {
#[cfg(not(target_os = "macos"))]
let temp_dir = std::env::temp_dir()
.join("mithril_test")
.join("pallas_chain_observer_test")
.join(folder_name);

// macOS-domain addresses are variable-length filesystem pathnames of at most 104 characters.
#[cfg(target_os = "macos")]
let temp_dir: PathBuf = std::env::temp_dir().join(folder_name);

if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).expect("Previous work dir removal failed");
}
Expand Down Expand Up @@ -403,4 +544,39 @@ mod tests {
let datums = client_res.expect("Client failed");
assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
}

#[tokio::test]
async fn get_current_stake_distribution_fallback() {
falcucci marked this conversation as resolved.
Show resolved Hide resolved
let socket_path =
create_temp_dir("get_current_stake_distribution_fallback").join("node.socket");
let server = setup_server(socket_path.clone()).await;
let client = tokio::spawn(async move {
let fallback = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
let observer = super::PallasChainObserver::new(
socket_path.as_path(),
CardanoNetwork::TestNet(10),
fallback,
);
observer.get_current_stake_distribution().await.unwrap()
});

let (_, client_res) = tokio::join!(server, client);
let computed_stake_distribution = client_res.unwrap().unwrap();

let mut expected_stake_distribution = StakeDistribution::new();
expected_stake_distribution.insert(
"pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
300000000001,
);
expected_stake_distribution.insert(
"pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
600000000001,
);
expected_stake_distribution.insert(
"pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
1200000000001,
);

assert_eq!(expected_stake_distribution, computed_stake_distribution);
}
}
Loading