Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parquet default processor and clean up a bit #480

Merged
merged 3 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion rust/processor/src/bq_analytics/generic_parquet_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,20 @@ where
) -> Result<()> {
let parquet_structs = changes.data;
let processor_name = self.processor_name.clone();

if self.last_upload_time.elapsed() >= self.upload_interval {
info!(
"Time has elapsed more than {} since last upload for {}",
self.upload_interval.as_secs(),
ParquetType::TABLE_NAME
);
if let Err(e) = self.upload_buffer(gcs_client).await {
error!("Failed to upload buffer: {}", e);
return Err(e);
}
self.last_upload_time = Instant::now();
}

for parquet_struct in parquet_structs {
let size_of_struct = allocative::size_of_unique(&parquet_struct);
PARQUET_STRUCT_SIZE
Expand Down Expand Up @@ -252,7 +266,12 @@ where
parquet_processed_structs: Some(parquet_processed_transactions),
table_name: ParquetType::TABLE_NAME.to_string(),
};

info!(
table_name = ParquetType::TABLE_NAME,
start_version = start_version,
end_version = end_version,
"Uploaded parquet to GCS and sending result to gap detector."
);
self.gap_detector_sender
.send(ProcessingResult::ParquetProcessingResult(
parquet_processing_result,
Expand Down
5 changes: 0 additions & 5 deletions rust/processor/src/gap_detectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ pub async fn create_gap_detector_status_tracker_loop(
}
},
Ok(ProcessingResult::ParquetProcessingResult(result)) => {
tracing::info!(
processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
"[ParquetGapDetector] received parquet gap detector task",
);
match gap_detector
.process_versions(ProcessingResult::ParquetProcessingResult(result))
{
Expand Down
85 changes: 39 additions & 46 deletions rust/processor/src/gap_detectors/parquet_gap_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::gap_detectors::{GapDetectorResult, GapDetectorTrait, ProcessingResult
use ahash::{AHashMap, AHashSet};
use anyhow::Result;
use std::{
cmp::{max, min},
cmp::max,
sync::{Arc, Mutex},
};
use tracing::{debug, info};
Expand Down Expand Up @@ -64,62 +64,55 @@ impl ParquetFileGapDetectorInner {
}
}
self.max_version = max(self.max_version, end_version);

// b/c of the case where file gets uploaded first, we should check if we have to update last_success_version for this processor
self.update_next_version_to_process(
min(self.next_version_to_process, end_version),
"all_table",
);
}

/// This function updates the next version to process based on the current version counters.
/// It will increment the next version to process if the current version is fully processed.
/// It will also remove the version from the version counters if it is fully processed.
/// Fully processed means that all the structs for that version have been processed, i.e. count = 0.
/// Note that for tables other than transactions, it won't always be the latest txn version, since we update this value with
/// the min(max version of the latest table files per processor) that has been uploaded to GCS; that is the version_to_process we keep in the db.
/// So whenever we restart the processor, it may generate some duplicate rows, and we are okay with that.
/// This function updates the `next_version_to_process` based on the current version counters.
/// It increments the `next_version_to_process` if the current version is fully processed, which means
/// that all the structs for that version have been processed, i.e., `count = 0`.
/// If a version is fully processed, it removes the version from the version counters and adds it to the `seen_versions`.
/// For tables other than transactions, the latest version to process may not always be the most recent transaction version
/// since this value is updated based on the minimum of the maximum versions of the latest table files per processor
/// that have been uploaded to GCS. Therefore, when the processor restarts, some duplicate rows may be generated, which is acceptable.
/// The function also ensures that the current version starts checking from the `next_version_to_process`
/// value stored in the database. While there might be potential performance improvements,
/// the current implementation prioritizes data integrity.
/// The function also handles cases where a version is already processed or where no struct count
/// is found for a version, providing appropriate logging for these scenarios.
pub fn update_next_version_to_process(&mut self, end_version: i64, table_name: &str) {
// This must always start checking from this value, since it is also the value that will be stored in the db.
// There may be room to make this more performant, but we are prioritizing data integrity for now.
let mut current_version = self.next_version_to_process;

while current_version <= end_version {
#[allow(clippy::collapsible_else_if)]
if self.version_counters.contains_key(&current_version) {
while let Some(&count) = self.version_counters.get(&current_version) {
if current_version > end_version {
// we shouldn't update further b/c we haven't uploaded the files containing versions after end_version.
break;
}
if count == 0 {
self.version_counters.remove(&current_version);
self.seen_versions.insert(current_version); // seen_version holds the txns version that we have processed already
current_version += 1;
self.next_version_to_process += 1;
} else {
break;
}
}
} else {
if self.seen_versions.contains(&current_version) {
debug!(
"Version {} already processed, skipping and current next_version {} ",
current_version, self.next_version_to_process
);
self.next_version_to_process =
max(self.next_version_to_process, current_version + 1);
// If the current version has a struct count entry
if let Some(&count) = self.version_counters.get(&current_version) {
if count == 0 {
self.version_counters.remove(&current_version);
self.seen_versions.insert(current_version);
self.next_version_to_process += 1;
} else {
// This is the case where the file gets uploaded before we have updated the map. The bigger the file size,
// the less likely we are to see this, since the upload takes longer and map population is done before the upload.
debug!(
current_version = current_version,
"No struct count found for version. This shouldn't happen b/c we already added default count for this version."
);
// Stop processing if the version is not yet complete
break;
}
} else if self.seen_versions.contains(&current_version) {
// If the version is already seen and processed
debug!(
"Version {} already processed, skipping and current next_version {} ",
current_version, self.next_version_to_process
);
self.next_version_to_process =
max(self.next_version_to_process, current_version + 1);
} else {
// If the version is neither in seen_versions nor version_counters
debug!(
current_version = current_version,
"No struct count found for version. This shouldn't happen b/c we already added default count for this version."
);
}

current_version += 1;
}

debug!(
next_version_to_process = self.next_version_to_process,
table_name = table_name,
Expand Down Expand Up @@ -155,7 +148,8 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
info!(
start_version = result.start_version,
end_version = result.end_version,
"Parquet file has been uploaded."
table_name = &result.table_name,
"[Parquet Gap Detector] Processing versions after parquet file upload."
);

for (version, count) in parquet_processed_structs.iter() {
Expand All @@ -166,7 +160,6 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
self.version_counters.insert(*version, -count);
}
}

self.update_next_version_to_process(result.end_version, &result.table_name);

Ok(GapDetectorResult::ParquetFileGapDetectorResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
db::common::models::default_models::{
parquet_move_modules::MoveModule,
parquet_move_resources::MoveResource,
parquet_move_tables::{TableItem, TableMetadata},
parquet_move_tables::TableItem,
parquet_transactions::{Transaction as ParquetTransaction, TransactionModel},
parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
},
Expand Down Expand Up @@ -51,7 +51,6 @@ pub struct ParquetDefaultProcessor {
wsc_sender: AsyncSender<ParquetDataGeneric<WriteSetChangeModel>>,
table_item_sender: AsyncSender<ParquetDataGeneric<TableItem>>,
move_module_sender: AsyncSender<ParquetDataGeneric<MoveModule>>,
table_metadata_sender: AsyncSender<ParquetDataGeneric<TableMetadata>>,
}

// TODO: Since each table item has different size allocated, the pace of being backfilled to PQ varies a lot.
Expand Down Expand Up @@ -113,24 +112,13 @@ impl ParquetDefaultProcessor {
config.parquet_upload_interval_in_secs(),
);

let table_metadata_sender = create_parquet_handler_loop::<TableMetadata>(
new_gap_detector_sender.clone(),
ProcessorName::ParquetDefaultProcessor.into(),
config.bucket_name.clone(),
config.bucket_root.clone(),
config.parquet_handler_response_channel_size,
config.max_buffer_size,
config.parquet_upload_interval_in_secs(),
);

Self {
connection_pool,
transaction_sender,
move_resource_sender,
wsc_sender,
table_item_sender,
move_module_sender,
table_metadata_sender,
}
}
}
Expand All @@ -139,13 +127,12 @@ impl Debug for ParquetDefaultProcessor {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
write!(
f,
"ParquetProcessor {{ capacity of trnasactions channel: {:?}, capacity of move resource channel: {:?}, capacity of wsc channel: {:?}, capacity of table items channel: {:?}, capacity of move_module channel: {:?}, capacity of table_metadata channel: {:?} }}",
"ParquetProcessor {{ capacity of trnasactions channel: {:?}, capacity of move resource channel: {:?}, capacity of wsc channel: {:?}, capacity of table items channel: {:?}, capacity of move_module channel: {:?}}}",
&self.transaction_sender.capacity(),
&self.move_resource_sender.capacity(),
&self.wsc_sender.capacity(),
&self.table_item_sender.capacity(),
&self.move_module_sender.capacity(),
&self.table_metadata_sender.capacity(),
)
}
}
Expand All @@ -166,14 +153,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();

let (
(
move_resources,
write_set_changes,
transactions,
table_items,
move_modules,
table_metadata,
),
(move_resources, write_set_changes, transactions, table_items, move_modules),
transaction_version_to_struct_count,
) = tokio::task::spawn_blocking(move || process_transactions(transactions))
.await
Expand Down Expand Up @@ -216,15 +196,6 @@ impl ProcessorTrait for ParquetDefaultProcessor {
.await
.map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;

let tm_parquet_data = ParquetDataGeneric {
data: table_metadata,
};

self.table_metadata_sender
.send(tm_parquet_data)
.await
.map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;

Ok(ProcessingResult::ParquetProcessingResult(
ParquetProcessingResult {
start_version: start_version as i64,
Expand All @@ -251,7 +222,6 @@ pub fn process_transactions(
Vec<TransactionModel>,
Vec<TableItem>,
Vec<MoveModule>,
Vec<TableMetadata>,
),
AHashMap<i64, i64>,
) {
Expand All @@ -265,7 +235,6 @@ pub fn process_transactions(
let mut move_modules = vec![];
let mut move_resources = vec![];
let mut table_items = vec![];
let mut table_metadata: AHashMap<String, TableMetadata> = AHashMap::new();

for detail in wsc_details {
match detail {
Expand All @@ -283,35 +252,23 @@ pub fn process_transactions(
.and_modify(|e| *e += 1);
move_resources.push(resource);
},
WriteSetChangeDetail::Table(item, _current_item, metadata) => {
WriteSetChangeDetail::Table(item, _current_item, _) => {
let txn_version = item.txn_version;
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1);
table_items.push(item);

if let Some(meta) = metadata {
table_metadata.insert(meta.handle.clone(), meta);
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1);
}
},
}
}

let mut table_metadata = table_metadata.into_values().collect::<Vec<TableMetadata>>();
// Sort by PK
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));

(
(
move_resources,
write_set_changes,
txns,
table_items,
move_modules,
table_metadata,
),
transaction_version_to_struct_count,
)
Expand Down
Loading