
feat: Caching the nonce in case of transact_async #367

Closed
wants to merge 4 commits

Conversation

@jaswinder6991 commented Aug 2, 2024

  • Creates a nonce cache for async transactions.
  • The nonce is stored locally and incremented strictly sequentially.

Note: The PR might be a little rough around the edges in terms of best practices and thread safety, so your feedback is welcome.

@jaswinder6991

@race-of-sloths


@race-of-sloths commented Aug 2, 2024

@jaswinder6991 Thank you for your contribution! Your pull request is now a part of the Race of Sloths!
New Sloth joined the Race! Welcome!


Current status: stale

This pull request was removed from the race, but you can include it again with @race-of-sloths include command

What is the Race of Sloths

Race of Sloths is a friendly competition where you can participate in challenges and compete with other open-source contributors within your normal workflow.

For contributors:

  • Tag @race-of-sloths inside your pull requests
  • Wait for the maintainer to review and score your pull request
  • Check out your position in the Leaderboard
  • Keep weekly and monthly streaks to reach higher positions
  • Boast your contributions with a dynamic picture of your Profile

For maintainers:

  • Score pull requests that participate in the Race of Sloths
  • Engage contributors with fair scoring and fast responses so they keep their streaks
  • Promote the Race to the point where the Race starts promoting you
  • Grow the community of your contributors

Feel free to check our website for additional details!

Bot commands
  • For contributors
    • Include a PR: @race-of-sloths include to enter the Race with your PR
  • For maintainers:
    • Assign points: @race-of-sloths score [1/2/3/5/8/13] to award points based on your assessment.
    • Reject this PR: @race-of-sloths exclude to send this PR back to the drawing board.
    • Exclude repo: @race-of-sloths pause to stop bot activity in this repo until @race-of-sloths unpause command is called

@frol left a comment

There is already an implementation for that inside fetch_tx_nonce:

let nonces = client.access_key_nonces.read().await;
if let Some(nonce) = nonces.get(cache_key) {
    let nonce = nonce.fetch_add(1, Ordering::SeqCst);
    drop(nonces);
    // Fetch latest block_hash since the previous one is now invalid for new transactions:
    let block = client.view_block(Some(Finality::Final.into())).await?;
    let block_hash = block.header.hash;
    Ok((block_hash, nonce + 1))
} else {

I believe the problem is that it does not work for the first transactions: if there is no nonce cached yet, the lookup falls into the else branch, and thus several concurrent jobs end up querying the latest on-chain nonce and using the same value.

@jaswinder6991

> There is already implementation for that inside fetch_tx_nonce: […]
>
> I believe the problem is that it does not work for the first transactions since if there is no nonce cached, it will end up in the else branch too early and thus several concurrent jobs will end up querying the latest nonce and use that.

Nice, my bad, I should have checked it.
Anyway, do you suggest I fix the race condition in the fetch_nonce function, or just take it out of there and handle it like I have done in the send_batch_tx_async function?

One reason I did not want to touch fetch_nonce was that I thought it might break other functions.

@jaswinder6991

@frol so I tried to simplify my solution by checking whether another thread has already updated the cache (L506):

} else {
    drop(nonces);
    let (account_id, public_key) = cache_key;
    let (access_key, block_hash) =
        access_key(client, account_id.clone(), public_key.clone()).await?;
    let mut nonces = client.access_key_nonces.write().await;
    // Double-check if another thread has updated the nonce while we were fetching the access key
    if let Some(nonce) = nonces.get(cache_key) {
        let nonce = nonce.fetch_add(1, Ordering::SeqCst);
        return Ok((block_hash, nonce + 1));
    }
    // Insert the new nonce into the cache
    let nonce = nonces
        .entry(cache_key.clone())
        .or_insert_with(|| AtomicU64::new(access_key.nonce + 1))
        .fetch_max(access_key.nonce + 1, Ordering::SeqCst)
        .max(access_key.nonce + 1);
    drop(nonces);
    Ok((block_hash, nonce))
}

It worked fine for me locally, but it seems to have failed the CI tests. Still, it achieves some level of concurrency.

Do you recommend I dig deeper, or should this be fine?

@jaswinder6991 jaswinder6991 marked this pull request as ready for review August 2, 2024 13:24
@jaswinder6991 jaswinder6991 requested a review from frol August 2, 2024 13:24
@frol commented Aug 3, 2024

Thinking about this problem more, I believe that the issue is not the nonce itself; the implementation in master doesn't have a race:

// case where multiple writers end up at the same lock acquisition point and tries
// to overwrite the cached value that a previous writer already wrote.
let nonce = client
    .access_key_nonces
    .write()
    .await
    .entry(cache_key.clone())
    .or_insert_with(|| AtomicU64::new(access_key.nonce + 1))
    .fetch_max(access_key.nonce + 1, Ordering::SeqCst)
    .max(access_key.nonce + 1);

This is the right way to handle it.

There are two other problems:

  1. Making 50 parallel calls means that nonces can be assigned to these requests in any order (it is NOT msg0 = nonce+1, msg1 = nonce+2, …, msg49 = nonce+50). Thus, given that the blockchain will order the transactions by nonce, you may easily end up with msg17 having the highest nonce and being executed last (see the failed test).
  2. Given that the transactions do not all arrive at once, you may end up with the transaction carrying nonce+50 being executed at block height 10, while transactions with nonces lower than nonce+50 that arrived too late for that block will be discarded (not applied at block height 11).

@jaswinder6991

Hi @frol, thanks for your helpful insight. I understand it now too.

It seems the nonce never had the issue; I went down the wrong track, and of course the test is incorrect.
Also, the Telegram query in question was about sequential async txs, which work fine. Here is a test for that:

use anyhow::Result;
use serde_json::json;
use std::time::Duration;

const STATUS_MSG_CONTRACT: &[u8] = include_bytes!("../../examples/res/status_message.wasm");

#[tokio::test]
async fn test_nonce_caching_sequential_async_no_wait() -> Result<()> {
    let worker = near_workspaces::sandbox().await?;
    let contract = worker.dev_deploy(STATUS_MSG_CONTRACT).await?;
    let account = worker.dev_create_account().await?;

    // Get the initial nonce
    let initial_nonce = worker
        .view_access_key(account.id(), &account.secret_key().public_key())
        .await?
        .nonce;

    const NUM_TRANSACTIONS: usize = 10;
    let mut transaction_statuses = Vec::with_capacity(NUM_TRANSACTIONS);

    // Send transactions sequentially without waiting for completion
    for i in 0..NUM_TRANSACTIONS {
        let msg = format!("msg{}", i);
        let result = account
            .call(&contract.id(), "set_status")
            .args_json(json!({ "message": msg }))
            .transact_async()
            .await?;

        transaction_statuses.push(result);

        // Small delay to simulate some processing time between transactions
        //tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // Now wait for all transactions to complete
    for (i, status) in transaction_statuses.into_iter().enumerate() {
        loop {
            match status.status().await? {
                std::task::Poll::Ready(outcome) => {
                    if outcome.is_success() {
                        println!("Transaction {} completed successfully", i);
                        break;
                    } else {
                        return Err(anyhow::anyhow!("Transaction {} failed: {:?}", i, outcome));
                    }
                }
                std::task::Poll::Pending => {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }

    // Get the final nonce
    let final_nonce = worker
        .view_access_key(account.id(), &account.secret_key().public_key())
        .await?
        .nonce;

    // Verify that the nonce increased by exactly the number of transactions
    assert_eq!(
        final_nonce - initial_nonce,
        NUM_TRANSACTIONS as u64,
        "Nonce did not increment correctly"
    );

    Ok(())
}

Going back to the original query on Telegram:

> Hey, does near-workspaces have a way to send txs sequentially (with proper nonce) in an async execution context?
> If I do this:
> let users = try_join_all((0..user_amount).map(|_| worker.dev_create_account())).await?;
> then I randomly get invalid nonce error

I think it is more of an account creation issue in async context.

I will close this PR now.
As an aside, I apologise for pulling you into this, and I appreciate your valuable time.
