Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add feasibility_check on mined solution #483

Merged
merged 10 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
989 changes: 693 additions & 296 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ tokio = { version = "1.26", features = ["macros", "rt-multi-thread", "sync", "si
pin-project-lite = "0.2"

# subxt
subxt = "0.27.1"
subxt = { git = "https://github.com/paritytech/subxt" }
scale-value = "0.6.0"

# substrate
frame-election-provider-support = "16.0.0"
pallet-election-provider-multi-phase = "15.0.0"
sp-npos-elections = "14.0.0"
frame-support = "16.0.0"
sp-core = "16.0.0"
sp-runtime = "18.0.0"
sp-io = { git = "https://github.com/paritytech/substrate" }
frame-system = { git = "https://github.com/paritytech/substrate" }
frame-election-provider-support = { git = "https://github.com/paritytech/substrate" }
pallet-election-provider-multi-phase = { git = "https://github.com/paritytech/substrate" }
sp-npos-elections = { git = "https://github.com/paritytech/substrate" }
frame-support = { git = "https://github.com/paritytech/substrate" }
sp-version = { git = "https://github.com/paritytech/substrate" }
sp-runtime = { git = "https://github.com/paritytech/substrate" }

# prometheus
prometheus = "0.13"
Expand All @@ -38,7 +40,6 @@ once_cell = "1.17"
[dev-dependencies]
assert_cmd = "2.0"
sp-storage = "11.0.0"
sp-keyring = "18.0.0"
regex = "1"

[features]
Expand Down
Binary file modified artifacts/metadata.scale
Binary file not shown.
9 changes: 8 additions & 1 deletion src/dry_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,15 @@ where

log::info!(target: LOG_TARGET, "Loaded account {}, {:?}", signer, account_info);

let round = api
.storage()
.at(config.at)
.await?
.fetch_or_default(&runtime::storage().election_provider_multi_phase().round())
.await?;

let (solution, score, _size) =
epm::fetch_snapshot_and_mine_solution::<T>(&api, config.at, config.solver).await?;
epm::fetch_snapshot_and_mine_solution::<T>(&api, config.at, config.solver, round).await?;

let round = api
.storage()
Expand Down
113 changes: 69 additions & 44 deletions src/epm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
use crate::{helpers::RuntimeDispatchInfo, opt::Solver, prelude::*, static_types};
use codec::{Decode, Encode};
use frame_election_provider_support::{NposSolution, PhragMMS, SequentialPhragmen};
use frame_support::{weights::Weight, BoundedVec};
use frame_support::weights::Weight;
use pallet_election_provider_multi_phase::{RawSolution, SolutionOf, SolutionOrSnapshotSize};
use runtime::runtime_types::pallet_election_provider_multi_phase::RoundSnapshot;
use scale_info::{PortableRegistry, TypeInfo};
use scale_value::scale::{decode_as_type, TypeId};
use sp_core::Bytes;
Expand All @@ -30,6 +29,11 @@ use subxt::{dynamic::Value, rpc::rpc_params, tx::DynamicTxPayload};

const EPM_PALLET_NAME: &str = "ElectionProviderMultiPhase";

type MinerVoterOf =
frame_election_provider_support::Voter<AccountId, crate::static_types::MaxVotesPerVoter>;

type RoundSnapshot = pallet_election_provider_multi_phase::RoundSnapshot<AccountId, MinerVoterOf>;

#[derive(Copy, Clone, Debug)]
struct EpmConstant {
epm: &'static str,
Expand Down Expand Up @@ -57,33 +61,26 @@ pub(crate) async fn update_metadata_constants(api: &SubxtClient) -> Result<(), E
const SIGNED_MAX_WEIGHT: EpmConstant = EpmConstant::new("SignedMaxWeight");
const MAX_LENGTH: EpmConstant = EpmConstant::new("MinerMaxLength");
const MAX_VOTES_PER_VOTER: EpmConstant = EpmConstant::new("MinerMaxVotesPerVoter");
const MAX_WINNERS: EpmConstant = EpmConstant::new("MinerMaxWinners");

fn log_metadata(metadata: EpmConstant, val: impl std::fmt::Display) {
log::trace!(target: LOG_TARGET, "updating metadata constant `{metadata}`: {val}",);
}

let max_weight = read_constant::<Weight>(api, SIGNED_MAX_WEIGHT)?;
let max_length: u32 = read_constant(api, MAX_LENGTH)?;
let max_votes_per_voter: u32 = read_constant(api, MAX_VOTES_PER_VOTER)?;
let max_winners: u32 = read_constant(api, MAX_WINNERS)?;

log::trace!(
target: LOG_TARGET,
"updating metadata constant `{}`: {}",
SIGNED_MAX_WEIGHT.to_string(),
max_weight.ref_time()
);
log::trace!(
target: LOG_TARGET,
"updating metadata constant `{}`: {}",
MAX_LENGTH.to_string(),
max_length
);
log::trace!(
target: LOG_TARGET,
"updating metadata constant `{}`: {}",
MAX_VOTES_PER_VOTER.to_string(),
max_votes_per_voter
);
log_metadata(SIGNED_MAX_WEIGHT, max_weight);
log_metadata(MAX_LENGTH, max_length);
log_metadata(MAX_VOTES_PER_VOTER, max_votes_per_voter);
log_metadata(MAX_WINNERS, max_winners);

static_types::MaxWeight::set(max_weight);
static_types::MaxLength::set(max_length);
static_types::MaxVotesPerVoter::set(max_votes_per_voter);
static_types::MaxWinners::set(max_winners);

Ok(())
}
Expand Down Expand Up @@ -150,12 +147,28 @@ pub async fn signed_submission_at<S: NposSolution + Decode + TypeInfo + 'static>
}
}

/// Helper to the signed submissions at the block `at`.
pub async fn snapshot_at(at: Option<Hash>, api: &SubxtClient) -> Result<RoundSnapshot, Error> {
let empty = Vec::<Value>::new();
let addr = subxt::dynamic::storage(EPM_PALLET_NAME, "Snapshot", empty);

match api.storage().at(at).await?.fetch(&addr).await {
Ok(Some(val)) => {
let snapshot = Decode::decode(&mut val.encoded())?;
Ok(snapshot)
},
Ok(None) => Err(Error::EmptySnapshot),
Err(err) => Err(err.into()),
}
}

/// Helper to fetch snapshot data via RPC
/// and compute an NPos solution via [`pallet_election_provider_multi_phase`].
pub async fn fetch_snapshot_and_mine_solution<T>(
api: &SubxtClient,
hash: Option<Hash>,
solver: Solver,
round: u32,
) -> Result<(SolutionOf<T>, ElectionScore, SolutionOrSnapshotSize), Error>
where
T: MinerConfig<AccountId = AccountId, MaxVotesPerVoter = static_types::MaxVotesPerVoter>
Expand All @@ -164,31 +177,24 @@ where
+ 'static,
T::Solution: Send,
{
let RoundSnapshot { voters, targets } = api
let snapshot = snapshot_at(hash, &api).await?;
let desired_targets = api
.storage()
.at(hash)
.await?
.fetch(&runtime::storage().election_provider_multi_phase().snapshot())
.fetch(&runtime::storage().election_provider_multi_phase().desired_targets())
.await?
.unwrap_or_default();
.expect("Snapshot is non-empty; `desired_target` should exist; qed");

let desired_targets = api
let minimum_untrusted_score = api
.storage()
.at(hash)
.await?
.fetch(&runtime::storage().election_provider_multi_phase().desired_targets())
.await?
.unwrap_or_default();

let voters: Vec<_> = voters
.into_iter()
.map(|(a, b, mut c)| {
let mut bounded_vec: BoundedVec<AccountId, static_types::MaxVotesPerVoter> = BoundedVec::default();
// If this fails just crash the task.
bounded_vec.try_append(&mut c.0).unwrap_or_else(|_| panic!("BoundedVec capacity: {} failed; `MinerConfig::MaxVotesPerVoter` is different from the chain data; this is a bug please file an issue", static_types::MaxVotesPerVoter::get()));
(a, b, bounded_vec)
})
.collect();
.fetch(&runtime::storage().election_provider_multi_phase().minimum_untrusted_score())
.await?;

let voters = snapshot.voters.clone();
let targets = snapshot.targets.clone();

let blocking_task = tokio::task::spawn_blocking(move || match solver {
Solver::SeqPhragmen { iterations } => {
Expand All @@ -209,9 +215,24 @@ where
.await;

match blocking_task {
Ok(Ok(res)) => Ok(res),
Ok(Ok((solution, score, solution_or_snapshot))) => {
match Miner::<T>::feasibility_check(
RawSolution { solution: solution.clone(), score, round },
pallet_election_provider_multi_phase::ElectionCompute::Signed,
desired_targets,
snapshot,
round,
minimum_untrusted_score,
) {
Ok(_) => Ok((solution, score, solution_or_snapshot)),
Err(e) => {
log::error!(target: LOG_TARGET, "Solution feasibility error {:?}", e);
Err(Error::Feasibility(format!("{:?}", e)))
},
}
},
Ok(Err(err)) => Err(Error::Other(format!("{:?}", err))),
Err(err) => Err(Error::Other(format!("{:?}", err))),
Err(err) => Err(err.into()),
}
}

Expand Down Expand Up @@ -278,14 +299,18 @@ pub async fn runtime_api_solution_weight<S: Encode + NposSolution + TypeInfo + '
}

/// Helper to mock the votes based on `voters` and `desired_targets`.
pub fn mock_votes(voters: u32, desired_targets: u16) -> Vec<(u32, u16)> {
assert!(voters >= desired_targets as u32);
(0..voters).zip((0..desired_targets).cycle()).collect()
pub fn mock_votes(voters: u32, desired_targets: u16) -> Option<Vec<(u32, u16)>> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored to not panic with weird input here

if voters >= desired_targets as u32 {
Some((0..voters).zip((0..desired_targets).cycle()).collect())
} else {
None
}
}

#[cfg(test)]
#[test]
fn mock_votes_works() {
assert_eq!(mock_votes(3, 2), vec![(0, 0), (1, 1), (2, 0)]);
assert_eq!(mock_votes(3, 3), vec![(0, 0), (1, 1), (2, 2)]);
assert_eq!(mock_votes(3, 2), Some(vec![(0, 0), (1, 1), (2, 0)]));
assert_eq!(mock_votes(3, 3), Some(vec![(0, 0), (1, 1), (2, 2)]));
assert_eq!(mock_votes(2, 3), None);
}
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::prelude::*;

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed to parse log directive: `{0}´")]
Expand Down Expand Up @@ -48,4 +50,10 @@ pub enum Error {
SubscriptionClosed,
#[error("Dynamic transaction error: {0}")]
DynamicTransaction(String),
#[error("Feasibility error: {0}")]
Feasibility(String),
#[error("{0}")]
JoinError(#[from] tokio::task::JoinError),
#[error("Empty snapshot")]
EmptySnapshot,
}
65 changes: 65 additions & 0 deletions src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{error::Error, prelude::*};
use codec::Decode;
use frame_support::weights::Weight;
use jsonrpsee::{core::Error as JsonRpseeError, types::error::CallError};
use pin_project_lite::pin_project;
use serde::Deserialize;
use std::{
Expand All @@ -24,6 +26,7 @@ use std::{
task::{Context, Poll},
time::{Duration, Instant},
};
use subxt::error::{Error as SubxtError, RpcError};

pin_project! {
pub struct Timed<Fut>
Expand Down Expand Up @@ -73,3 +76,65 @@ pub struct RuntimeDispatchInfo {
/// Weight of this dispatch.
pub weight: Weight,
}

pub fn kill_main_task_if_critical_err(tx: &tokio::sync::mpsc::UnboundedSender<Error>, err: Error) {
match err {
Error::AlreadySubmitted |
Error::BetterScoreExist |
Error::IncorrectPhase |
Error::TransactionRejected(_) |
Error::JoinError(_) |
Error::Feasibility(_) |
Error::EmptySnapshot |
Error::SubscriptionClosed => {},
Error::Subxt(SubxtError::Rpc(rpc_err)) => {
log::debug!(target: LOG_TARGET, "rpc error: {:?}", rpc_err);

match rpc_err {
RpcError::ClientError(e) => {
let jsonrpsee_err = match e.downcast::<JsonRpseeError>() {
Ok(e) => *e,
Err(_) => {
let _ = tx.send(Error::Other(
"Failed to downcast RPC error; this is a bug please file an issue"
.to_string(),
));
return
},
};

match jsonrpsee_err {
JsonRpseeError::Call(CallError::Custom(e)) => {
const BAD_EXTRINSIC_FORMAT: i32 = 1001;
const VERIFICATION_ERROR: i32 = 1002;
use jsonrpsee::types::error::ErrorCode;

// Check if the transaction gets fatal errors from the `author` RPC.
// It's possible to get other errors such as outdated nonce and similar
// but then it should be possible to try again in the next block or round.
if e.code() == BAD_EXTRINSIC_FORMAT ||
e.code() == VERIFICATION_ERROR || e.code() ==
ErrorCode::MethodNotFound.code()
{
let _ = tx.send(Error::Subxt(SubxtError::Rpc(
RpcError::ClientError(Box::new(CallError::Custom(e))),
)));
}
},
JsonRpseeError::Call(CallError::Failed(_)) => {},
JsonRpseeError::RequestTimeout => {},
err => {
let _ = tx.send(Error::Subxt(SubxtError::Rpc(RpcError::ClientError(
Box::new(err),
))));
},
}
},
RpcError::SubscriptionDropped => (),
}
},
err => {
let _ = tx.send(err);
},
}
}
Loading