diff --git a/rust/processor/src/bq_analytics/gcs_handler.rs b/rust/processor/src/bq_analytics/gcs_handler.rs index fd7aa07eb..c22624704 100644 --- a/rust/processor/src/bq_analytics/gcs_handler.rs +++ b/rust/processor/src/bq_analytics/gcs_handler.rs @@ -1,14 +1,15 @@ -use crate::bq_analytics::ParquetProcessorError; -use anyhow::Result; +use crate::{bq_analytics::ParquetProcessorError, utils::counters::PARQUET_BUFFER_SIZE}; +use anyhow::{Context, Result}; use chrono::{Datelike, Timelike}; use google_cloud_storage::{ client::Client as GCSClient, http::objects::upload::{Media, UploadObjectRequest, UploadType}, }; -use hyper::Body; +use hyper::{body::HttpBody, Body}; use std::path::{Path, PathBuf}; use tokio::time::{sleep, timeout, Duration}; use tracing::{debug, error, info}; + const MAX_RETRIES: usize = 3; const INITIAL_DELAY_MS: u64 = 500; const TIMEOUT_SECONDS: u64 = 300; @@ -18,6 +19,7 @@ pub async fn upload_parquet_to_gcs( table_name: &str, bucket_name: &str, bucket_root: &Path, + processor_name: String, ) -> Result<(), ParquetProcessorError> { if buffer.is_empty() { error!("The file is empty and has no data to upload.",); @@ -57,6 +59,12 @@ pub async fn upload_parquet_to_gcs( loop { let data = Body::from(buffer.clone()); + let size_hint = data.size_hint(); + let size = size_hint.exact().context("Failed to get size hint")?; + PARQUET_BUFFER_SIZE + .with_label_values(&[&processor_name, table_name]) + .set(size as i64); + let upload_result = timeout( Duration::from_secs(TIMEOUT_SECONDS), client.upload_object(&upload_request, data, &upload_type), @@ -65,7 +73,11 @@ pub async fn upload_parquet_to_gcs( match upload_result { Ok(Ok(result)) => { - info!("File uploaded successfully to GCS: {}", result.name); + info!( + table_name = table_name, + file_name = result.name, + "File uploaded successfully to GCS", + ); return Ok(()); }, Ok(Err(e)) => { diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs index 00259246a..30fa3ccc1 100644 --- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs +++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs @@ -3,7 +3,7 @@ use crate::{ bq_analytics::gcs_handler::upload_parquet_to_gcs, gap_detectors::ProcessingResult, utils::{ - counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE}, + counters::{PARQUET_HANDLER_CURRENT_BUFFER_SIZE, PARQUET_STRUCT_SIZE}, util::naive_datetime_to_timestamp, }, }; @@ -23,7 +23,6 @@ use tracing::{debug, error, info}; #[derive(Debug, Default, Clone)] pub struct ParquetDataGeneric { pub data: Vec, - pub transaction_version_to_struct_count: AHashMap, } pub trait NamedTable { @@ -71,6 +70,7 @@ where pub upload_interval: Duration, pub max_buffer_size: usize, pub last_upload_time: Instant, + pub processor_name: String, } fn create_new_writer(schema: Arc) -> Result>> { let props = WriterProperties::builder() @@ -103,6 +103,7 @@ where schema: Arc, upload_interval: Duration, max_buffer_size: usize, + processor_name: String, ) -> Result { // had to append unique id to avoid concurrent write issues let writer = create_new_writer(schema.clone())?; @@ -119,6 +120,7 @@ where upload_interval, max_buffer_size, last_upload_time: Instant::now(), + processor_name, }) } @@ -128,47 +130,54 @@ where changes: ParquetDataGeneric, ) -> Result<()> { let parquet_structs = changes.data; - self.transaction_version_to_struct_count - .extend(changes.transaction_version_to_struct_count); - + let processor_name = self.processor_name.clone(); for parquet_struct in parquet_structs { let size_of_struct = allocative::size_of_unique(&parquet_struct); PARQUET_STRUCT_SIZE - .with_label_values(&[ParquetType::TABLE_NAME]) + .with_label_values(&[&processor_name, ParquetType::TABLE_NAME]) .set(size_of_struct as i64); self.buffer_size_bytes += size_of_struct; self.buffer.push(parquet_struct); if self.buffer_size_bytes >= self.max_buffer_size { - info!("Max buffer size reached, uploading to GCS."); + debug!( + table_name = ParquetType::TABLE_NAME, + buffer_size = self.buffer_size_bytes, + max_buffer_size = self.max_buffer_size, + "Max buffer size reached, uploading to GCS." + ); if let Err(e) = self.upload_buffer(gcs_client).await { error!("Failed to upload buffer: {}", e); return Err(e); } self.last_upload_time = Instant::now(); } + } - if self.last_upload_time.elapsed() >= self.upload_interval { - info!( - "Time has elapsed more than {} since last upload.", - self.upload_interval.as_secs() - ); - if let Err(e) = self.upload_buffer(gcs_client).await { - error!("Failed to upload buffer: {}", e); - return Err(e); - } - self.last_upload_time = Instant::now(); + if self.last_upload_time.elapsed() >= self.upload_interval { + info!( + "Time has elapsed more than {} since last upload for {}", + self.upload_interval.as_secs(), + ParquetType::TABLE_NAME + ); + if let Err(e) = self.upload_buffer(gcs_client).await { + error!("Failed to upload buffer: {}", e); + return Err(e); } + self.last_upload_time = Instant::now(); } - PARQUET_HANDLER_BUFFER_SIZE - .with_label_values(&[ParquetType::TABLE_NAME]) - .set(self.buffer.len() as i64); + PARQUET_HANDLER_CURRENT_BUFFER_SIZE + .with_label_values(&[&self.processor_name, ParquetType::TABLE_NAME]) + .set(self.buffer_size_bytes as i64); + Ok(()) } async fn upload_buffer(&mut self, gcs_client: &GCSClient) -> Result<()> { + // This is to cover the case when interval duration has passed but buffer is empty if self.buffer.is_empty() { + debug!("Buffer is empty, skipping upload."); return Ok(()); } let start_version = self @@ -183,9 +192,7 @@ where let end_version = last.version(); let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp()); - let txn_version_to_struct_count = - process_struct_count_map(&self.buffer, &mut self.transaction_version_to_struct_count); - + let parquet_processed_transactions = build_parquet_processed_transactions(&self.buffer); let struct_buffer = std::mem::take(&mut self.buffer); let mut row_group_writer = self @@ -206,12 +213,6 @@ where .into_inner() .context("Failed to get inner buffer")?; - debug!( - table_name = ParquetType::TABLE_NAME, - start_version = start_version, - end_version = end_version, - "Max buffer size reached, uploading to GCS." - ); let bucket_root = PathBuf::from(&self.bucket_root); upload_parquet_to_gcs( @@ -220,6 +221,7 @@ where ParquetType::TABLE_NAME, &self.bucket_name, &bucket_root, + self.processor_name.clone(), ) .await?; @@ -229,7 +231,9 @@ where start_version, end_version, last_transaction_timestamp: Some(last_transaction_timestamp), - txn_version_to_struct_count, + txn_version_to_struct_count: None, + parquet_processed_structs: Some(parquet_processed_transactions), + table_name: ParquetType::TABLE_NAME.to_string(), }; self.gap_detector_sender @@ -243,19 +247,18 @@ where } } -fn process_struct_count_map( +fn build_parquet_processed_transactions( buffer: &[ParquetType], - txn_version_to_struct_count: &mut AHashMap, ) -> AHashMap { let mut txn_version_to_struct_count_for_gap_detector = AHashMap::new(); for item in buffer.iter() { let version = item.version(); - if let Some(count) = txn_version_to_struct_count.get(&(version)) { - txn_version_to_struct_count_for_gap_detector.insert(version, *count); - txn_version_to_struct_count.remove(&(version)); - } + txn_version_to_struct_count_for_gap_detector + .entry(version) + .and_modify(|count| *count += 1) + .or_insert(1); } txn_version_to_struct_count_for_gap_detector } diff --git a/rust/processor/src/bq_analytics/mod.rs b/rust/processor/src/bq_analytics/mod.rs index 6a37c0eb3..12de00836 100644 --- a/rust/processor/src/bq_analytics/mod.rs +++ b/rust/processor/src/bq_analytics/mod.rs @@ -30,7 +30,10 @@ pub struct ParquetProcessingResult { pub start_version: i64, pub end_version: i64, pub last_transaction_timestamp: Option, - pub txn_version_to_struct_count: AHashMap, + pub txn_version_to_struct_count: Option>, + // This is used to store the processed structs in the parquet file + pub parquet_processed_structs: Option>, + pub table_name: String, } #[derive(Debug)] @@ -115,13 +118,14 @@ where "[Parquet Handler] Starting parquet handler loop", ); - let mut parquet_manager = GenericParquetHandler::new( + let mut parquet_handler = GenericParquetHandler::new( bucket_name.clone(), bucket_root.clone(), new_gap_detector_sender.clone(), ParquetType::schema(), upload_interval, max_buffer_size, + processor_name.clone(), ) .expect("Failed to create parquet manager"); @@ -135,7 +139,7 @@ where loop { match parquet_receiver.recv().await { Ok(txn_pb_res) => { - let result = parquet_manager.handle(&gcs_client, txn_pb_res).await; + let result = parquet_handler.handle(&gcs_client, txn_pb_res).await; match result { Ok(_) => { diff --git a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs index 22f10715d..513557783 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs @@ -384,15 +384,16 @@ impl Transaction { if let Some(a) = block_metadata { block_metadata_txns.push(a.clone()); - // transaction_version_to_struct_count.entry(a.version).and_modify(|e| *e += 1); } - wscs.append(&mut wsc_list); if !wsc_list.is_empty() { transaction_version_to_struct_count - .entry(wsc_list[0].txn_version) - .and_modify(|e| *e += wsc_list.len() as i64); + .entry(txn.txn_version) + .and_modify(|e| *e += wsc_list.len() as i64) + .or_insert(wsc_list.len() as i64); } + wscs.append(&mut wsc_list); + wsc_details.append(&mut wsc_detail_list); } (txns, block_metadata_txns, wscs, wsc_details) diff --git a/rust/processor/src/gap_detectors/gap_detector.rs b/rust/processor/src/gap_detectors/gap_detector.rs index b4b6d44e5..262b948e2 100644 --- a/rust/processor/src/gap_detectors/gap_detector.rs +++ b/rust/processor/src/gap_detectors/gap_detector.rs @@ -8,6 +8,7 @@ use crate::{ use ahash::AHashMap; use anyhow::Result; +#[derive(Clone)] pub struct DefaultGapDetector { next_version_to_process: u64, seen_versions: AHashMap, diff --git a/rust/processor/src/gap_detectors/mod.rs b/rust/processor/src/gap_detectors/mod.rs index 07ef999f1..c93b19929 100644 --- a/rust/processor/src/gap_detectors/mod.rs +++ b/rust/processor/src/gap_detectors/mod.rs @@ -2,7 +2,7 @@ use crate::{ bq_analytics::ParquetProcessingResult, gap_detectors::{ gap_detector::{DefaultGapDetector, DefaultGapDetectorResult}, - parquet_gap_detector::{ParquetFileGapDetector, ParquetFileGapDetectorResult}, + parquet_gap_detector::{ParquetFileGapDetectorInner, ParquetFileGapDetectorResult}, }, processors::{DefaultProcessingResult, Processor, ProcessorTrait}, utils::counters::{PARQUET_PROCESSOR_DATA_GAP_COUNT, PROCESSOR_DATA_GAP_COUNT}, @@ -11,7 +11,8 @@ use crate::{ use anyhow::Result; use enum_dispatch::enum_dispatch; use kanal::AsyncReceiver; -use tracing::{error, info}; +use std::sync::{Arc, Mutex}; + pub mod gap_detector; pub mod parquet_gap_detector; @@ -21,9 +22,10 @@ pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 500; const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; #[enum_dispatch(GapDetectorTrait)] +#[derive(Clone)] pub enum GapDetector { DefaultGapDetector, - ParquetFileGapDetector, + ParquetFileGapDetector(Arc>), // made this singleton to avoid cloning structs to count map for every parquet handler } pub enum GapDetectorResult { @@ -48,6 +50,7 @@ pub trait GapDetectorTrait: Send { fn process_versions(&mut self, result: ProcessingResult) -> Result; } +#[derive(Debug, Clone)] pub enum ProcessingResult { DefaultProcessingResult(DefaultProcessingResult), ParquetProcessingResult(ParquetProcessingResult), @@ -60,7 +63,7 @@ pub async fn create_gap_detector_status_tracker_loop( gap_detection_batch_size: u64, ) { let processor_name = processor.name(); - info!( + tracing::info!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, "[Parser] Starting gap detector task", @@ -111,7 +114,7 @@ pub async fn create_gap_detector_status_tracker_loop( } }, Err(e) => { - error!( + tracing::error!( processor_name, service_type = PROCESSOR_SERVICE_TYPE, error = ?e, @@ -122,7 +125,7 @@ pub async fn create_gap_detector_status_tracker_loop( } }, Ok(ProcessingResult::ParquetProcessingResult(result)) => { - info!( + tracing::info!( processor_name, service_type = PROCESSOR_SERVICE_TYPE, "[ParquetGapDetector] received parquet gap detector task", @@ -138,22 +141,25 @@ pub async fn create_gap_detector_status_tracker_loop( .set(res.num_gaps as i64); // we need a new gap detection batch size if res.num_gaps >= gap_detection_batch_size { - tracing::debug!( - processor_name, + tracing::warn!( + processor_name, gap_start_version = res.next_version_to_process, num_gaps = res.num_gaps, - "[Parser] Processed {gap_detection_batch_size} batches with a gap", - ); + "[Parser] Processed batches with a gap", + ); // We don't panic as everything downstream will panic if it doesn't work/receive } if last_update_time.elapsed().as_secs() >= UPDATE_PROCESSOR_STATUS_SECS { - tracing::info!("Updating last processed version"); + tracing::info!( + last_processed_version = res.next_version_to_process, + "Updating last processed version" + ); processor .update_last_processed_version( - res.start_version, + res.next_version_to_process, res.last_transaction_timestamp, ) .await @@ -169,7 +175,7 @@ pub async fn create_gap_detector_status_tracker_loop( } }, Err(e) => { - error!( + tracing::error!( processor_name, service_type = PROCESSOR_SERVICE_TYPE, error = ?e, @@ -180,7 +186,7 @@ pub async fn create_gap_detector_status_tracker_loop( } }, Err(e) => { - info!( + tracing::info!( processor_name, service_type = PROCESSOR_SERVICE_TYPE, error = ?e, diff --git a/rust/processor/src/gap_detectors/parquet_gap_detector.rs b/rust/processor/src/gap_detectors/parquet_gap_detector.rs index 19d5a1d30..794a5f9f1 100644 --- a/rust/processor/src/gap_detectors/parquet_gap_detector.rs +++ b/rust/processor/src/gap_detectors/parquet_gap_detector.rs @@ -2,84 +2,162 @@ // // SPDX-License-Identifier: Apache-2.0 use crate::gap_detectors::{GapDetectorResult, GapDetectorTrait, ProcessingResult}; -use ahash::AHashMap; -use anyhow::Result; -use std::cmp::max; +use ahash::{AHashMap, AHashSet}; +use anyhow::{Context, Result}; +use std::{ + cmp::{max, min}, + sync::{Arc, Mutex}, +}; use tracing::{debug, info}; -pub struct ParquetFileGapDetector { +impl GapDetectorTrait for Arc> { + fn process_versions(&mut self, result: ProcessingResult) -> Result { + let mut detector = self.lock().unwrap(); + detector.process_versions(result) + } +} + +#[derive(Clone)] +pub struct ParquetFileGapDetectorInner { next_version_to_process: i64, version_counters: AHashMap, + seen_versions: AHashSet, max_version: i64, } pub struct ParquetFileGapDetectorResult { pub next_version_to_process: u64, pub num_gaps: u64, - pub start_version: u64, pub last_transaction_timestamp: Option, } -impl ParquetFileGapDetector { +impl ParquetFileGapDetectorInner { pub fn new(starting_version: u64) -> Self { Self { next_version_to_process: starting_version as i64, version_counters: AHashMap::new(), + seen_versions: AHashSet::new(), max_version: 0, } } + + pub fn update_struct_map( + &mut self, + txn_version_to_struct_count: AHashMap, + start_version: i64, + end_version: i64, + ) { + for version in start_version..=end_version { + if let Some(count) = txn_version_to_struct_count.get(&version) { + if self.version_counters.contains_key(&version) { + *self.version_counters.get_mut(&version).unwrap() += *count; + } else { + self.version_counters.insert(version, *count); + } + } else { + // this is the case when file gets uploaded before it gets processed here. + if !self.version_counters.contains_key(&version) { + self.version_counters.insert(version, 0); + } + // if there is version populated already, meaning that there are some other structs that are being processed for this version. we don't do anything + // b/c we don't have any of the structs we are processing for this txn version. + } + } + self.max_version = max(self.max_version, end_version); + + // b/c of the case where file gets uploaded first, we should check if we have to update last_success_version for this processor + self.update_next_version_to_process( + min(self.next_version_to_process, end_version), + "all_table", + ); + } + + /// This function updates the next version to process based on the current version counters. + /// It will increment the next version to process if the current version is fully processed. + /// It will also remove the version from the version counters if it is fully processed. + /// what it means to be fully processed is that all the structs for that version processed, i.e. count = 0. + /// Note that for tables other than transactions, it won't be always the latest txn version since we update this value with + /// Thus we will keep the latest version_to_process in the db with the min(max version of latest table files per processor) + /// that has been uploaded to GCS. so whenever we restart the processor, it may generate some duplicates rows, and we are okay with that. + pub fn update_next_version_to_process(&mut self, end_version: i64, table_name: &str) { + // this has to start checking with this value all the time, since this is the value that will be stored in the db as well. + // maybe there could be an improvement to be more performant. but priortizing the data integrity as of now. + let mut current_version = self.next_version_to_process; + + while current_version <= end_version { + #[allow(clippy::collapsible_else_if)] + if self.version_counters.contains_key(¤t_version) { + while let Some(&count) = self.version_counters.get(¤t_version) { + if current_version > end_version { + // we shouldn't update further b/c we haven't uploaded the files containing versions after end_version. + break; + } + if count == 0 { + self.version_counters.remove(¤t_version); + self.seen_versions.insert(current_version); // seen_version holds the txns version that we have processed already + current_version += 1; + self.next_version_to_process += 1; + } else { + break; + } + } + } else { + if self.seen_versions.contains(¤t_version) { + debug!( + "Version {} already processed, skipping and current next_version {} ", + current_version, self.next_version_to_process + ); + self.next_version_to_process = + max(self.next_version_to_process, current_version + 1); + } else { + // this is the case where we haven't updated the map yet, while the file gets uploaded first. the bigger file size we will have, + // the less chance we will see this as upload takes longer time. And map population is done before the upload. + debug!( + current_version = current_version, + "No struct count found for version. This shouldn't happen b/c we already added default count for this version." + ); + } + } + current_version += 1; + } + debug!( + next_version_to_process = self.next_version_to_process, + table_name = table_name, + "Updated the next_version_to_process.", + ); + } } -impl GapDetectorTrait for ParquetFileGapDetector { + +impl GapDetectorTrait for ParquetFileGapDetectorInner { fn process_versions(&mut self, result: ProcessingResult) -> Result { - // Update counts of structures for each transaction version let result = match result { ProcessingResult::ParquetProcessingResult(r) => r, _ => panic!("Invalid result type"), }; - for (version, count) in result.txn_version_to_struct_count.iter() { - if !self.version_counters.contains_key(version) { - // info!("Inserting version {} with count {} into parquet gap detector", version, count); - self.version_counters.insert(*version, *count); - } - self.max_version = max(self.max_version, *version); + let parquet_processed_structs = result + .parquet_processed_structs + .context("Missing parquet processed transactions")?; + info!( + start_version = result.start_version, + end_version = result.end_version, + "Parquet file has been uploaded." + ); - *self.version_counters.entry(*version).or_default() -= 1; - } - - // Update next version to process and move forward - let mut current_version = result.start_version; - - while current_version <= result.end_version { - match self.version_counters.get_mut(¤t_version) { - Some(count) => { - if *count == 0 && current_version == self.next_version_to_process { - while let Some(&count) = - self.version_counters.get(&self.next_version_to_process) - { - if count == 0 { - self.version_counters.remove(&self.next_version_to_process); // Remove the fully processed version - self.next_version_to_process += 1; // Increment to the next version - info!("Version {} fully processed. Next version to process updated to {}", self.next_version_to_process - 1, self.next_version_to_process); - } else { - break; - } - } - } - }, - None => { - // TODO: validate this that we shouldn't reach this b/c we already added default count. - // or it could mean that we have duplicates. - debug!("No struct count found for version {}", current_version); - }, + for (version, count) in parquet_processed_structs.iter() { + if let Some(entry) = self.version_counters.get_mut(version) { + *entry -= count; + } else { + // if not hasn't been updated, we can populate this count with the negative value and pass through + self.version_counters.insert(*version, -count); } - current_version += 1; // Move to the next version in sequence } + self.update_next_version_to_process(result.end_version, &result.table_name); + Ok(GapDetectorResult::ParquetFileGapDetectorResult( ParquetFileGapDetectorResult { next_version_to_process: self.next_version_to_process as u64, num_gaps: (self.max_version - self.next_version_to_process) as u64, - start_version: result.start_version as u64, last_transaction_timestamp: result.last_transaction_timestamp, }, )) diff --git a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs index 7849f5bdd..2557870b9 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs @@ -9,7 +9,7 @@ use crate::{ db::common::models::default_models::{ parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, - parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata}, + parquet_move_tables::{TableItem, TableMetadata}, parquet_transactions::{Transaction as ParquetTransaction, TransactionModel}, parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, }, @@ -186,7 +186,6 @@ impl ProcessorTrait for ParquetDefaultProcessor { let mr_parquet_data = ParquetDataGeneric { data: move_resources, - transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), }; self.move_resource_sender @@ -196,36 +195,26 @@ impl ProcessorTrait for ParquetDefaultProcessor { let wsc_parquet_data = ParquetDataGeneric { data: write_set_changes, - transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), }; self.wsc_sender .send(wsc_parquet_data) .await .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?; - let t_parquet_data = ParquetDataGeneric { - data: transactions, - transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), - }; + let t_parquet_data = ParquetDataGeneric { data: transactions }; self.transaction_sender .send(t_parquet_data) .await .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?; - let ti_parquet_data = ParquetDataGeneric { - data: table_items, - transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), - }; + let ti_parquet_data = ParquetDataGeneric { data: table_items }; self.table_item_sender .send(ti_parquet_data) .await .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?; - let mm_parquet_data = ParquetDataGeneric { - data: move_modules, - transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), - }; + let mm_parquet_data = ParquetDataGeneric { data: move_modules }; self.move_module_sender .send(mm_parquet_data) @@ -234,7 +223,6 @@ impl ProcessorTrait for ParquetDefaultProcessor { let tm_parquet_data = ParquetDataGeneric { data: table_metadata, - transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), }; self.table_metadata_sender @@ -247,7 +235,9 @@ impl ProcessorTrait for ParquetDefaultProcessor { start_version: start_version as i64, end_version: end_version as i64, last_transaction_timestamp: last_transaction_timestamp.clone(), - txn_version_to_struct_count: AHashMap::new(), + txn_version_to_struct_count: Some(transaction_version_to_struct_count), + parquet_processed_structs: None, + table_name: "".to_string(), }, )) } @@ -280,7 +270,6 @@ pub fn process_transactions( let mut move_modules = vec![]; let mut move_resources = vec![]; let mut table_items = vec![]; - let mut current_table_items = AHashMap::new(); let mut table_metadata: AHashMap = AHashMap::new(); for detail in wsc_details { @@ -289,7 +278,8 @@ pub fn process_transactions( move_modules.push(module.clone()); transaction_version_to_struct_count .entry(module.txn_version) - .and_modify(|e| *e += 1); + .and_modify(|e| *e += 1) + .or_insert(1); }, WriteSetChangeDetail::Resource(resource) => { transaction_version_to_struct_count @@ -297,23 +287,13 @@ pub fn process_transactions( .and_modify(|e| *e += 1); move_resources.push(resource); }, - WriteSetChangeDetail::Table(item, current_item, metadata) => { + WriteSetChangeDetail::Table(item, _current_item, metadata) => { let txn_version = item.txn_version; - transaction_version_to_struct_count .entry(txn_version) .and_modify(|e| *e += 1); table_items.push(item); - current_table_items.insert( - ( - current_item.table_handle.clone(), - current_item.key_hash.clone(), - ), - current_item, - ); - // transaction_version_to_struct_count.entry(current_item.last_transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2 - if let Some(meta) = metadata { table_metadata.insert(meta.handle.clone(), meta); transaction_version_to_struct_count @@ -324,14 +304,8 @@ pub fn process_transactions( } } - // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes - let mut current_table_items = current_table_items - .into_values() - .collect::>(); let mut table_metadata = table_metadata.into_values().collect::>(); // Sort by PK - current_table_items - .sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash))); table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); ( diff --git a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs index 31ebcd35a..76ef1f5c3 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs @@ -120,10 +120,7 @@ impl ProcessorTrait for ParquetFungibleAssetProcessor { let (fungible_asset_balances, coin_supply) = parse_v2_coin(&transactions, &mut transaction_version_to_struct_count).await; - let parquet_coin_supply = ParquetDataGeneric { - data: coin_supply, - transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), - }; + let parquet_coin_supply = ParquetDataGeneric { data: coin_supply }; self.coin_supply_sender .send(parquet_coin_supply) @@ -132,7 +129,6 @@ impl ProcessorTrait for ParquetFungibleAssetProcessor { let parquet_fungible_asset_balances = ParquetDataGeneric { data: fungible_asset_balances, - transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), }; self.fungible_asset_balances_sender @@ -145,7 +141,9 @@ impl ProcessorTrait for ParquetFungibleAssetProcessor { start_version: start_version as i64, end_version: end_version as i64, last_transaction_timestamp: last_transaction_timestamp.clone(), - txn_version_to_struct_count: AHashMap::new(), + txn_version_to_struct_count: Some(transaction_version_to_struct_count), + parquet_processed_structs: None, + table_name: "".to_string(), }, )) } diff --git a/rust/processor/src/utils/counters.rs b/rust/processor/src/utils/counters.rs index f4a6a57e1..d6a90c4db 100644 --- a/rust/processor/src/utils/counters.rs +++ b/rust/processor/src/utils/counters.rs @@ -268,17 +268,28 @@ pub static PROCESSOR_UNKNOWN_TYPE_COUNT: Lazy = Lazy::new(|| { /// Parquet struct size pub static PARQUET_STRUCT_SIZE: Lazy = Lazy::new(|| { register_int_gauge_vec!("indexer_parquet_struct_size", "Parquet struct size", &[ + "processor_name", "parquet_type" ]) .unwrap() }); /// Parquet handler buffer size -pub static PARQUET_HANDLER_BUFFER_SIZE: Lazy = Lazy::new(|| { +pub static PARQUET_HANDLER_CURRENT_BUFFER_SIZE: Lazy = Lazy::new(|| { register_int_gauge_vec!( "indexer_parquet_handler_buffer_size", "Parquet handler buffer size", - &["parquet_type"] // TODO: add something like task_index + &["processor_name", "parquet_type"] + ) + .unwrap() +}); + +/// Size of the parquet file +pub static PARQUET_BUFFER_SIZE: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "indexer_parquet_size", + "Size of Parquet buffer to upload", + &["processor_name", "parquet_type"] ) .unwrap() }); diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 7ccb9b61f..a54a19833 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -6,7 +6,7 @@ use crate::{ db::common::models::{ledger_info::LedgerInfo, processor_status::ProcessorStatusQuery}, gap_detectors::{ create_gap_detector_status_tracker_loop, gap_detector::DefaultGapDetector, - parquet_gap_detector::ParquetFileGapDetector, GapDetector, ProcessingResult, + parquet_gap_detector::ParquetFileGapDetectorInner, GapDetector, ProcessingResult, }, grpc_stream::TransactionsPBResponse, processors::{ @@ -51,7 +51,10 @@ use anyhow::{Context, Result}; use aptos_moving_average::MovingAverage; use bitflags::bitflags; use kanal::AsyncSender; -use std::collections::HashSet; +use std::{ + collections::HashSet, + sync::{Arc, Mutex}, +}; use tokio::task::JoinHandle; use tracing::{debug, error, info}; use url::Url; @@ -327,14 +330,17 @@ impl Worker { ); let gap_detector = if is_parquet_processor { - GapDetector::ParquetFileGapDetector(ParquetFileGapDetector::new(starting_version)) + GapDetector::ParquetFileGapDetector(Arc::new(Mutex::new( + ParquetFileGapDetectorInner::new(starting_version), + ))) } else { GapDetector::DefaultGapDetector(DefaultGapDetector::new(starting_version)) }; + let gap_detector_clone = gap_detector.clone(); tokio::spawn(async move { create_gap_detector_status_tracker_loop( - gap_detector, + gap_detector_clone, gap_detector_receiver, processor, gap_detection_batch_size, @@ -360,7 +366,12 @@ impl Worker { let mut processor_tasks = vec![fetcher_task]; for task_index in 0..concurrent_tasks { let join_handle: JoinHandle<()> = self - .launch_processor_task(task_index, receiver.clone(), gap_detector_sender.clone()) + .launch_processor_task( + task_index, + receiver.clone(), + gap_detector_sender.clone(), + gap_detector.clone(), + ) .await; processor_tasks.push(join_handle); } @@ -384,6 +395,7 @@ impl Worker { task_index: usize, receiver: kanal::AsyncReceiver, gap_detector_sender: AsyncSender, + mut gap_detector: GapDetector, ) -> JoinHandle<()> { let processor_name = self.processor_config.name(); let stream_address = self.indexer_grpc_data_service_address.to_string(); @@ -629,8 +641,36 @@ impl Worker { .await .expect("[Parser] Failed to send versions to gap detector"); }, - ProcessingResult::ParquetProcessingResult(_) => { - debug!("parquet processing result doesn't need to be handled here"); + ProcessingResult::ParquetProcessingResult(processing_result) => { + // we need to pupulate the map here so then we don't have to pass multiple times + let parquet_gap_detector = match &mut gap_detector { + GapDetector::ParquetFileGapDetector(gap_detector) => { + gap_detector + }, + _ => panic!("Invalid gap detector type"), + }; + + let num_processed = (last_txn_version - first_txn_version) + 1; + + NUM_TRANSACTIONS_PROCESSED_COUNT + .with_label_values(&[ + processor_name, + step, + label, + &task_index_str, + ]) + .inc_by(num_processed); + + if let Some(txn_version_to_struct_count) = + processing_result.txn_version_to_struct_count + { + let mut detector = parquet_gap_detector.lock().unwrap(); + detector.update_struct_map( + txn_version_to_struct_count, + processing_result.start_version, + processing_result.end_version, + ); + } }, } }, diff --git a/rust/server-framework/src/lib.rs b/rust/server-framework/src/lib.rs index a81361fc9..6e9a6101c 100644 --- a/rust/server-framework/src/lib.rs +++ b/rust/server-framework/src/lib.rs @@ -9,6 +9,7 @@ use prometheus::{Encoder, TextEncoder}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; #[cfg(target_os = "linux")] use std::convert::Infallible; +#[allow(deprecated)] use std::{fs::File, io::Read, panic::PanicInfo, path::PathBuf, process}; use tokio::runtime::Handle; use tracing::error; @@ -110,6 +111,7 @@ pub struct CrashInfo { /// Tokio's default behavior is to catch panics and ignore them. Invoking this function will /// ensure that all subsequent thread panics (even Tokio threads) will report the /// details/backtrace and then exit. +#[allow(deprecated)] pub fn setup_panic_handler() { std::panic::set_hook(Box::new(move |pi: &PanicInfo<'_>| { handle_panic(pi); @@ -117,6 +119,7 @@ pub fn setup_panic_handler() { } // Formats and logs panic information +#[allow(deprecated)] fn handle_panic(panic_info: &PanicInfo<'_>) { // The Display formatter for a PanicInfo contains the message, payload and location. let details = format!("{}", panic_info);