Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unstable_rpc: Add transactionBroadcast and transactionStop #1497

Merged
merged 9 commits into from
Mar 26, 2024
Merged
22 changes: 22 additions & 0 deletions subxt/src/backend/unstable/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,28 @@ impl<T: Config> UnstableRpcMethods<T> {

Ok(TransactionSubscription { sub, done: false })
}

/// Broadcast the transaction on the p2p network until the
/// [`Self::transaction_unstable_stop`] is called.
///
/// Returns an operation ID that can be used to stop the broadcasting process.
/// Returns `None` if the server cannot handle the request at the moment.
pub async fn transaction_unstable_broadcast(&self, tx: &[u8]) -> Result<Option<String>, Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has the PR to rename to v1 merged in Substrate already? :)

Copy link
Member

@niklasad1 niklasad1 Mar 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.client
.request("transaction_unstable_broadcast", rpc_params![to_hex(tx)])
.await
}

/// Stop the broadcasting process of the transaction.
///
/// The operation ID is obtained from the [`Self::transaction_unstable_broadcast`] method.
///
/// Returns an error if the operation ID does not correspond to any active transaction for this connection.
pub async fn transaction_unstable_stop(&self, operation_id: &str) -> Result<(), Error> {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
self.client
.request("transaction_unstable_stop", rpc_params![operation_id])
.await
}
}

/// This represents events generated by the `follow` method.
Expand Down
2 changes: 1 addition & 1 deletion testing/integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ scale-info = { workspace = true, features = ["bit-vec"] }
sp-core = { workspace = true }
syn = { workspace = true }
subxt = { workspace = true, features = ["unstable-metadata", "native", "jsonrpsee", "substrate-compat"] }
subxt-signer = { workspace = true }
subxt-signer = { workspace = true, features = ["default"] }
subxt-codegen = { workspace = true }
subxt-metadata = { workspace = true }
test-runtime = { workspace = true }
Expand Down
109 changes: 108 additions & 1 deletion testing/integration-tests/src/full_client/client/unstable_rpcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use subxt::{
FollowEvent, Initialized, MethodResponse, RuntimeEvent, RuntimeVersionEvent, StorageQuery,
StorageQueryType,
},
utils::AccountId32,
config::Hasher,
utils::{AccountId32, MultiAddress},
SubstrateConfig,
};
use subxt_signer::sr25519::dev;

Expand Down Expand Up @@ -309,3 +311,108 @@ async fn next_operation_event<

panic!("Cannot find operation related event after {NUM_EVENTS} produced events");
}

#[tokio::test]
async fn transaction_unstable_broadcast() {
let bob = dev::bob();
let bob_address: MultiAddress<AccountId32, u32> = bob.public_key().into();

let ctx = test_context().await;
let api = ctx.client();
let rpc = ctx.unstable_rpc_methods().await;

let tx = node_runtime::tx()
.balances()
.transfer_allow_death(bob_address.clone(), 10_001);

let tx_bytes = ctx
.client()
.tx()
.create_signed_offline(&tx, &dev::alice(), Default::default())
.unwrap()
.into_encoded();

let tx_hash = <SubstrateConfig as subxt::Config>::Hasher::hash(&tx_bytes[2..]);

// Subscribe to finalized blocks.
let mut finalized_sub = api.blocks().subscribe_finalized().await.unwrap();
// Expect the tx to be encountered in a maximum number of blocks.
let mut num_blocks: usize = 10;

// Submit the transaction.
let _operation_id = rpc
.transaction_unstable_broadcast(&tx_bytes)
.await
.unwrap()
.expect("Server is not overloaded by 1 tx; qed");

while let Some(finalized) = finalized_sub.next().await {
let finalized = finalized.unwrap();

// Started with positive, should not overflow.
num_blocks = num_blocks.saturating_sub(1);
if num_blocks == 0 {
panic!("Did not find the tx in due time");
}

let extrinsics = finalized.extrinsics().await.unwrap();
let block_extrinsics = extrinsics
.iter()
.map(|res| res.unwrap())
.collect::<Vec<_>>();

let Some(ext) = block_extrinsics
.iter()
.find(|ext| <SubstrateConfig as subxt::Config>::Hasher::hash(ext.bytes()) == tx_hash)
else {
continue;
};

let ext = ext
.as_extrinsic::<node_runtime::balances::calls::types::TransferAllowDeath>()
.unwrap()
.unwrap();
assert_eq!(ext.value, 10_001);
return;
}
}

#[tokio::test]
async fn transaction_unstable_stop() {
let bob = dev::bob();
let bob_address: MultiAddress<AccountId32, u32> = bob.public_key().into();

let ctx = test_context().await;
let rpc = ctx.unstable_rpc_methods().await;

// Cannot stop an operation that was not started.
let _err = rpc
.transaction_unstable_stop("non-existent-operation-id")
.await
.unwrap_err();

// Submit a transaction and stop it.
let tx = node_runtime::tx()
.balances()
.transfer_allow_death(bob_address.clone(), 10_001);
let tx_bytes = ctx
.client()
.tx()
.create_signed_offline(&tx, &dev::alice(), Default::default())
.unwrap()
.into_encoded();

// Submit the transaction.
let operation_id = rpc
.transaction_unstable_broadcast(&tx_bytes)
.await
.unwrap()
.expect("Server is not overloaded by 1 tx; qed");

let _ = rpc.transaction_unstable_stop(&operation_id).await.unwrap();
// Cannot stop it twice.
let _err = rpc
.transaction_unstable_stop(&operation_id)
.await
.unwrap_err();
}
Loading