
Feature Request: Alternative compute API that skips if other thread/async task is already computing for the key #433

Open
tatsuya6502 opened this issue Jun 29, 2024 · 5 comments
Labels
enhancement New feature or request

Comments

@tatsuya6502
Member

Split from:

As of [email protected], concurrent calls to Entry's and_compute_with/and_try_compute_with for the same key are serialized by the cache. If callers A and B call and_compute_with for the same key at the same time, only one of their closures f is evaluated first; once it finishes, the other closure is evaluated (#432 (comment)). Both closures are always called, just never at the same time.
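
For illustration, here is a minimal, self-contained sketch of the current serialized behavior (assuming [email protected]): two tasks compute for the same key at the same time, and both closures run, just one after the other.

use std::sync::Arc;

use moka::{future::Cache, ops::compute::Op};

#[tokio::main]
async fn main() {
    let cache: Arc<Cache<String, u64>> = Arc::new(Cache::new(100));

    // Two tasks compute for the same key at the same time.
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let cache = Arc::clone(&cache);
            tokio::spawn(async move {
                cache
                    .entry_by_ref("key1")
                    .and_compute_with(|maybe_entry| {
                        // Both closures are evaluated, but never concurrently;
                        // the cache serializes them per key.
                        let counter = maybe_entry.map(|e| e.into_value()).unwrap_or(0);
                        std::future::ready(Op::Put(counter + 1))
                    })
                    .await
            })
        })
        .collect();

    for handle in handles {
        handle.await.unwrap();
    }

    // Both closures ran, so the counter reaches 2.
    assert_eq!(cache.get("key1").await, Some(2));
}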

There will be some use cases that do not need/want the latter closure to be evaluated at all, and would rather return immediately instead of blocking.

I have not come up with a good short name for the alternative methods yet, but they could look like the following:

use moka::future::Cache;

let result = cache
    .entry_by_ref(&key)
    .and_try_compute_if_nobody_else(|entry| async move { 
        ... 
    }).await?;

Should I try the builder pattern?

let result = cache
    .entry_builder_by_ref(&key)
    .try_compute_with(|entry| async move {
        ...
    })
    .skip_if_already_in_progress()
    .run()
    .await?;
@xuehaonan27

Is there any plan to add this feature into moka? I'm eager to contribute!

@xuehaonan27

I am new to this project, so I want to take this issue as my starting point! Many thanks if you could give me some guidelines!

@xuehaonan27

I have submitted PR #460 implementing this feature.
I'm looking forward to your reply!
Test code:

use std::sync::Arc;

use moka::{
    future::Cache,
    ops::compute::{CompResult, Op},
};

const N: usize = 100000;
const CONCURRENCY: usize = 100; // number of concurrent tasks
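// `N` is a multiple of `CONCURRENCY`, so `N / CONCURRENCY` below divides the
// iterations evenly across the tasks.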

#[tokio::main]
async fn main() {
    /*
        Note: the final counter value should be less than `N`.
        That is expected because we are testing [`and_try_compute_if_nobody_else`],
        which should skip (cancel) some computations, rather than testing whether
        the addition itself is thread-safe.
    */

    // This call tests computation serialized by the cache. You will only see
    // lines like
    //   ReplacedWith(Entry { key: "key1", value: <xxx>, is_fresh: true, is_old_value_replaced: true })
    // in stdout, which is as expected.
    computation_serialized().await;

    // This call tests the new feature. You will see lines like
    //   Unchanged(Entry { key: "key1", value: <xxx>, is_fresh: false, is_old_value_replaced: false })
    // in stdout, which is as expected because multiple waiters manipulate the
    // same entry simultaneously.
    // However, I am not sure whether a [`CompResult`] like `Unchanged` is proper
    // for this situation, where the computation is cancelled because another
    // waiter occupies the entry.
    // Should we add a new variant to [`CompResult`] to represent this situation?
    // (One possible shape is sketched after this program.)
    computation_maybe_cancelled().await;
}

async fn computation_serialized() {
    // Increment a cached `u64` counter.
    async fn increment_counter(cache: &Cache<String, u64>, key: &str) -> CompResult<String, u64> {
        cache
            .entry_by_ref(key)
            .and_compute_with(|maybe_entry| {
                let op = if let Some(entry) = maybe_entry {
                    let counter = entry.into_value();
                    Op::Put(counter.saturating_add(1)) // Update
                } else {
                    Op::Put(1) // Insert
                };
                // Return a Future that is resolved to `op` immediately.
                std::future::ready(op)
            })
            .await
    }

    let cache: Arc<Cache<String, u64>> = Arc::new(Cache::new(100));
    let key = "key1".to_string();

    let mut handles = Vec::new();

    for _ in 0..CONCURRENCY {
        let cache_clone = Arc::clone(&cache);
        let key_clone = key.clone();
        let handle = tokio::spawn(async move {
            for _ in 0..(N / CONCURRENCY) {
                let res = increment_counter(&cache_clone, &key_clone).await;
                println!("{:?}", res);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }

    let result = cache.get(&key).await;
    println!("{result:?}");
}

async fn computation_maybe_cancelled() {
    // Increment a cached `u64` counter.
    async fn increment_counter_if_nobody_else(
        cache: &Cache<String, u64>,
        key: &str,
    ) -> Result<CompResult<String, u64>, ()> {
        cache
            .entry_by_ref(key)
            .and_try_compute_if_nobody_else(|maybe_entry| {
                let op = if let Some(entry) = maybe_entry {
                    let counter = entry.into_value();
                    Ok(Op::Put(counter.saturating_add(1))) // Update
                } else {
                    Ok(Op::Put(1)) // Insert
                };
                // Return a Future that is resolved to `op` immediately.
                std::future::ready(op)
            })
            .await
    }

    let cache: Arc<Cache<String, u64>> = Arc::new(Cache::new(100));
    let key = "key1".to_string();

    let mut handles = Vec::new();

    for _ in 0..CONCURRENCY {
        let cache_clone = Arc::clone(&cache);
        let key_clone = key.clone();
        let handle = tokio::spawn(async move {
            for _ in 0..(N / CONCURRENCY) {
                let res = increment_counter_if_nobody_else(&cache_clone, &key_clone)
                    .await
                    .unwrap(); // Never fails; the closure always returns `Ok`.
                println!("{:?}", res);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }

    let result = cache.get(&key).await;
    println!("{result:?}");
}
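
To make the question in the code comments concrete: one hypothetical shape (names invented here, not part of moka's API) would wrap the existing [`CompResult`] rather than add a variant to it:

use std::sync::Arc;

use moka::ops::compute::CompResult;

// Hypothetical sketch only; nothing like this exists in moka today.
// Wrapping the existing `CompResult` keeps its variants intact while making
// the "skipped" case explicit instead of overloading `Unchanged`.
pub enum SkippableCompResult<K, V> {
    /// The closure was evaluated; the inner value describes the outcome.
    Evaluated(CompResult<K, V>),
    /// The closure was skipped because another caller was already computing
    /// a value for this key. The key is handed back so the caller may retry.
    Skipped(Arc<K>),
}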

@xuehaonan27

The PR is now ready for further CI testing. Could you please approve another workflow run? @tatsuya6502

@tatsuya6502
Member Author

@xuehaonan27 — Sorry for the delay. I will try to review the PR this weekend.
