indexer remove weird chunk logic (#364)
lightmark authored and yuunlimm committed May 9, 2024
1 parent 939759d commit 59b2e40
Showing 1 changed file with 5 additions and 53 deletions.
58 changes: 5 additions & 53 deletions rust/processor/src/utils/database.rs
@@ -21,7 +21,7 @@ use diesel_async::{
 };
 use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
 use futures_util::{future::BoxFuture, FutureExt};
-use std::{cmp::min, sync::Arc};
+use std::sync::Arc;
 
 pub type MyDbConnection = AsyncPgConnection;
 pub type PgPool = Pool<MyDbConnection>;
@@ -44,20 +44,6 @@ pub struct UpsertFilterLatestTransactionQuery<T> {
 // the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit
 pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize;
 
-/// This function returns boundaries of chunks in the form of (start_index, end_index)
-pub fn get_chunks(num_items_to_insert: usize, chunk_size: usize) -> Vec<(usize, usize)> {
-    let mut chunk: (usize, usize) = (0, min(num_items_to_insert, chunk_size));
-    let mut chunks = vec![chunk];
-    while chunk.1 != num_items_to_insert {
-        chunk = (
-            chunk.0 + chunk_size,
-            min(num_items_to_insert, chunk.1 + chunk_size),
-        );
-        chunks.push(chunk);
-    }
-    chunks
-}
-
 /// This function will clean the data for postgres. Currently it has support for removing
 /// null bytes from strings but in the future we will add more functionality.
 pub fn clean_data_for_db<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
@@ -147,13 +133,11 @@ where
     U: QueryFragment<diesel::pg::Pg> + diesel::query_builder::QueryId + Send + 'static,
     T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + 'static,
 {
-    let chunks = get_chunks(items_to_insert.len(), chunk_size);
-
-    let tasks = chunks
-        .into_iter()
-        .map(|(start_ind, end_ind)| {
-            let items = items_to_insert[start_ind..end_ind].to_vec();
+    let tasks = items_to_insert
+        .chunks(chunk_size)
+        .map(|chunk| {
             let conn = conn.clone();
+            let items = chunk.to_vec();
             tokio::spawn(async move {
                 let (query, additional_where_clause) = build_query(items.clone());
                 execute_or_retry_cleaned(conn, build_query, items, query, additional_where_clause)
@@ -299,35 +283,3 @@
         Ok(())
     }
 }
-
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    #[tokio::test]
-    async fn test_get_chunks_logic() {
-        assert_eq!(get_chunks(10, 5), vec![(0, 10)]);
-        assert_eq!(get_chunks(65535, 1), vec![
-            (0, 32767),
-            (32767, 65534),
-            (65534, 65535),
-        ]);
-        // 200,000 total items will take 6 buckets. Each bucket can only be 3276 size.
-        assert_eq!(get_chunks(10000, 20), vec![
-            (0, 1638),
-            (1638, 3276),
-            (3276, 4914),
-            (4914, 6552),
-            (6552, 8190),
-            (8190, 9828),
-            (9828, 10000),
-        ]);
-        assert_eq!(get_chunks(65535, 2), vec![
-            (0, 16383),
-            (16383, 32766),
-            (32766, 49149),
-            (49149, 65532),
-            (65532, 65535),
-        ]);
-    }
-}
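
For reference, the standard-library slice::chunks call that replaces get_chunks yields the same partitioning for non-empty input. A minimal standalone sketch (not part of the commit) comparing the two:

use std::cmp::min;

// The removed helper, reproduced from the deleted hunk above.
fn get_chunks(num_items_to_insert: usize, chunk_size: usize) -> Vec<(usize, usize)> {
    let mut chunk: (usize, usize) = (0, min(num_items_to_insert, chunk_size));
    let mut chunks = vec![chunk];
    while chunk.1 != num_items_to_insert {
        chunk = (
            chunk.0 + chunk_size,
            min(num_items_to_insert, chunk.1 + chunk_size),
        );
        chunks.push(chunk);
    }
    chunks
}

fn main() {
    let items: Vec<u32> = (0..10).collect();
    let chunk_size = 4;

    // Boundaries from the removed helper: [(0, 4), (4, 8), (8, 10)].
    let via_helper: Vec<&[u32]> = get_chunks(items.len(), chunk_size)
        .into_iter()
        .map(|(start, end)| &items[start..end])
        .collect();

    // The same slices via the standard library, as the new code does.
    let via_std: Vec<&[u32]> = items.chunks(chunk_size).collect();

    assert_eq!(via_helper, via_std);
}

The one behavioral difference is the empty case: get_chunks(0, n) returned a single (0, 0) range, so the old code spawned one task with an empty batch, while [].chunks(n) yields no chunks and the new code spawns nothing.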

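The chunk_size passed into execute_in_chunks still has to respect the MAX_DIESEL_PARAM_SIZE budget kept in this file: a multi-row insert binds one parameter per column per row, so a caller would typically size its chunks by dividing the budget by the table's column count. A hypothetical sketch of that derivation (chunk_size_for and column_count are illustrative names, not from this diff):

// The constant as defined in database.rs.
pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize;

/// Hypothetical helper: the largest chunk whose rows fit in one statement,
/// given that each row binds `column_count` parameters.
fn chunk_size_for(column_count: usize) -> usize {
    MAX_DIESEL_PARAM_SIZE / column_count
}

fn main() {
    // e.g. a 12-column table: 32_767 / 12 = 2_730 rows per INSERT.
    assert_eq!(chunk_size_for(12), 2730);
}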