diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
index 0351ab1ab..b213fdeba 100644
--- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs
+++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -131,6 +131,20 @@ where
     ) -> Result<()> {
         let parquet_structs = changes.data;
         let processor_name = self.processor_name.clone();
+
+        if self.last_upload_time.elapsed() >= self.upload_interval {
+            info!(
+                "More than {}s elapsed since the last upload for {}",
+                self.upload_interval.as_secs(),
+                ParquetType::TABLE_NAME
+            );
+            if let Err(e) = self.upload_buffer(gcs_client).await {
+                error!("Failed to upload buffer: {}", e);
+                return Err(e);
+            }
+            self.last_upload_time = Instant::now();
+        }
+
         for parquet_struct in parquet_structs {
             let size_of_struct = allocative::size_of_unique(&parquet_struct);
             PARQUET_STRUCT_SIZE
@@ -154,19 +168,6 @@ where
             }
         }
 
-        if self.last_upload_time.elapsed() >= self.upload_interval {
-            info!(
-                "Time has elapsed more than {} since last upload for {}",
-                self.upload_interval.as_secs(),
-                ParquetType::TABLE_NAME
-            );
-            if let Err(e) = self.upload_buffer(gcs_client).await {
-                error!("Failed to upload buffer: {}", e);
-                return Err(e);
-            }
-            self.last_upload_time = Instant::now();
-        }
-
         PARQUET_HANDLER_CURRENT_BUFFER_SIZE
             .with_label_values(&[&self.processor_name, ParquetType::TABLE_NAME])
             .set(self.buffer_size_bytes as i64);
@@ -252,7 +253,12 @@ where
             parquet_processed_structs: Some(parquet_processed_transactions),
             table_name: ParquetType::TABLE_NAME.to_string(),
         };
-
+        info!(
+            table_name = ParquetType::TABLE_NAME,
+            start_version = start_version,
+            end_version = end_version,
+            "Uploaded parquet to GCS and sending result to gap detector."
+        );
         self.gap_detector_sender
             .send(ProcessingResult::ParquetProcessingResult(
                 parquet_processing_result,
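Note on the first hunk above: the interval check now runs before the new batch is appended to the buffer, so data that has been sitting in the buffer longer than `upload_interval` gets flushed even when incoming batches are small. A minimal, self-contained sketch of this flush-before-append pattern (the `Handler` struct, the synchronous `upload_buffer` stub, and the byte buffer are illustrative stand-ins for the async GCS upload in `generic_parquet_processor.rs`, not the actual implementation):

    use std::time::{Duration, Instant};

    struct Handler {
        buffer: Vec<u8>,
        last_upload_time: Instant,
        upload_interval: Duration,
    }

    impl Handler {
        fn handle(&mut self, incoming: &[u8]) {
            // Flush first: if the interval has elapsed, upload what is
            // already buffered before appending the new batch, so stale
            // data is never held back by a steady trickle of small batches.
            if self.last_upload_time.elapsed() >= self.upload_interval {
                self.upload_buffer();
                self.last_upload_time = Instant::now();
            }
            self.buffer.extend_from_slice(incoming);
        }

        fn upload_buffer(&mut self) {
            // Stand-in for the real async upload to GCS.
            println!("uploading {} bytes", self.buffer.len());
            self.buffer.clear();
        }
    }

    fn main() {
        let mut h = Handler {
            buffer: Vec::new(),
            last_upload_time: Instant::now(),
            upload_interval: Duration::from_secs(30),
        };
        h.handle(b"batch");
    }
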
diff --git a/rust/processor/src/gap_detectors/mod.rs b/rust/processor/src/gap_detectors/mod.rs
index 278175e20..70bb9ca4b 100644
--- a/rust/processor/src/gap_detectors/mod.rs
+++ b/rust/processor/src/gap_detectors/mod.rs
@@ -125,11 +125,6 @@ pub async fn create_gap_detector_status_tracker_loop(
                 }
             },
             Ok(ProcessingResult::ParquetProcessingResult(result)) => {
-                tracing::info!(
-                    processor_name,
-                    service_type = PROCESSOR_SERVICE_TYPE,
-                    "[ParquetGapDetector] received parquet gap detector task",
-                );
                 match gap_detector
                     .process_versions(ProcessingResult::ParquetProcessingResult(result))
                 {
diff --git a/rust/processor/src/gap_detectors/parquet_gap_detector.rs b/rust/processor/src/gap_detectors/parquet_gap_detector.rs
index 7c7e7f1bf..08232f867 100644
--- a/rust/processor/src/gap_detectors/parquet_gap_detector.rs
+++ b/rust/processor/src/gap_detectors/parquet_gap_detector.rs
@@ -5,7 +5,7 @@ use crate::gap_detectors::{GapDetectorResult, GapDetectorTrait, ProcessingResult
 use ahash::{AHashMap, AHashSet};
 use anyhow::Result;
 use std::{
-    cmp::{max, min},
+    cmp::max,
     sync::{Arc, Mutex},
 };
 use tracing::{debug, info};
@@ -64,62 +64,55 @@ impl ParquetFileGapDetectorInner {
             }
         }
         self.max_version = max(self.max_version, end_version);
-
-        // b/c of the case where file gets uploaded first, we should check if we have to update last_success_version for this processor
-        self.update_next_version_to_process(
-            min(self.next_version_to_process, end_version),
-            "all_table",
-        );
     }
 
-    /// This function updates the next version to process based on the current version counters.
-    /// It will increment the next version to process if the current version is fully processed.
-    /// It will also remove the version from the version counters if it is fully processed.
-    /// what it means to be fully processed is that all the structs for that version processed, i.e. count = 0.
-    /// Note that for tables other than transactions, it won't be always the latest txn version since we update this value with
-    /// Thus we will keep the latest version_to_process in the db with the min(max version of latest table files per processor)
-    /// that has been uploaded to GCS. so whenever we restart the processor, it may generate some duplicates rows, and we are okay with that.
+    /// This function updates the `next_version_to_process` based on the current version counters.
+    /// It increments the `next_version_to_process` if the current version is fully processed, which means
+    /// that all the structs for that version have been processed, i.e., `count = 0`.
+    /// When a version is fully processed, it is removed from the version counters and added to `seen_versions`.
+    /// For tables other than transactions, the latest version to process may not always be the most recent transaction version,
+    /// since this value is updated based on the minimum of the maximum versions of the latest table files per processor
+    /// that have been uploaded to GCS. Therefore, when the processor restarts, some duplicate rows may be generated, which is acceptable.
+    /// The walk always starts from the `next_version_to_process` value stored in the database.
+    /// While there might be potential performance improvements,
+    /// the current implementation prioritizes data integrity.
+    /// It also handles the cases where a version has already been processed or where no struct count
+    /// is found for a version, logging appropriately in each scenario.
     pub fn update_next_version_to_process(&mut self, end_version: i64, table_name: &str) {
         // this has to start checking with this value all the time, since this is the value that will be stored in the db as well.
         // maybe there could be an improvement to be more performant. but priortizing the data integrity as of now.
         let mut current_version = self.next_version_to_process;
 
         while current_version <= end_version {
-            #[allow(clippy::collapsible_else_if)]
-            if self.version_counters.contains_key(&current_version) {
-                while let Some(&count) = self.version_counters.get(&current_version) {
-                    if current_version > end_version {
-                        // we shouldn't update further b/c we haven't uploaded the files containing versions after end_version.
-                        break;
-                    }
-                    if count == 0 {
-                        self.version_counters.remove(&current_version);
-                        self.seen_versions.insert(current_version); // seen_version holds the txns version that we have processed already
-                        current_version += 1;
-                        self.next_version_to_process += 1;
-                    } else {
-                        break;
-                    }
-                }
-            } else {
-                if self.seen_versions.contains(&current_version) {
-                    debug!(
-                        "Version {} already processed, skipping and current next_version {} ",
-                        current_version, self.next_version_to_process
-                    );
-                    self.next_version_to_process =
-                        max(self.next_version_to_process, current_version + 1);
+            // If the current version has a struct count entry
+            if let Some(&count) = self.version_counters.get(&current_version) {
+                if count == 0 {
+                    self.version_counters.remove(&current_version);
+                    self.seen_versions.insert(current_version);
+                    self.next_version_to_process += 1;
                 } else {
-                    // this is the case where we haven't updated the map yet, while the file gets uploaded first. the bigger file size we will have,
-                    // the less chance we will see this as upload takes longer time. And map population is done before the upload.
-                    debug!(
-                        current_version = current_version,
-                        "No struct count found for version. This shouldn't happen b/c we already added default count for this version."
-                    );
+                    // Stop processing if the version is not yet complete
+                    break;
                 }
+            } else if self.seen_versions.contains(&current_version) {
+                // If the version has already been seen and processed
+                debug!(
+                    "Version {} already processed, skipping; current next_version_to_process: {}",
+                    current_version, self.next_version_to_process
+                );
+                self.next_version_to_process =
+                    max(self.next_version_to_process, current_version + 1);
+            } else {
+                // If the version is in neither seen_versions nor version_counters
+                debug!(
+                    current_version = current_version,
+                    "No struct count found for version. This shouldn't happen b/c we already added a default count for this version."
+                );
             }
+            current_version += 1;
         }
+
         debug!(
             next_version_to_process = self.next_version_to_process,
             table_name = table_name,
@@ -155,7 +148,8 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
         info!(
             start_version = result.start_version,
             end_version = result.end_version,
-            "Parquet file has been uploaded."
+            table_name = &result.table_name,
+            "[Parquet Gap Detector] Processing versions after parquet file upload."
         );
 
         for (version, count) in parquet_processed_structs.iter() {
@@ -166,7 +160,6 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
                 self.version_counters.insert(*version, -count);
             }
         }
-
         self.update_next_version_to_process(result.end_version, &result.table_name);
 
         Ok(GapDetectorResult::ParquetFileGapDetectorResult(
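The rewritten `update_next_version_to_process` above replaces the nested loops with a single forward pass over versions. A simplified, runnable sketch of that walk (std collections in place of `AHashMap`/`AHashSet`, a free function in place of the method; these names are assumptions for illustration, not the processor's exact code):

    use std::collections::{HashMap, HashSet};

    fn advance(
        next: &mut i64,
        end_version: i64,
        counters: &mut HashMap<i64, i64>,
        seen: &mut HashSet<i64>,
    ) {
        let mut current = *next;
        while current <= end_version {
            if let Some(&count) = counters.get(&current) {
                if count == 0 {
                    // Fully processed: retire the counter and advance.
                    counters.remove(&current);
                    seen.insert(current);
                    *next += 1;
                } else {
                    break; // still waiting on structs for this version
                }
            } else if seen.contains(&current) {
                // Already processed in an earlier call: skip past it.
                *next = (*next).max(current + 1);
            } else {
                // In neither map: the real code only logs this at debug
                // level, and the walk still moves forward.
            }
            current += 1;
        }
    }

    fn main() {
        let mut counters = HashMap::from([(0, 0), (1, 0), (2, 3)]);
        let mut seen = HashSet::new();
        let mut next = 0;
        advance(&mut next, 2, &mut counters, &mut seen);
        assert_eq!(next, 2); // versions 0 and 1 complete, 2 still pending
    }
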
diff --git a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs
index cb4f2621c..17495022b 100644
--- a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs
+++ b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs
@@ -9,7 +9,7 @@ use crate::{
     db::common::models::default_models::{
         parquet_move_modules::MoveModule,
         parquet_move_resources::MoveResource,
-        parquet_move_tables::{TableItem, TableMetadata},
+        parquet_move_tables::TableItem,
         parquet_transactions::{Transaction as ParquetTransaction, TransactionModel},
         parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
     },
@@ -51,7 +51,6 @@ pub struct ParquetDefaultProcessor {
     wsc_sender: AsyncSender<ParquetDataGeneric<WriteSetChangeModel>>,
     table_item_sender: AsyncSender<ParquetDataGeneric<TableItem>>,
     move_module_sender: AsyncSender<ParquetDataGeneric<MoveModule>>,
-    table_metadata_sender: AsyncSender<ParquetDataGeneric<TableMetadata>>,
 }
 
 // TODO: Since each table item has different size allocated, the pace of being backfilled to PQ varies a lot.
@@ -113,16 +112,6 @@ impl ParquetDefaultProcessor {
             config.parquet_upload_interval_in_secs(),
         );
 
-        let table_metadata_sender = create_parquet_handler_loop::<TableMetadata>(
-            new_gap_detector_sender.clone(),
-            ProcessorName::ParquetDefaultProcessor.into(),
-            config.bucket_name.clone(),
-            config.bucket_root.clone(),
-            config.parquet_handler_response_channel_size,
-            config.max_buffer_size,
-            config.parquet_upload_interval_in_secs(),
-        );
-
         Self {
             connection_pool,
             transaction_sender,
@@ -130,7 +119,6 @@
             wsc_sender,
             table_item_sender,
             move_module_sender,
-            table_metadata_sender,
         }
     }
 }
@@ -139,13 +127,12 @@ impl Debug for ParquetDefaultProcessor {
     fn fmt(&self, f: &mut Formatter<'_>) -> Result {
         write!(
             f,
-            "ParquetProcessor {{ capacity of trnasactions channel: {:?}, capacity of move resource channel: {:?}, capacity of wsc channel: {:?}, capacity of table items channel: {:?}, capacity of move_module channel: {:?}, capacity of table_metadata channel: {:?} }}",
+            "ParquetProcessor {{ capacity of transactions channel: {:?}, capacity of move resource channel: {:?}, capacity of wsc channel: {:?}, capacity of table items channel: {:?}, capacity of move_module channel: {:?} }}",
             &self.transaction_sender.capacity(),
             &self.move_resource_sender.capacity(),
             &self.wsc_sender.capacity(),
             &self.table_item_sender.capacity(),
             &self.move_module_sender.capacity(),
-            &self.table_metadata_sender.capacity(),
         )
     }
 }
@@ -166,14 +153,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
         let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
 
         let (
-            (
-                move_resources,
-                write_set_changes,
-                transactions,
-                table_items,
-                move_modules,
-                table_metadata,
-            ),
+            (move_resources, write_set_changes, transactions, table_items, move_modules),
             transaction_version_to_struct_count,
         ) = tokio::task::spawn_blocking(move || process_transactions(transactions))
             .await
@@ -216,15 +196,6 @@
             .await
             .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
 
-        let tm_parquet_data = ParquetDataGeneric {
-            data: table_metadata,
-        };
-
-        self.table_metadata_sender
-            .send(tm_parquet_data)
-            .await
-            .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
-
         Ok(ProcessingResult::ParquetProcessingResult(
             ParquetProcessingResult {
                 start_version: start_version as i64,
@@ -251,7 +222,6 @@ pub fn process_transactions(
         Vec<ParquetTransaction>,
         Vec<TableItem>,
         Vec<MoveModule>,
-        Vec<TableMetadata>,
     ),
     AHashMap<i64, i64>,
 ) {
@@ -265,7 +235,6 @@
     let mut move_modules = vec![];
     let mut move_resources = vec![];
     let mut table_items = vec![];
-    let mut table_metadata: AHashMap<String, TableMetadata> = AHashMap::new();
 
     for detail in wsc_details {
         match detail {
@@ -280,30 +249,21 @@
             WriteSetChangeDetail::Resource(resource) => {
                 transaction_version_to_struct_count
                     .entry(resource.txn_version)
-                    .and_modify(|e| *e += 1);
+                    .and_modify(|e| *e += 1)
+                    .or_insert(1);
                 move_resources.push(resource);
             },
-            WriteSetChangeDetail::Table(item, _current_item, metadata) => {
+            WriteSetChangeDetail::Table(item, _current_item, _) => {
                 let txn_version = item.txn_version;
                 transaction_version_to_struct_count
                     .entry(txn_version)
-                    .and_modify(|e| *e += 1);
+                    .and_modify(|e| *e += 1)
+                    .or_insert(1);
                 table_items.push(item);
-
-                if let Some(meta) = metadata {
-                    table_metadata.insert(meta.handle.clone(), meta);
-                    transaction_version_to_struct_count
-                        .entry(txn_version)
-                        .and_modify(|e| *e += 1);
-                }
             },
         }
     }
 
-    let mut table_metadata = table_metadata.into_values().collect::<Vec<TableMetadata>>();
-    // Sort by PK
-    table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));
-
     (
         (
             move_resources,
@@ -311,7 +271,6 @@
             txns,
             table_items,
             move_modules,
-            table_metadata,
         ),
         transaction_version_to_struct_count,
     )
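A recurring fix in this file (and in the events and token v2 processors below): `Entry::and_modify` on its own is a no-op when the key is absent, so struct counts for versions not already present in the map were silently dropped; chaining `.or_insert(..)` covers the vacant case. A small demonstration with a plain `HashMap` (the key `7` is arbitrary):

    use std::collections::HashMap;

    fn main() {
        let mut counts: HashMap<i64, i64> = HashMap::new();

        // and_modify alone does nothing when the key is absent.
        counts.entry(7).and_modify(|e| *e += 1);
        assert_eq!(counts.get(&7), None);

        // and_modify + or_insert handles both vacant and occupied entries.
        counts.entry(7).and_modify(|e| *e += 1).or_insert(1);
        assert_eq!(counts.get(&7), Some(&1));
        counts.entry(7).and_modify(|e| *e += 1).or_insert(1);
        assert_eq!(counts.get(&7), Some(&2));
    }
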
diff --git a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs
index b12324dbe..beb2e9414 100644
--- a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs
+++ b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs
@@ -141,7 +141,9 @@ impl ProcessorTrait for ParquetEventsProcessor {
             );
             transaction_version_to_struct_count
                 .entry(txn_version)
-                .and_modify(|e| *e += txn_events.len() as i64);
+                .and_modify(|e| *e += txn_events.len() as i64)
+                .or_insert(txn_events.len() as i64);
+
             events.extend(txn_events);
         }
 
diff --git a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs
index 2c0636095..0f699aea8 100644
--- a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs
+++ b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs
@@ -92,7 +92,7 @@ impl Debug for ParquetFungibleAssetProcessor {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "ParquetFungibleAssetProcessor {{ capacity of tsi channel: {:?}, capacity of es channel: {:?}}}",
+            "ParquetFungibleAssetProcessor {{ capacity of coin_supply channel: {:?}, capacity of fungible_asset_balances channel: {:?}}}",
             &self.coin_supply_sender.capacity(),
             &self.fungible_asset_balances_sender.capacity(),
         )
diff --git a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs
index 9ce0bedd2..dd9e12d9c 100644
--- a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs
+++ b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs
@@ -427,7 +427,7 @@ async fn parse_v2_token(
                 transaction_version_to_struct_count
                     .entry(txn_version)
                     .and_modify(|e| *e += ownerships.len() as i64 + 1)
-                    .or_insert(1);
+                    .or_insert(ownerships.len() as i64 + 1);
                 token_ownerships_v2.append(&mut ownerships);
                 token_datas_v2.push(token_data);
             }
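The token v2 hunk corrects the vacant-entry value rather than adding one: each transaction here contributes `ownerships.len() + 1` structs (the ownership rows plus one token data row), so initializing the counter to `1` undercounted whenever the entry was vacant. A quick before/after check of that behavior (values are illustrative, not from the processor):

    use std::collections::HashMap;

    fn main() {
        let mut counts: HashMap<i64, i64> = HashMap::new();
        let ownerships_len = 3_i64; // e.g. three ownership rows for this txn

        // Before: a vacant entry was initialized to 1 instead of 4.
        counts.entry(1).and_modify(|e| *e += ownerships_len + 1).or_insert(1);
        assert_eq!(counts[&1], 1);

        // After: the or_insert value matches the increment.
        counts.clear();
        counts
            .entry(1)
            .and_modify(|e| *e += ownerships_len + 1)
            .or_insert(ownerships_len + 1);
        assert_eq!(counts[&1], 4);
    }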