Skip to content

Commit

Permalink
ensure locks are released
Browse files Browse the repository at this point in the history
  • Loading branch information
davidor committed May 13, 2021
1 parent 130be91 commit 67de259
Showing 1 changed file with 41 additions and 17 deletions.
58 changes: 41 additions & 17 deletions limitador/src/storage/infinispan/sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,48 @@ pub async fn add(
) -> Result<(), StorageErr> {
let cache_name = cache_name.into();
let set_key = set_key.into();
let element = element.into();

dist_lock::lock(infinispan, &cache_name, &set_key).await?;

let res = add_set(infinispan, cache_name.clone(), set_key.clone(), element).await;

dist_lock::release(infinispan, &cache_name, &set_key).await?;

res
}

pub async fn delete(
infinispan: &Infinispan,
cache_name: impl Into<String>,
set_key: impl Into<String>,
element: impl Into<String>,
) -> Result<HashSet<String>, StorageErr> {
let cache_name = cache_name.into();
let set_key = set_key.into();
let element = element.into();

dist_lock::lock(infinispan, &cache_name, &set_key).await?;

let res = delete_from_set(infinispan, cache_name.clone(), set_key.clone(), element).await;

dist_lock::release(infinispan, &cache_name, &set_key).await?;

res
}

async fn add_set(
infinispan: &Infinispan,
cache_name: String,
set_key: String,
element: String,
) -> Result<(), StorageErr> {
let get_entry_response = infinispan
.run(&request::entries::get(&cache_name, &set_key))
.await?;

if get_entry_response.status() == 404 {
let limits_set: HashSet<String> = HashSet::from_iter(vec![element.into()]);
let limits_set: HashSet<String> = HashSet::from_iter(vec![element]);

let _ = infinispan
.run(
Expand All @@ -58,7 +91,7 @@ pub async fn add(
// TODO: handle other errors
let mut limits_set: HashSet<String> =
serde_json::from_str(&response_to_string(get_entry_response).await).unwrap();
limits_set.insert(element.into());
limits_set.insert(element);

let _ = infinispan
.run(&request::entries::update(
Expand All @@ -69,30 +102,23 @@ pub async fn add(
.await?;
}

dist_lock::release(infinispan, &cache_name, &set_key).await?;

Ok(())
}

pub async fn delete(
async fn delete_from_set(
infinispan: &Infinispan,
cache_name: impl Into<String>,
set_key: impl Into<String>,
element: impl Into<String>,
cache_name: String,
set_key: String,
element: String,
) -> Result<HashSet<String>, StorageErr> {
let cache_name = cache_name.into();
let set_key = set_key.into();

dist_lock::lock(infinispan, &cache_name, &set_key).await?;

let response = infinispan
.run(&request::entries::get(&cache_name, &set_key))
.await?;

let mut set: HashSet<String> =
serde_json::from_str(&response_to_string(response).await).unwrap();

set.remove(&element.into());
set.remove(&element);

if set.is_empty() {
let _ = infinispan
Expand All @@ -108,7 +134,5 @@ pub async fn delete(
.await?;
}

dist_lock::release(infinispan, &cache_name, &set_key).await?;

Ok(HashSet::new())
return Ok(set);
}

0 comments on commit 67de259

Please sign in to comment.