Skip to content

Commit

Permalink
feat(vn): recognize abandoned state (#4272)
Browse files Browse the repository at this point in the history
Description
---
This is a subtask for quarantining a contract. It lets the VN's monitoring the contract to flag it as abandoned (locally only) when checkpoints have been missed.

Motivation and Context
---
VN's need to be more aware of the status of contracts this is an initital step before quarantine happens.

How Has This Been Tested?
---

- [x] Tested manually
- [ ] ~Test written~ To much work to get contract state from the vn for a single spec
  • Loading branch information
brianp authored Jul 12, 2022
1 parent 850e78f commit e42085a
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 18 deletions.
108 changes: 98 additions & 10 deletions applications/tari_validator_node/src/contract_worker_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use log::*;
use tari_common_types::types::{FixedHash, FixedHashSizeError};
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_dht::Dht;
use tari_core::transactions::transaction_components::ContractConstitution;
use tari_core::transactions::transaction_components::{ContractConstitution, OutputType};
use tari_crypto::tari_utilities::{hex::Hex, message_format::MessageFormat, ByteArray};
use tari_dan_core::{
models::{AssetDefinition, BaseLayerMetadata, Committee},
Expand Down Expand Up @@ -137,7 +137,10 @@ impl ContractWorkerManager {
pub async fn start(mut self) -> Result<(), WorkerManagerError> {
self.load_initial_state()?;

info!("constitution_auto_accept is {}", self.config.constitution_auto_accept);
info!(
target: LOG_TARGET,
"constitution_auto_accept is {}", self.config.constitution_auto_accept
);

if !self.config.scan_for_assets {
info!(
Expand All @@ -156,12 +159,14 @@ impl ContractWorkerManager {

let tip = self.base_node_client.get_tip_info().await?;
let new_contracts = self.scan_for_new_contracts(&tip).await?;
self.set_last_scanned_block(&tip)?;

if self.config.constitution_auto_accept {
self.accept_contracts(new_contracts).await?;
}

self.validate_contract_activity(&tip).await?;

self.set_last_scanned_block(&tip)?;
tokio::select! {
_ = time::sleep(Duration::from_secs(self.config.constitution_management_polling_interval_in_seconds)) => {},
_ = &mut self.shutdown => break
Expand All @@ -171,13 +176,47 @@ impl ContractWorkerManager {
Ok(())
}

async fn validate_contract_activity(&mut self, tip: &BaseLayerMetadata) -> Result<(), WorkerManagerError> {
let active_contracts = self.global_db.get_contracts_with_state(ContractState::Active)?;

for contract in active_contracts {
let contract_id = FixedHash::try_from(contract.contract_id)?;
info!("Validating contract={} activity", contract_id.to_hex());

if let Some(checkpoint) = self.scan_for_last_checkpoint(tip, &contract_id).await? {
let constitution = ContractConstitution::from_binary(&*contract.constitution).map_err(|error| {
WorkerManagerError::DataCorruption {
details: error.to_string(),
}
})?;

if tip.height_of_longest_chain >
checkpoint.mined_height + constitution.checkpoint_params.abandoned_interval
{
self.global_db
.update_contract_state(contract_id, ContractState::Abandoned)?;

info!(
target: LOG_TARGET,
"Contract={} has missed checkpoints and has been marked Abandoned",
contract_id.to_hex()
);
}
}
}

Ok(())
}

async fn start_active_contracts(&mut self) -> Result<(), WorkerManagerError> {
let active_contracts = self.global_db.get_active_contracts()?;
// Abandoned contracts can be revived by the VNC so they should continue to monitor them
let mut active_contracts = self.global_db.get_contracts_with_state(ContractState::Active)?;
active_contracts.append(&mut self.global_db.get_contracts_with_state(ContractState::Abandoned)?);

for contract in active_contracts {
let contract_id = FixedHash::try_from(contract.contract_id)?;

println!("Starting contract {}", contract_id);
info!(target: LOG_TARGET, "Starting contract {}", contract_id.to_hex());

let constitution = ContractConstitution::from_binary(&*contract.constitution).map_err(|error| {
WorkerManagerError::DataCorruption {
Expand Down Expand Up @@ -238,6 +277,40 @@ impl ContractWorkerManager {
Ok(())
}

async fn scan_for_last_checkpoint(
&mut self,
tip: &BaseLayerMetadata,
contract_id: &FixedHash,
) -> Result<Option<Checkpoint>, WorkerManagerError> {
info!(
target: LOG_TARGET,
"Scanning base layer (tip: {}) for last checkpoint of contract={}",
tip.height_of_longest_chain,
contract_id
);

let outputs = self
.base_node_client
.get_current_contract_outputs(
tip.height_of_longest_chain,
*contract_id,
OutputType::ContractCheckpoint,
)
.await?;

let mut outputs = outputs
.iter()
.map({
|output| Checkpoint {
mined_height: output.mined_height,
}
})
.collect::<Vec<Checkpoint>>();
outputs.sort_by(|l, r| l.mined_height.partial_cmp(&r.mined_height).unwrap());

Ok(outputs.pop())
}

async fn scan_for_new_contracts(
&mut self,
tip: &BaseLayerMetadata,
Expand Down Expand Up @@ -288,8 +361,13 @@ impl ContractWorkerManager {
};

match self.global_db.save_contract(contract.into(), ContractState::Expired) {
Ok(_) => info!("Saving expired contract data id={}", contract_id.to_hex()),
Ok(_) => info!(
target: LOG_TARGET,
"Saving expired contract data id={}",
contract_id.to_hex()
),
Err(error) => error!(
target: LOG_TARGET,
"Couldn't save expired contract data id={} received error={}",
contract_id.to_hex(),
error.to_string()
Expand All @@ -309,8 +387,13 @@ impl ContractWorkerManager {
.global_db
.save_contract(contract.clone().into(), ContractState::Pending)
{
Ok(_) => info!("Saving contract data id={}", contract.contract_id.to_hex()),
Ok(_) => info!(
target: LOG_TARGET,
"Saving contract data id={}",
contract.contract_id.to_hex()
),
Err(error) => error!(
target: LOG_TARGET,
"Couldn't save contract data id={} received error={}",
contract.contract_id.to_hex(),
error.to_string()
Expand Down Expand Up @@ -447,8 +530,8 @@ impl ContractWorkerManager {
.publish_acceptance(&self.identity, &contract.contract_id)
.await?;
info!(
"Contract {} acceptance submitted with id={}",
contract.contract_id, tx_id
target: LOG_TARGET,
"Contract {} acceptance submitted with id={}", contract.contract_id, tx_id
);
Ok(())
}
Expand All @@ -475,13 +558,18 @@ pub enum WorkerManagerError {
#[error("Storage error: {0}")]
StorageError(#[from] StorageError),
#[error("DigitalAsset error: {0}")]
DigitalAssetErrror(#[from] DigitalAssetError),
DigitalAssetError(#[from] DigitalAssetError),
// TODO: remove dead_code
#[allow(dead_code)]
#[error("Data corruption: {details}")]
DataCorruption { details: String },
}

#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct Checkpoint {
pub mined_height: u64,
}

#[derive(Debug, Clone)]
struct ActiveContract {
pub constitution: ContractConstitution,
Expand Down
7 changes: 5 additions & 2 deletions dan_layer/core/src/storage/global/global_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@ impl<TGlobalDbBackendAdapter: GlobalDbBackendAdapter> GlobalDb<TGlobalDbBackendA
.map_err(TGlobalDbBackendAdapter::Error::into)
}

pub fn get_active_contracts(&self) -> Result<Vec<TGlobalDbBackendAdapter::Model>, StorageError> {
pub fn get_contracts_with_state(
&self,
state: ContractState,
) -> Result<Vec<TGlobalDbBackendAdapter::Model>, StorageError> {
self.adapter
.get_active_contracts()
.get_contracts_with_state(state)
.map_err(TGlobalDbBackendAdapter::Error::into)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub trait GlobalDbBackendAdapter: Send + Sync + Clone {
) -> Result<Option<Vec<u8>>, Self::Error>;
fn save_contract(&self, contract: Self::NewModel, state: ContractState) -> Result<(), Self::Error>;
fn update_contract_state(&self, contract_id: FixedHash, state: ContractState) -> Result<(), Self::Error>;
fn get_active_contracts(&self) -> Result<Vec<Self::Model>, Self::Error>;
fn get_contracts_with_state(&self, state: ContractState) -> Result<Vec<Self::Model>, Self::Error>;
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -69,8 +69,9 @@ pub enum ContractState {
Expired = 2,
QuorumMet = 3,
Active = 4,
Quarantined = 5,
Shutdown = 6,
Abandoned = 5,
Quarantined = 6,
Shutdown = 7,
}

impl ContractState {
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/core/src/storage/mocks/global_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl GlobalDbBackendAdapter for MockGlobalDbBackupAdapter {
todo!()
}

fn get_active_contracts(&self) -> Result<Vec<Self::Model>, Self::Error> {
fn get_contracts_with_state(&self, _state: ContractState) -> Result<Vec<Self::Model>, Self::Error> {
todo!()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ impl GlobalDbBackendAdapter for SqliteGlobalDbBackendAdapter {
Ok(())
}

fn get_active_contracts(&self) -> Result<Vec<Contract>, Self::Error> {
fn get_contracts_with_state(&self, state: ContractState) -> Result<Vec<Contract>, Self::Error> {
use crate::global::schema::{contracts, contracts::dsl};
let tx = self.create_transaction()?;

dsl::contracts
.filter(contracts::state.eq(i32::from(ContractState::Active.as_byte())))
.filter(contracts::state.eq(i32::from(state.as_byte())))
.load::<Contract>(tx.connection())
.map_err(|source| SqliteStorageError::DieselError {
source,
Expand Down

0 comments on commit e42085a

Please sign in to comment.