diff --git a/ethcore/src/engines/authority_round/mod.rs b/ethcore/src/engines/authority_round/mod.rs index 8ceb6220c86..99d1d7353c1 100644 --- a/ethcore/src/engines/authority_round/mod.rs +++ b/ethcore/src/engines/authority_round/mod.rs @@ -382,12 +382,16 @@ impl Decodable for SealedEmptyStep { } } +struct PermissionedStep { + inner: Step, + can_propose: AtomicBool, +} + /// Engine using `AuthorityRound` proof-of-authority BFT consensus. pub struct AuthorityRound { transition_service: IoService<()>, - step: Arc, - can_propose: AtomicBool, - client: RwLock>>, + step: Arc, + client: Arc>>>, signer: RwLock, validators: Box, validate_score_transition: u64, @@ -407,7 +411,7 @@ pub struct AuthorityRound { // header-chain validator. struct EpochVerifier { - step: Arc, + step: Arc, subchain_validators: SimpleList, empty_steps_transition: u64, } @@ -415,7 +419,7 @@ struct EpochVerifier { impl super::EpochVerifier for EpochVerifier { fn verify_light(&self, header: &Header) -> Result<(), Error> { // Validate the timestamp - verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?)?; + verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?)?; // always check the seal since it's fast. // nothing heavier to do. verify_external(header, &self.subchain_validators, self.empty_steps_transition) @@ -615,13 +619,15 @@ impl AuthorityRound { let engine = Arc::new( AuthorityRound { transition_service: IoService::<()>::start()?, - step: Arc::new(Step { - inner: AtomicUsize::new(initial_step), - calibrate: our_params.start_step.is_none(), - duration: our_params.step_duration, + step: Arc::new(PermissionedStep { + inner: Step { + inner: AtomicUsize::new(initial_step), + calibrate: our_params.start_step.is_none(), + duration: our_params.step_duration, + }, + can_propose: AtomicBool::new(true), }), - can_propose: AtomicBool::new(true), - client: RwLock::new(None), + client: Arc::new(RwLock::new(None)), signer: Default::default(), validators: our_params.validators, validate_score_transition: our_params.validate_score_transition, @@ -641,7 +647,10 @@ impl AuthorityRound { // Do not initialize timeouts for tests. if should_timeout { - let handler = TransitionHandler { engine: Arc::downgrade(&engine) }; + let handler = TransitionHandler { + step: engine.step.clone(), + client: engine.client.clone(), + }; engine.transition_service.register_handler(Arc::new(handler))?; } Ok(engine) @@ -667,7 +676,7 @@ impl AuthorityRound { } fn generate_empty_step(&self, parent_hash: &H256) { - let step = self.step.load(); + let step = self.step.inner.load(); let empty_step_rlp = empty_step_rlp(step, parent_hash); if let Ok(signature) = self.sign(keccak(&empty_step_rlp)).map(Into::into) { @@ -699,34 +708,37 @@ fn unix_now() -> Duration { } struct TransitionHandler { - engine: Weak, + step: Arc, + client: Arc>>>, } const ENGINE_TIMEOUT_TOKEN: TimerToken = 23; impl IoHandler<()> for TransitionHandler { fn initialize(&self, io: &IoContext<()>) { - if let Some(engine) = self.engine.upgrade() { - let remaining = engine.step.duration_remaining().as_millis(); - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining)) - .unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e)) - } + let remaining = self.step.inner.duration_remaining().as_millis(); + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining)) + .unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e)) } fn timeout(&self, io: &IoContext<()>, timer: TimerToken) { if timer == ENGINE_TIMEOUT_TOKEN { - if let Some(engine) = self.engine.upgrade() { - // NOTE we might be lagging by couple of steps in case the timeout - // has not been called fast enough. - // Make sure to advance up to the actual step. - while engine.step.duration_remaining().as_millis() == 0 { - engine.step(); + // NOTE we might be lagging by couple of steps in case the timeout + // has not been called fast enough. + // Make sure to advance up to the actual step. + while self.step.inner.duration_remaining().as_millis() == 0 { + self.step.inner.increment(); + self.step.can_propose.store(true, AtomicOrdering::SeqCst); + if let Some(ref weak) = *self.client.read() { + if let Some(c) = weak.upgrade() { + c.update_sealing(); + } } - - let next_run_at = engine.step.duration_remaining().as_millis() >> 2; - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at)) - .unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e)) } + + let next_run_at = self.step.inner.duration_remaining().as_millis() >> 2; + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at)) + .unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e)) } } } @@ -743,8 +755,8 @@ impl Engine for AuthorityRound { } fn step(&self) { - self.step.increment(); - self.can_propose.store(true, AtomicOrdering::SeqCst); + self.step.inner.increment(); + self.step.can_propose.store(true, AtomicOrdering::SeqCst); if let Some(ref weak) = *self.client.read() { if let Some(c) = weak.upgrade() { c.update_sealing(); @@ -791,7 +803,7 @@ impl Engine for AuthorityRound { fn populate_from_parent(&self, header: &mut Header, parent: &Header) { let parent_step = header_step(parent, self.empty_steps_transition).expect("Header has been verified; qed"); - let current_step = self.step.load(); + let current_step = self.step.inner.load(); let current_empty_steps_len = if header.number() >= self.empty_steps_transition { self.empty_steps(parent_step.into(), current_step.into(), parent.hash()).len() @@ -817,7 +829,7 @@ impl Engine for AuthorityRound { let empty_step: EmptyStep = rlp.as_val().map_err(fmt_err)?;; if empty_step.verify(&*self.validators).unwrap_or(false) { - if self.step.check_future(empty_step.step).is_ok() { + if self.step.inner.check_future(empty_step.step).is_ok() { trace!(target: "engine", "handle_message: received empty step message {:?}", empty_step); self.handle_empty_step_message(empty_step); } else { @@ -837,7 +849,7 @@ impl Engine for AuthorityRound { fn generate_seal(&self, block: &ExecutedBlock, parent: &Header) -> Seal { // first check to avoid generating signature most of the time // (but there's still a race to the `compare_and_swap`) - if !self.can_propose.load(AtomicOrdering::SeqCst) { + if !self.step.can_propose.load(AtomicOrdering::SeqCst) { trace!(target: "engine", "Aborting seal generation. Can't propose."); return Seal::None; } @@ -846,7 +858,7 @@ impl Engine for AuthorityRound { let parent_step: U256 = header_step(parent, self.empty_steps_transition) .expect("Header has been verified; qed").into(); - let step = self.step.load(); + let step = self.step.inner.load(); // filter messages from old and future steps and different parents let empty_steps = if header.number() >= self.empty_steps_transition { @@ -923,7 +935,7 @@ impl Engine for AuthorityRound { trace!(target: "engine", "generate_seal: Issuing a block for step {}.", step); // only issue the seal if we were the first to reach the compare_and_swap. - if self.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) { + if self.step.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) { self.clear_empty_steps(parent_step); @@ -999,7 +1011,7 @@ impl Engine for AuthorityRound { .decode()?; let parent_step = header_step(&parent, self.empty_steps_transition)?; - let current_step = self.step.load(); + let current_step = self.step.inner.load(); self.empty_steps(parent_step.into(), current_step.into(), parent.hash()) } else { // we're verifying a block, extract empty steps from the seal @@ -1049,7 +1061,7 @@ impl Engine for AuthorityRound { // If yes then probably benign reporting needs to be moved further in the verification. let set_number = header.number(); - match verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?) { + match verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?) { Err(BlockError::InvalidSeal) => { self.validators.report_benign(header.author(), set_number, header.number()); Err(BlockError::InvalidSeal.into()) @@ -1291,7 +1303,7 @@ impl Engine for AuthorityRound { // This way, upon encountering an epoch change, the proposer from the // new set will be forced to wait until the next step to avoid sealing a // block that breaks the invariant that the parent's step < the block's step. - self.can_propose.store(false, AtomicOrdering::SeqCst); + self.step.can_propose.store(false, AtomicOrdering::SeqCst); return Some(combine_proofs(signal_number, &pending.proof, &*finality_proof)); } }