Skip to content

Commit

Permalink
unstable_rpc: Add transactionBroadcast and transactionStop (#1497)
Browse files Browse the repository at this point in the history
* unstable_rpc: Add transactionBroadcast and transactionStop

Signed-off-by: Alexandru Vasile <[email protected]>

* tests: Check transactionBroadcast works

Signed-off-by: Alexandru Vasile <[email protected]>

* testing: Enable default feature for subxt-signer

Signed-off-by: Alexandru Vasile <[email protected]>

* tests: Increase number of blocks to look for

Signed-off-by: Alexandru Vasile <[email protected]>

* Fix clippy for unneed let binds

Signed-off-by: Alexandru Vasile <[email protected]>

* Update subxt/src/backend/unstable/rpc_methods.rs

Co-authored-by: Niklas Adolfsson <[email protected]>

* tests: Adjust txBroadcast test

Signed-off-by: Alexandru Vasile <[email protected]>

* tests: Add test for txStop

Signed-off-by: Alexandru Vasile <[email protected]>

* tests: Ignore compact encoded lenght prefix

Signed-off-by: Alexandru Vasile <[email protected]>

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Niklas Adolfsson <[email protected]>
  • Loading branch information
lexnv and niklasad1 authored Mar 26, 2024
1 parent d6ae5b4 commit 92c1ba7
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 1 deletion.
22 changes: 22 additions & 0 deletions subxt/src/backend/unstable/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,28 @@ impl<T: Config> UnstableRpcMethods<T> {

Ok(TransactionSubscription { sub, done: false })
}

/// Broadcast the transaction on the p2p network until the
/// [`Self::transaction_unstable_stop`] is called.
///
/// Returns an operation ID that can be used to stop the broadcasting process.
/// Returns `None` if the server cannot handle the request at the moment.
pub async fn transaction_unstable_broadcast(&self, tx: &[u8]) -> Result<Option<String>, Error> {
self.client
.request("transaction_unstable_broadcast", rpc_params![to_hex(tx)])
.await
}

/// Stop the broadcasting process of the transaction.
///
/// The operation ID is obtained from the [`Self::transaction_unstable_broadcast`] method.
///
/// Returns an error if the operation ID does not correspond to any active transaction for this connection.
pub async fn transaction_unstable_stop(&self, operation_id: &str) -> Result<(), Error> {
self.client
.request("transaction_unstable_stop", rpc_params![operation_id])
.await
}
}

/// This represents events generated by the `follow` method.
Expand Down
109 changes: 108 additions & 1 deletion testing/integration-tests/src/full_client/client/unstable_rpcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use subxt::{
FollowEvent, Initialized, MethodResponse, RuntimeEvent, RuntimeVersionEvent, StorageQuery,
StorageQueryType,
},
utils::AccountId32,
config::Hasher,
utils::{AccountId32, MultiAddress},
SubstrateConfig,
};
use subxt_signer::sr25519::dev;

Expand Down Expand Up @@ -309,3 +311,108 @@ async fn next_operation_event<

panic!("Cannot find operation related event after {NUM_EVENTS} produced events");
}

#[tokio::test]
async fn transaction_unstable_broadcast() {
let bob = dev::bob();
let bob_address: MultiAddress<AccountId32, u32> = bob.public_key().into();

let ctx = test_context().await;
let api = ctx.client();
let rpc = ctx.unstable_rpc_methods().await;

let tx = node_runtime::tx()
.balances()
.transfer_allow_death(bob_address.clone(), 10_001);

let tx_bytes = ctx
.client()
.tx()
.create_signed_offline(&tx, &dev::alice(), Default::default())
.unwrap()
.into_encoded();

let tx_hash = <SubstrateConfig as subxt::Config>::Hasher::hash(&tx_bytes[2..]);

// Subscribe to finalized blocks.
let mut finalized_sub = api.blocks().subscribe_finalized().await.unwrap();
// Expect the tx to be encountered in a maximum number of blocks.
let mut num_blocks: usize = 10;

// Submit the transaction.
let _operation_id = rpc
.transaction_unstable_broadcast(&tx_bytes)
.await
.unwrap()
.expect("Server is not overloaded by 1 tx; qed");

while let Some(finalized) = finalized_sub.next().await {
let finalized = finalized.unwrap();

// Started with positive, should not overflow.
num_blocks = num_blocks.saturating_sub(1);
if num_blocks == 0 {
panic!("Did not find the tx in due time");
}

let extrinsics = finalized.extrinsics().await.unwrap();
let block_extrinsics = extrinsics
.iter()
.map(|res| res.unwrap())
.collect::<Vec<_>>();

let Some(ext) = block_extrinsics
.iter()
.find(|ext| <SubstrateConfig as subxt::Config>::Hasher::hash(ext.bytes()) == tx_hash)
else {
continue;
};

let ext = ext
.as_extrinsic::<node_runtime::balances::calls::types::TransferAllowDeath>()
.unwrap()
.unwrap();
assert_eq!(ext.value, 10_001);
return;
}
}

#[tokio::test]
async fn transaction_unstable_stop() {
let bob = dev::bob();
let bob_address: MultiAddress<AccountId32, u32> = bob.public_key().into();

let ctx = test_context().await;
let rpc = ctx.unstable_rpc_methods().await;

// Cannot stop an operation that was not started.
let _err = rpc
.transaction_unstable_stop("non-existent-operation-id")
.await
.unwrap_err();

// Submit a transaction and stop it.
let tx = node_runtime::tx()
.balances()
.transfer_allow_death(bob_address.clone(), 10_001);
let tx_bytes = ctx
.client()
.tx()
.create_signed_offline(&tx, &dev::alice(), Default::default())
.unwrap()
.into_encoded();

// Submit the transaction.
let operation_id = rpc
.transaction_unstable_broadcast(&tx_bytes)
.await
.unwrap()
.expect("Server is not overloaded by 1 tx; qed");

let _ = rpc.transaction_unstable_stop(&operation_id).await.unwrap();
// Cannot stop it twice.
let _err = rpc
.transaction_unstable_stop(&operation_id)
.await
.unwrap_err();
}

0 comments on commit 92c1ba7

Please sign in to comment.