diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index 3c8338768..cc06fe08d 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -21,7 +21,7 @@ use diesel_async::{ }; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use futures_util::{future::BoxFuture, FutureExt}; -use std::{cmp::min, sync::Arc}; +use std::sync::Arc; pub type MyDbConnection = AsyncPgConnection; pub type PgPool = Pool; @@ -44,20 +44,6 @@ pub struct UpsertFilterLatestTransactionQuery { // the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize; -/// This function returns boundaries of chunks in the form of (start_index, end_index) -pub fn get_chunks(num_items_to_insert: usize, chunk_size: usize) -> Vec<(usize, usize)> { - let mut chunk: (usize, usize) = (0, min(num_items_to_insert, chunk_size)); - let mut chunks = vec![chunk]; - while chunk.1 != num_items_to_insert { - chunk = ( - chunk.0 + chunk_size, - min(num_items_to_insert, chunk.1 + chunk_size), - ); - chunks.push(chunk); - } - chunks -} - /// This function will clean the data for postgres. Currently it has support for removing /// null bytes from strings but in the future we will add more functionality. pub fn clean_data_for_db serde::Deserialize<'de>>( @@ -147,13 +133,11 @@ where U: QueryFragment + diesel::query_builder::QueryId + Send + 'static, T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + 'static, { - let chunks = get_chunks(items_to_insert.len(), chunk_size); - - let tasks = chunks - .into_iter() - .map(|(start_ind, end_ind)| { - let items = items_to_insert[start_ind..end_ind].to_vec(); + let tasks = items_to_insert + .chunks(chunk_size) + .map(|chunk| { let conn = conn.clone(); + let items = chunk.to_vec(); tokio::spawn(async move { let (query, additional_where_clause) = build_query(items.clone()); execute_or_retry_cleaned(conn, build_query, items, query, additional_where_clause) @@ -299,35 +283,3 @@ where Ok(()) } } - -#[cfg(test)] -mod test { - use super::*; - - #[tokio::test] - async fn test_get_chunks_logic() { - assert_eq!(get_chunks(10, 5), vec![(0, 10)]); - assert_eq!(get_chunks(65535, 1), vec![ - (0, 32767), - (32767, 65534), - (65534, 65535), - ]); - // 200,000 total items will take 6 buckets. Each bucket can only be 3276 size. - assert_eq!(get_chunks(10000, 20), vec![ - (0, 1638), - (1638, 3276), - (3276, 4914), - (4914, 6552), - (6552, 8190), - (8190, 9828), - (9828, 10000), - ]); - assert_eq!(get_chunks(65535, 2), vec![ - (0, 16383), - (16383, 32766), - (32766, 49149), - (49149, 65532), - (65532, 65535), - ]); - } -}