Skip to content

Commit

Permalink
feat: implement phase-2 validation / evaluation (#421)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Dec 30, 2024
1 parent b6b933c commit a62dee9
Show file tree
Hide file tree
Showing 27 changed files with 3,405 additions and 197 deletions.
362 changes: 241 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ paste = "1.0.15"
tower-http = { version = "0.6.1", features = ["cors"] }
chrono = { version = "0.4.39", default-features = false }

# phase2 dependencies
uplc = { git = "https://github.com/pragma-org/uplc.git", optional = true }
# blst = { version = "=0.3.13", optional = true }
rug = { version = "1.26.1", optional = true }

[target.'cfg(not(windows))'.dependencies.mithril-client]
version = "0.10.4"
optional = true
Expand All @@ -80,7 +85,8 @@ tempfile = "3.3.0"
mithril = ["mithril-client"]
utils = ["comfy-table", "inquire", "toml"]
debug = ["console-subscriber", "tokio/tracing"]
default = ["mithril", "utils"]
phase2 = ["uplc", "rug"]
default = ["mithril", "utils", "phase2"]

# The profile that 'cargo dist' will build with
[profile.dist]
Expand Down
2 changes: 1 addition & 1 deletion src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {

let (wal, ledger) = crate::common::open_data_stores(&config)?;
let genesis = Arc::new(crate::common::open_genesis_files(&config.genesis)?);
let mempool = dolos::mempool::Mempool::new();
let mempool = dolos::mempool::Mempool::new(genesis.clone(), ledger.clone());
let exit = crate::common::hook_exit_token();

let sync = dolos::sync::pipeline(
Expand Down
4 changes: 2 additions & 2 deletions src/bin/dolos/data/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use pallas::ledger::traverse::{MultiEraBlock, MultiEraUpdate};
use std::path::Path;

use dolos::{
ledger::{ChainPoint, PParamsBody},
ledger::{ChainPoint, EraCbor},
wal::{redb::WalStore, RawBlock, ReadUtils, WalReader as _},
};

Expand Down Expand Up @@ -107,7 +107,7 @@ pub fn run(config: &crate::Config, _args: &Args) -> miette::Result<()> {

let updates: Vec<_> = updates
.iter()
.map(|PParamsBody(era, cbor)| MultiEraUpdate::decode_for_era(*era, cbor))
.map(|EraCbor(era, cbor)| MultiEraUpdate::decode_for_era(*era, cbor))
.try_collect()
.into_diagnostic()?;

Expand Down
4 changes: 2 additions & 2 deletions src/bin/dolos/eval.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use dolos::ledger::{PParamsBody, TxoRef};
use dolos::ledger::{EraCbor, TxoRef};
use itertools::*;
use miette::{Context, IntoDiagnostic};
use pallas::{
Expand Down Expand Up @@ -82,7 +82,7 @@ pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> {

let updates: Vec<_> = updates
.iter()
.map(|PParamsBody(era, cbor)| -> miette::Result<MultiEraUpdate> {
.map(|EraCbor(era, cbor)| -> miette::Result<MultiEraUpdate> {
MultiEraUpdate::decode_for_era(*era, cbor).into_diagnostic()
})
.try_collect()?;
Expand Down
2 changes: 1 addition & 1 deletion src/bin/dolos/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {

let (wal, ledger) = crate::common::open_data_stores(&config)?;
let genesis = Arc::new(crate::common::open_genesis_files(&config.genesis)?);
let mempool = dolos::mempool::Mempool::new();
let mempool = dolos::mempool::Mempool::new(genesis.clone(), ledger.clone());
let exit = crate::common::hook_exit_token();

dolos::serve::serve(config.serve, genesis, wal, ledger, mempool, exit)
Expand Down
4 changes: 1 addition & 3 deletions src/bin/dolos/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, ledger) = crate::common::open_data_stores(config)?;

let mempool = dolos::mempool::Mempool::new();

let genesis = Arc::new(crate::common::open_genesis_files(&config.genesis)?);
let mempool = dolos::mempool::Mempool::new(genesis.clone(), ledger.clone());

let sync = dolos::sync::pipeline(
&config.sync,
Expand Down
31 changes: 20 additions & 11 deletions src/ledger/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use pallas::ledger::traverse::{Era, MultiEraBlock};
use pallas::ledger::traverse::{Era, MultiEraBlock, MultiEraInput, MultiEraUpdate};
use pallas::{crypto::hash::Hash, ledger::traverse::MultiEraOutput};
use pparams::Genesis;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -29,8 +29,8 @@ impl From<EraCbor> for (Era, Vec<u8>) {
}
}

impl<'a> From<MultiEraOutput<'a>> for EraCbor {
fn from(value: MultiEraOutput<'a>) -> Self {
impl From<MultiEraOutput<'_>> for EraCbor {
fn from(value: MultiEraOutput<'_>) -> Self {
EraCbor(value.era(), value.encode())
}
}
Expand All @@ -43,6 +43,14 @@ impl<'a> TryFrom<&'a EraCbor> for MultiEraOutput<'a> {
}
}

impl TryFrom<EraCbor> for MultiEraUpdate<'_> {
type Error = pallas::codec::minicbor::decode::Error;

fn try_from(value: EraCbor) -> Result<Self, Self::Error> {
MultiEraUpdate::decode_for_era(value.0, &value.1)
}
}

pub type UtxoBody<'a> = MultiEraOutput<'a>;

#[derive(Debug, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
Expand All @@ -60,12 +68,15 @@ impl From<TxoRef> for (TxHash, TxoIdx) {
}
}

impl From<&MultiEraInput<'_>> for TxoRef {
fn from(value: &MultiEraInput<'_>) -> Self {
TxoRef(*value.hash(), value.index() as u32)
}
}

#[derive(Debug, Eq, PartialEq, Hash)]
pub struct ChainPoint(pub BlockSlot, pub BlockHash);

#[derive(Debug)]
pub struct PParamsBody(pub Era, pub Vec<u8>);

pub type UtxoMap = HashMap<TxoRef, EraCbor>;

pub type UtxoSet = HashSet<TxoRef>;
Expand Down Expand Up @@ -94,7 +105,7 @@ pub struct LedgerDelta {
pub consumed_utxo: HashMap<TxoRef, EraCbor>,
pub recovered_stxi: HashMap<TxoRef, EraCbor>,
pub undone_utxo: HashMap<TxoRef, EraCbor>,
pub new_pparams: Vec<PParamsBody>,
pub new_pparams: Vec<EraCbor>,
}

/// Computes the ledger delta of applying a particular block.
Expand Down Expand Up @@ -141,17 +152,15 @@ pub fn compute_delta(
}

if let Some(update) = tx.update() {
delta
.new_pparams
.push(PParamsBody(tx.era(), update.encode()));
delta.new_pparams.push(EraCbor(tx.era(), update.encode()));
}
}

// check block-level updates (because of f#!@#@ byron)
if let Some(update) = block.update() {
delta
.new_pparams
.push(PParamsBody(block.era(), update.encode()));
.push(EraCbor(block.era(), update.encode()));
}

Ok(delta)
Expand Down
14 changes: 7 additions & 7 deletions src/ledger/pparams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,13 +509,13 @@ pub fn fold_until_epoch(
let mut summary = ChainSummary::start(&pparams);

if let Some(force_protocol) = genesis.force_protocol {
while summary.current_protocol_version() < force_protocol {
let next_protocol = summary.current_protocol_version() + 1;
while summary.current().protocol_version < force_protocol {
let next_protocol = summary.current().protocol_version + 1;
pparams = migrate_pparams(pparams, genesis, next_protocol);
summary.advance(0, &pparams);

debug!(
protocol = summary.current_protocol_version(),
protocol = summary.current().protocol_version,
"forced hardfork"
);
}
Expand Down Expand Up @@ -549,13 +549,13 @@ pub fn fold_until_epoch(
let version_change = byron_version_change.or(post_byron_version_change);

if let Some(next_protocol) = version_change {
while summary.current_protocol_version() < next_protocol {
let next_protocol = summary.current_protocol_version() + 1;
while summary.current().protocol_version < next_protocol {
let next_protocol = summary.current().protocol_version + 1;
pparams = migrate_pparams(pparams, genesis, next_protocol);
summary.advance(epoch, &pparams);

debug!(
protocol = summary.current_protocol_version(),
protocol = summary.current().protocol_version,
"hardfork executed"
);
}
Expand Down Expand Up @@ -657,7 +657,7 @@ mod tests {
let expected = load_json::<usize, _>(filename);
let (_, summary) = fold_until_epoch(&genesis, &chained_updates, epoch);

assert_eq!(expected, summary.current_protocol_version())
assert_eq!(expected, summary.current().protocol_version)

//assert_eq!(expected, actual)
}
Expand Down
5 changes: 3 additions & 2 deletions src/ledger/pparams/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl ChainSummary {
});
}

pub fn current_protocol_version(&self) -> usize {
self.current.as_ref().unwrap().protocol_version
pub fn current(&self) -> &EraSummary {
// safe to unwrap since it's a business invariant
self.current.as_ref().unwrap()
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@ pub mod state;
pub mod sync;
pub mod wal;

#[cfg(feature = "phase2")]
pub mod uplc;

#[cfg(test)]
mod tests;
103 changes: 78 additions & 25 deletions src/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
use crate::{
ledger::pparams::Genesis,
state::LedgerStore,
uplc::{script_context::SlotConfig, tx, EvalReport},
};

use futures_util::StreamExt;
use itertools::Itertools;
use pallas::{
crypto::hash::Hash,
ledger::traverse::{MultiEraBlock, MultiEraTx},
};
use std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
};
use thiserror::Error;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
Expand All @@ -18,22 +22,26 @@ type TxHash = Hash<32>;

#[derive(Debug, Error)]
pub enum MempoolError {
#[error("traverse error: {0}")]
TraverseError(#[from] pallas::ledger::traverse::Error),

#[error("decode error: {0}")]
DecodeError(pallas::ledger::traverse::Error),
DecodeError(#[from] pallas::codec::minicbor::decode::Error),

#[error("plutus scripts not supported")]
#[cfg(feature = "phase2")]
#[error("tx evaluation failed")]
EvaluationError(#[from] crate::uplc::error::Error),

#[error("state error: {0}")]
StateError(#[from] crate::state::LedgerError),

#[error("plutus not supported")]
PlutusNotSupported,

#[error("invalid tx: {0}")]
InvalidTx(String),
}

impl From<pallas::ledger::traverse::Error> for MempoolError {
fn from(value: pallas::ledger::traverse::Error) -> Self {
Self::DecodeError(value)
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Tx {
pub hash: TxHash,
Expand Down Expand Up @@ -70,20 +78,21 @@ struct MempoolState {
pub struct Mempool {
mempool: Arc<RwLock<MempoolState>>,
updates: broadcast::Sender<Event>,
}

impl Default for Mempool {
fn default() -> Self {
Self::new()
}
genesis: Arc<Genesis>,
ledger: LedgerStore,
}

impl Mempool {
pub fn new() -> Self {
pub fn new(genesis: Arc<Genesis>, ledger: LedgerStore) -> Self {
let mempool = Arc::new(RwLock::new(MempoolState::default()));
let (updates, _) = broadcast::channel(16);

Self { mempool, updates }
Self {
mempool,
updates,
genesis,
ledger,
}
}

pub fn subscribe(&self) -> broadcast::Receiver<Event> {
Expand All @@ -110,20 +119,64 @@ impl Mempool {
);
}

#[cfg(feature = "phase2")]
pub fn evaluate(&self, tx: &MultiEraTx) -> Result<EvalReport, MempoolError> {
let tip = self.ledger.cursor()?;

let updates: Vec<_> = self
.ledger
.get_pparams(tip.as_ref().map(|p| p.0).unwrap_or_default())?;

let updates: Vec<_> = updates.into_iter().map(TryInto::try_into).try_collect()?;

let (pparams, eras) = crate::ledger::pparams::fold(&self.genesis, &updates);

let slot_config = SlotConfig {
slot_length: eras.current().slot_length,
zero_slot: eras.current().start.slot,
zero_time: eras
.current()
.start
.timestamp
.timestamp()
.try_into()
.unwrap(),
};

let input_refs = tx.requires().iter().map(From::from).collect();

let utxos = self.ledger.get_utxos(input_refs)?;

let report = tx::eval_tx(tx, &pparams, &utxos, &slot_config)?;

Ok(report)
}

#[cfg(feature = "phase2")]
pub fn evaluate_raw(&self, cbor: &[u8]) -> Result<EvalReport, MempoolError> {
let tx = MultiEraTx::decode(cbor)?;
self.evaluate(&tx)
}

pub fn receive_raw(&self, cbor: &[u8]) -> Result<TxHash, MempoolError> {
let decoded = MultiEraTx::decode(cbor)?;
let tx = MultiEraTx::decode(cbor)?;

let hash = decoded.hash();
#[cfg(feature = "phase2")]
self.evaluate(&tx)?;

// TODO: we don't phase-2 validate txs before propagating so we could
// propagate p2 invalid transactions resulting in collateral loss
// if we don't have phase-2 enabled, we reject txs before propagating something
// that could result in collateral loss
#[cfg(not(feature = "phase2"))]
if !decoded.redeemers().is_empty() {
return Err(MempoolError::PlutusNotSupported);
}

let hash = tx.hash();

let tx = Tx {
hash,
era: u16::from(decoded.era()) - 1,
// TODO: this is a hack to make the era compatible with the ledger
era: u16::from(tx.era()) - 1,
bytes: cbor.into(),
confirmed: false,
};
Expand Down
2 changes: 1 addition & 1 deletion src/serve/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn serve(
let sync_service = sync::SyncServiceImpl::new(wal.clone(), ledger.clone());
let sync_service = u5c::sync::sync_service_server::SyncServiceServer::new(sync_service);

let query_service = query::QueryServiceImpl::new(ledger.clone(), genesis);
let query_service = query::QueryServiceImpl::new(ledger.clone(), genesis.clone());
let query_service = u5c::query::query_service_server::QueryServiceServer::new(query_service);

let watch_service = watch::WatchServiceImpl::new(wal.clone(), ledger.clone());
Expand Down
Loading

0 comments on commit a62dee9

Please sign in to comment.