Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add token bucket example to Semaphore #5978

Merged
merged 10 commits into from
Sep 23, 2023
Next Next commit
add token bucket example to Semaphore
maminrayej committed Sep 8, 2023
commit 488b8f6998476fa6c1e60c7dc48950732c15f1a3
72 changes: 72 additions & 0 deletions tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
@@ -102,6 +102,78 @@ use std::sync::Arc;
/// }
/// ```
///
/// Implement a simple token bucket for rate limiting
///
/// Many applications and systems have constraints on the rate at which certain
/// operations should occur. Exceeding this rate can result in suboptimal
/// performance or even errors.
///
/// The provided example uses a `TokenBucket`, implemented using a semaphore, that
/// limits operations to a specific rate. The token bucket will be refilled gradually.
/// When the rate is exceeded, the `acquire` method will await until a token is available.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should mention two things here:

  • Token buckets allow short bursts that are faster than the allowed rate. In fact, this is part of the point of token buckets.
  • This implementation is suboptimal when the rate is large, because it consumes a lot of cpu constantly looping and sleeping.

/// ```
/// use std::sync::Arc;
/// use tokio::sync::{AcquireError, Semaphore, SemaphorePermit};
/// use tokio::time::{sleep, Duration};
///
/// struct TokenBucket {
/// sem: Arc<Semaphore>,
/// jh: tokio::task::JoinHandle<()>,
/// }
///
/// impl TokenBucket {
/// fn new(rate: usize) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this should just take a Duration of the amount of time there should be between each permit? That would also allow for permits being added more rarely than once per second.

/// let sem = Arc::new(Semaphore::new(rate));
///
/// // refills the permits each 1/rate seconds.
/// let jh = tokio::spawn({
/// let sem = sem.clone();
///
/// async move {
/// let time_slice = 1.0 / (rate as f32);
///
/// loop {
/// sleep(Duration::from_secs_f32(time_slice)).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a tokio::time::Interval instead.

///
/// let cap = rate - sem.available_permits();
/// sem.add_permits(std::cmp::min(cap, 1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will let the number of permits grow infinitely large.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I misread. Anyway, please use usize::min instead of std::cmp::min.

/// }
/// }
/// });
///
/// Self { jh, sem }
/// }
///
/// async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
/// self.sem.acquire().await
/// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably forget the permit here. Otherwise, the user might use this incorrectly and not forget the permit.

///
/// async fn close(self) {
/// self.jh.abort();
/// let _ = self.jh.await;
///
/// self.sem.close();
maminrayej marked this conversation as resolved.
Show resolved Hide resolved
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let bucket = TokenBucket::new(1);
///
/// for _ in 0..5 {
/// bucket
/// .acquire()
/// .await
/// .map(|permit| permit.forget())
/// .unwrap();
///
/// // do the operation
/// }
///
/// bucket.close().await;
/// }
/// ```
///
/// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
#[derive(Debug)]