Skip to content

Commit

Permalink
Add async methods for callers that run inside an async executor
Browse files Browse the repository at this point in the history
This otherwise errors out with an error like this, since you can't
use block_on inside another block_on:

"cannot execute `LocalPool` executor from within another executor"
  • Loading branch information
lfittl committed Jul 28, 2022
1 parent 8b1f259 commit 8a85a0b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ extern crate serde_json;

mod sidekiq;
pub use crate::sidekiq::{
create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts, RedisPool,
create_async_redis_pool, create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts,
RedisPool,
};
pub use serde_json::value::Value;
40 changes: 31 additions & 9 deletions src/sidekiq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ enum ErrorKind {
impl std::error::Error for ClientError {}

pub fn create_redis_pool() -> Result<ConnectionManager, ClientError> {
block_on(create_async_redis_pool())
}

pub async fn create_async_redis_pool() -> Result<ConnectionManager, ClientError> {
let redis_url =
&env::var(&REDIS_URL_ENV.to_owned()).unwrap_or_else(|_| REDIS_URL_DEFAULT.to_owned());
// Note: this connection is multiplexed. Users of this object will call clone(), but the same underlying connection will be used.
// https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html
match block_on(ConnectionManager::new(
redis::Client::open((*redis_url).clone()).unwrap(),
)) {
match ConnectionManager::new(redis::Client::open((*redis_url).clone()).unwrap()).await {
Ok(pool) => Ok(pool),
Err(err) => Err(ClientError {
kind: ErrorKind::Redis(err),
Expand Down Expand Up @@ -225,21 +227,41 @@ impl Client {
}

pub fn perform_in(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
let interval: f64 = interval.whole_seconds() as f64;
block_on(self.raw_push(&[job], self.calc_at(interval)))
block_on(self.perform_in_async(interval, job))
}

pub fn perform_at(&self, datetime: OffsetDateTime, job: Job) -> Result<(), ClientError> {
let timestamp: f64 = datetime.unix_timestamp() as f64;
block_on(self.raw_push(&[job], self.calc_at(timestamp)))
block_on(self.perform_at_async(datetime, job))
}

pub fn push(&self, job: Job) -> Result<(), ClientError> {
block_on(self.raw_push(&[job], None))
block_on(self.push_async(job))
}

pub fn push_bulk(&self, jobs: &[Job]) -> Result<(), ClientError> {
block_on(self.raw_push(jobs, None))
block_on(self.push_bulk_async(jobs))
}

pub async fn perform_in_async(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
let interval: f64 = interval.whole_seconds() as f64;
self.raw_push(&[job], self.calc_at(interval)).await
}

pub async fn perform_at_async(
&self,
datetime: OffsetDateTime,
job: Job,
) -> Result<(), ClientError> {
let timestamp: f64 = datetime.unix_timestamp() as f64;
self.raw_push(&[job], self.calc_at(timestamp)).await
}

pub async fn push_async(&self, job: Job) -> Result<(), ClientError> {
self.raw_push(&[job], None).await
}

pub async fn push_bulk_async(&self, jobs: &[Job]) -> Result<(), ClientError> {
self.raw_push(jobs, None).await
}

async fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
Expand Down

0 comments on commit 8a85a0b

Please sign in to comment.