Skip to content

Commit

Permalink
fix(runtime update): wait until upgrade on chain (#1321)
Browse files Browse the repository at this point in the history
* fix(runtime update): wait until upgrade on chain

* address grumbles

* Update subxt/src/client/online_client.rs

* Update subxt/src/client/online_client.rs

* fix nits and debug logs

* remove debug logs
  • Loading branch information
niklasad1 authored Jan 8, 2024
1 parent c5948dc commit 298869b
Showing 1 changed file with 64 additions and 14 deletions.
78 changes: 64 additions & 14 deletions subxt/src/client/online_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
backend::{
legacy::LegacyBackend, rpc::RpcClient, Backend, BackendExt, RuntimeVersion, StreamOfResults,
},
blocks::BlocksClient,
blocks::{BlockRef, BlocksClient},
constants::ConstantsClient,
error::Error,
events::EventsClient,
Expand Down Expand Up @@ -430,24 +430,18 @@ pub struct RuntimeUpdaterStream<T: Config> {
impl<T: Config> RuntimeUpdaterStream<T> {
/// Wait for the next runtime update.
pub async fn next(&mut self) -> Option<Result<Update, Error>> {
let maybe_runtime_version = self.stream.next().await?;

let runtime_version = match maybe_runtime_version {
let runtime_version = match self.stream.next().await? {
Ok(runtime_version) => runtime_version,
Err(err) => return Some(Err(err)),
};

let latest_block_ref = match self.client.backend().latest_finalized_block_ref().await {
Ok(block_ref) => block_ref,
Err(e) => return Some(Err(e)),
};
let at =
match wait_runtime_upgrade_in_finalized_block(&self.client, &runtime_version).await? {
Ok(at) => at,
Err(err) => return Some(Err(err)),
};

let metadata = match OnlineClient::fetch_metadata(
self.client.backend(),
latest_block_ref.hash(),
)
.await
{
let metadata = match OnlineClient::fetch_metadata(self.client.backend(), at.hash()).await {
Ok(metadata) => metadata,
Err(err) => return Some(Err(err)),
};
Expand Down Expand Up @@ -484,3 +478,59 @@ impl Update {
&self.metadata
}
}

/// Helper to wait until the runtime upgrade is applied on at finalized block.
async fn wait_runtime_upgrade_in_finalized_block<T: Config>(
client: &OnlineClient<T>,
runtime_version: &RuntimeVersion,
) -> Option<Result<BlockRef<T::Hash>, Error>> {
use scale_value::At;

let mut block_sub = match client.backend().stream_finalized_block_headers().await {
Ok(s) => s,
Err(err) => return Some(Err(err)),
};

let block_ref = loop {
let (_, block_ref) = match block_sub.next().await? {
Ok(n) => n,
Err(err) => return Some(Err(err)),
};

let key: Vec<scale_value::Value> = vec![];
let addr = crate::dynamic::storage("System", "LastRuntimeUpgrade", key);

let chunk = match client.storage().at(block_ref.hash()).fetch(&addr).await {
Ok(Some(v)) => v,
Ok(None) => {
// The storage `system::lastRuntimeUpgrade` should always exist.
// <https://github.com/paritytech/polkadot-sdk/blob/master/substrate/frame/system/src/lib.rs#L958>
unreachable!("The storage item `system::lastRuntimeUpgrade` should always exist")
}
Err(e) => return Some(Err(e)),
};

let scale_val = match chunk.to_value() {
Ok(v) => v,
Err(e) => return Some(Err(e)),
};

let Some(Ok(spec_version)) = scale_val
.at("spec_version")
.and_then(|v| v.as_u128())
.map(u32::try_from)
else {
return Some(Err(Error::Other(
"Decoding `RuntimeVersion::spec_version` as u32 failed".to_string(),
)));
};

// We are waiting for the chain to have the same spec version
// as sent out via the runtime subscription.
if spec_version == runtime_version.spec_version {
break block_ref;
}
};

Some(Ok(block_ref))
}

0 comments on commit 298869b

Please sign in to comment.