Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unstable_rpc: Add transactionBroadcast and transactionStop #1497

Merged
merged 9 commits into from
Mar 26, 2024
22 changes: 22 additions & 0 deletions subxt/src/backend/unstable/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,28 @@ impl<T: Config> UnstableRpcMethods<T> {

Ok(TransactionSubscription { sub, done: false })
}

/// Broadcast the transaction on the p2p network until the
///[`Self::transaction_unstable_stop`] is called.
lexnv marked this conversation as resolved.
Show resolved Hide resolved
///
/// Returns an operation ID that can be used to stop the broadcasting process.
/// Returns `None` if the server cannot handle the request at the moment.
pub async fn transaction_unstable_broadcast(&self, tx: &[u8]) -> Result<Option<String>, Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has the PR to rename to v1 merged in Substrate already? :)

Copy link
Member

@niklasad1 niklasad1 Mar 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.client
.request("transaction_unstable_broadcast", rpc_params![to_hex(tx)])
.await
}

/// Stop the broadcasting process of the transaction.
///
/// The operation ID is obtained from the [`Self::transaction_unstable_broadcast`] method.
///
/// Returns an error if the operation ID does not correspond to any active transaction for this connection.
pub async fn transaction_unstable_stop(&self, operation_id: &str) -> Result<(), Error> {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
self.client
.request("transaction_unstable_stop", rpc_params![operation_id])
.await
}
}

/// This represents events generated by the `follow` method.
Expand Down
2 changes: 1 addition & 1 deletion testing/integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ scale-info = { workspace = true, features = ["bit-vec"] }
sp-core = { workspace = true }
syn = { workspace = true }
subxt = { workspace = true, features = ["unstable-metadata", "native", "jsonrpsee", "substrate-compat"] }
subxt-signer = { workspace = true }
subxt-signer = { workspace = true, features = ["default"] }
subxt-codegen = { workspace = true }
subxt-metadata = { workspace = true }
test-runtime = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use subxt::{
FollowEvent, Initialized, MethodResponse, RuntimeEvent, RuntimeVersionEvent, StorageQuery,
StorageQueryType,
},
utils::AccountId32,
utils::{AccountId32, MultiAddress},
};
use subxt_signer::sr25519::dev;

Expand Down Expand Up @@ -309,3 +309,65 @@ async fn next_operation_event<

panic!("Cannot find operation related event after {NUM_EVENTS} produced events");
}

#[tokio::test]
async fn transaction_unstable_broadcast() {
let bob = dev::bob();
let bob_address: MultiAddress<AccountId32, u32> = bob.public_key().into();

let ctx = test_context().await;
let api = ctx.client();
let rpc = ctx.unstable_rpc_methods().await;

let tx = node_runtime::tx()
.balances()
.transfer_allow_death(bob_address.clone(), 10_001);

let tx_bytes = ctx
.client()
.tx()
.create_signed_offline(&tx, &dev::alice(), Default::default())
.unwrap()
.into_encoded();

// Subscribe to finalized blocks.
let mut finalized_sub = api.blocks().subscribe_finalized().await.unwrap();
// Expect the tx to be encountered in a maximum number of blocks.
let mut num_blocks: usize = 10;

// Submit the transaction.
let _operation_id = rpc
.transaction_unstable_broadcast(&tx_bytes)
.await
.unwrap()
.expect("Server is not overloaded by 1 tx; qed");

while let Some(finalized) = finalized_sub.next().await {
let finalized = finalized.unwrap();

// Started with positive, should not overflow.
num_blocks = num_blocks.saturating_sub(1);
if num_blocks == 0 {
panic!("Did not find the tx in due time");
}

let extrinsics = finalized.extrinsics().await.unwrap();
let block_extrinsics = extrinsics
.iter()
.map(|res| res.unwrap())
.collect::<Vec<_>>();

// All blocks must contain the timestamp, we expect the next extrinsics to be our own.
let Some(tx) = block_extrinsics.get(1) else {
continue;
};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this is all good, but if we wanted we could do something like this to avoid relying on any sort of ordering etc:

let tx_hash = SubstrateConfig::Hashing::hash_of(tx_bytes);

// ...

let Some(ext) = extrinsics
    .iter()
    .find(|e| SubstrateConfig::Hashing::hash_of(e.bytes()) == tx_hash)
    .next() else { continue };

let ext = ext.as_extrinsic::<node_runtime::balances::calls::types::TransferAllowDeath>()
    .unwrap()
    .unwrap();

// ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is just a small inconsistency with this:

  • create_signed_offline returns the extrinsic bytes including the prefix of compact encoding length
  • ExtrinsicDetails will strip the compact prefix:
    let bytes: Arc<[u8]> = strip_compact_prefix(extrinsic_bytes)?.1.into();

I've adjusted the code to reflect this, not sure if we should do anything else to align for ex with polkadot-api 🤔


let ext = tx
.as_extrinsic::<node_runtime::balances::calls::types::TransferAllowDeath>()
.unwrap()
.unwrap();
assert_eq!(ext.value, 10_001);
assert!(tx.is_signed());
return;
}
}
Loading