fix parquet to start from where it left off when it's re-deployed (#468)
* fix parquet to start from where it left off when it's re-deployed

* lint

* temp

* remove logs

* add log for dashboard

* add metrics/logs

* add more metrics

* remove mutable and let the buffer drop natively

* lint
yuunlimm authored Jul 24, 2024
1 parent fa1ce49 commit a3a16af
Showing 12 changed files with 287 additions and 156 deletions.
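The core of this commit: the parquet handler now reports, per table, how many structs it wrote for each transaction version (the new `parquet_processed_structs` field on `ParquetProcessingResult`), so the gap detector can advance a contiguous watermark and a re-deployed processor can resume from that watermark instead of reprocessing from the start. A minimal, hypothetical sketch of the watermark idea, not the repository's actual gap-detector code:

```rust
use std::collections::HashMap;

/// Advance the resume point while every version up to it has been seen.
/// On re-deploy, processing restarts from `next_version_to_process`
/// rather than from the beginning.
fn advance_resume_point(next_version_to_process: &mut u64, seen: &mut HashMap<u64, i64>) {
    while seen.remove(next_version_to_process).is_some() {
        *next_version_to_process += 1;
    }
}

fn main() {
    let mut next = 100u64;
    // Versions 100-102 were processed; 103 is still missing.
    let mut seen: HashMap<u64, i64> = [(100, 3), (101, 1), (102, 2)].into_iter().collect();
    advance_resume_point(&mut next, &mut seen);
    assert_eq!(next, 103); // the watermark stops at the first gap
}
```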
20 changes: 16 additions & 4 deletions rust/processor/src/bq_analytics/gcs_handler.rs
@@ -1,14 +1,15 @@
use crate::bq_analytics::ParquetProcessorError;
use anyhow::Result;
use crate::{bq_analytics::ParquetProcessorError, utils::counters::PARQUET_BUFFER_SIZE};
use anyhow::{Context, Result};
use chrono::{Datelike, Timelike};
use google_cloud_storage::{
client::Client as GCSClient,
http::objects::upload::{Media, UploadObjectRequest, UploadType},
};
use hyper::Body;
use hyper::{body::HttpBody, Body};
use std::path::{Path, PathBuf};
use tokio::time::{sleep, timeout, Duration};
use tracing::{debug, error, info};

const MAX_RETRIES: usize = 3;
const INITIAL_DELAY_MS: u64 = 500;
const TIMEOUT_SECONDS: u64 = 300;
@@ -18,6 +19,7 @@ pub async fn upload_parquet_to_gcs(
table_name: &str,
bucket_name: &str,
bucket_root: &Path,
processor_name: String,
) -> Result<(), ParquetProcessorError> {
if buffer.is_empty() {
error!("The file is empty and has no data to upload.",);
@@ -57,6 +59,12 @@ pub async fn upload_parquet_to_gcs(

loop {
let data = Body::from(buffer.clone());
let size_hint = data.size_hint();
let size = size_hint.exact().context("Failed to get size hint")?;
PARQUET_BUFFER_SIZE
.with_label_values(&[&processor_name, table_name])
.set(size as i64);

let upload_result = timeout(
Duration::from_secs(TIMEOUT_SECONDS),
client.upload_object(&upload_request, data, &upload_type),
@@ -65,7 +73,11 @@

match upload_result {
Ok(Ok(result)) => {
info!("File uploaded successfully to GCS: {}", result.name);
info!(
table_name = table_name,
file_name = result.name,
"File uploaded successfully to GCS",
);
return Ok(());
},
Ok(Err(e)) => {
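The new `PARQUET_BUFFER_SIZE` gauge used above is labeled by processor name and table name and records the byte size of the body about to be uploaded. Its definition lives in `utils/counters.rs`, which is not part of this diff; the snippet below is only a guess at how such a gauge might be registered with the prometheus crate, and the metric name is made up:

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

// Hypothetical definition; the real one is in utils/counters.rs, not shown here.
pub static PARQUET_BUFFER_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "parquet_buffer_size_bytes", // metric name (assumed)
        "Size in bytes of the parquet payload uploaded to GCS",
        &["processor_name", "table_name"] // matches the labels used in gcs_handler.rs
    )
    .unwrap()
});
```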
75 changes: 39 additions & 36 deletions rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -3,7 +3,7 @@ use crate::{
bq_analytics::gcs_handler::upload_parquet_to_gcs,
gap_detectors::ProcessingResult,
utils::{
counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE},
counters::{PARQUET_HANDLER_CURRENT_BUFFER_SIZE, PARQUET_STRUCT_SIZE},
util::naive_datetime_to_timestamp,
},
};
@@ -23,7 +23,6 @@ use tracing::{debug, error, info};
#[derive(Debug, Default, Clone)]
pub struct ParquetDataGeneric<ParquetType> {
pub data: Vec<ParquetType>,
pub transaction_version_to_struct_count: AHashMap<i64, i64>,
}

pub trait NamedTable {
@@ -71,6 +70,7 @@
pub upload_interval: Duration,
pub max_buffer_size: usize,
pub last_upload_time: Instant,
pub processor_name: String,
}
fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
let props = WriterProperties::builder()
@@ -103,6 +103,7 @@
schema: Arc<Type>,
upload_interval: Duration,
max_buffer_size: usize,
processor_name: String,
) -> Result<Self> {
// had to append unique id to avoid concurrent write issues
let writer = create_new_writer(schema.clone())?;
@@ -119,6 +120,7 @@
upload_interval,
max_buffer_size,
last_upload_time: Instant::now(),
processor_name,
})
}

@@ -128,47 +130,54 @@
changes: ParquetDataGeneric<ParquetType>,
) -> Result<()> {
let parquet_structs = changes.data;
self.transaction_version_to_struct_count
.extend(changes.transaction_version_to_struct_count);

let processor_name = self.processor_name.clone();
for parquet_struct in parquet_structs {
let size_of_struct = allocative::size_of_unique(&parquet_struct);
PARQUET_STRUCT_SIZE
.with_label_values(&[ParquetType::TABLE_NAME])
.with_label_values(&[&processor_name, ParquetType::TABLE_NAME])
.set(size_of_struct as i64);
self.buffer_size_bytes += size_of_struct;
self.buffer.push(parquet_struct);

if self.buffer_size_bytes >= self.max_buffer_size {
info!("Max buffer size reached, uploading to GCS.");
debug!(
table_name = ParquetType::TABLE_NAME,
buffer_size = self.buffer_size_bytes,
max_buffer_size = self.max_buffer_size,
"Max buffer size reached, uploading to GCS."
);
if let Err(e) = self.upload_buffer(gcs_client).await {
error!("Failed to upload buffer: {}", e);
return Err(e);
}
self.last_upload_time = Instant::now();
}
}

if self.last_upload_time.elapsed() >= self.upload_interval {
info!(
"Time has elapsed more than {} since last upload.",
self.upload_interval.as_secs()
);
if let Err(e) = self.upload_buffer(gcs_client).await {
error!("Failed to upload buffer: {}", e);
return Err(e);
}
self.last_upload_time = Instant::now();
if self.last_upload_time.elapsed() >= self.upload_interval {
info!(
"Time has elapsed more than {} since last upload for {}",
self.upload_interval.as_secs(),
ParquetType::TABLE_NAME
);
if let Err(e) = self.upload_buffer(gcs_client).await {
error!("Failed to upload buffer: {}", e);
return Err(e);
}
self.last_upload_time = Instant::now();
}

PARQUET_HANDLER_BUFFER_SIZE
.with_label_values(&[ParquetType::TABLE_NAME])
.set(self.buffer.len() as i64);
PARQUET_HANDLER_CURRENT_BUFFER_SIZE
.with_label_values(&[&self.processor_name, ParquetType::TABLE_NAME])
.set(self.buffer_size_bytes as i64);

Ok(())
}

async fn upload_buffer(&mut self, gcs_client: &GCSClient) -> Result<()> {
// This is to cover the case when interval duration has passed but buffer is empty
if self.buffer.is_empty() {
debug!("Buffer is empty, skipping upload.");
return Ok(());
}
let start_version = self
@@ -183,9 +192,7 @@
let end_version = last.version();
let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());

let txn_version_to_struct_count =
process_struct_count_map(&self.buffer, &mut self.transaction_version_to_struct_count);

let parquet_processed_transactions = build_parquet_processed_transactions(&self.buffer);
let struct_buffer = std::mem::take(&mut self.buffer);

let mut row_group_writer = self
@@ -206,12 +213,6 @@
.into_inner()
.context("Failed to get inner buffer")?;

debug!(
table_name = ParquetType::TABLE_NAME,
start_version = start_version,
end_version = end_version,
"Max buffer size reached, uploading to GCS."
);
let bucket_root = PathBuf::from(&self.bucket_root);

upload_parquet_to_gcs(
@@ -220,6 +221,7 @@
ParquetType::TABLE_NAME,
&self.bucket_name,
&bucket_root,
self.processor_name.clone(),
)
.await?;

@@ -229,7 +231,9 @@
start_version,
end_version,
last_transaction_timestamp: Some(last_transaction_timestamp),
txn_version_to_struct_count,
txn_version_to_struct_count: None,
parquet_processed_structs: Some(parquet_processed_transactions),
table_name: ParquetType::TABLE_NAME.to_string(),
};

self.gap_detector_sender
@@ -243,19 +247,18 @@
}
}

fn process_struct_count_map<ParquetType: NamedTable + HasVersion>(
fn build_parquet_processed_transactions<ParquetType: NamedTable + HasVersion>(
buffer: &[ParquetType],
txn_version_to_struct_count: &mut AHashMap<i64, i64>,
) -> AHashMap<i64, i64> {
let mut txn_version_to_struct_count_for_gap_detector = AHashMap::new();

for item in buffer.iter() {
let version = item.version();

if let Some(count) = txn_version_to_struct_count.get(&(version)) {
txn_version_to_struct_count_for_gap_detector.insert(version, *count);
txn_version_to_struct_count.remove(&(version));
}
txn_version_to_struct_count_for_gap_detector
.entry(version)
.and_modify(|count| *count += 1)
.or_insert(1);
}
txn_version_to_struct_count_for_gap_detector
}
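`build_parquet_processed_transactions` replaces `process_struct_count_map`: instead of consulting and mutating the handler's shared `transaction_version_to_struct_count` map, it recounts directly from the buffer being flushed, so the count per version always matches exactly what went into the uploaded file. A self-contained sketch of the same counting pattern, using a stub row type purely for illustration:

```rust
use ahash::AHashMap;

// Stub type for illustration; the real structs implement NamedTable + HasVersion.
struct Row {
    version: i64,
}

impl Row {
    fn version(&self) -> i64 {
        self.version
    }
}

// Same shape as build_parquet_processed_transactions: rows counted per transaction version.
fn count_per_version(buffer: &[Row]) -> AHashMap<i64, i64> {
    let mut counts = AHashMap::new();
    for item in buffer {
        counts
            .entry(item.version())
            .and_modify(|c| *c += 1)
            .or_insert(1);
    }
    counts
}

fn main() {
    let buffer = vec![Row { version: 7 }, Row { version: 7 }, Row { version: 8 }];
    let counts = count_per_version(&buffer);
    assert_eq!(counts.get(&7), Some(&2));
    assert_eq!(counts.get(&8), Some(&1));
}
```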
10 changes: 7 additions & 3 deletions rust/processor/src/bq_analytics/mod.rs
@@ -30,7 +30,10 @@ pub struct ParquetProcessingResult {
pub start_version: i64,
pub end_version: i64,
pub last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
pub txn_version_to_struct_count: AHashMap<i64, i64>,
pub txn_version_to_struct_count: Option<AHashMap<i64, i64>>,
// This is used to store the processed structs in the parquet file
pub parquet_processed_structs: Option<AHashMap<i64, i64>>,
pub table_name: String,
}

#[derive(Debug)]
@@ -115,13 +118,14 @@
"[Parquet Handler] Starting parquet handler loop",
);

let mut parquet_manager = GenericParquetHandler::new(
let mut parquet_handler = GenericParquetHandler::new(
bucket_name.clone(),
bucket_root.clone(),
new_gap_detector_sender.clone(),
ParquetType::schema(),
upload_interval,
max_buffer_size,
processor_name.clone(),
)
.expect("Failed to create parquet manager");

@@ -135,7 +139,7 @@
loop {
match parquet_receiver.recv().await {
Ok(txn_pb_res) => {
let result = parquet_manager.handle(&gcs_client, txn_pb_res).await;
let result = parquet_handler.handle(&gcs_client, txn_pb_res).await;

match result {
Ok(_) => {
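`ParquetProcessingResult` above now carries the per-table struct counts as `parquet_processed_structs` plus the `table_name`, while the old `txn_version_to_struct_count` becomes optional. A purely hypothetical consumer-side sketch of why the table name matters, not the repository's gap detector: each table can keep its own watermark, and the safe version to restart from is the minimum across tables.

```rust
use ahash::AHashMap;

// Hypothetical sketch only: a per-table watermark keyed by table_name.
#[derive(Default)]
struct PerTableWatermarks {
    next_version_by_table: AHashMap<String, i64>,
}

impl PerTableWatermarks {
    fn record(&mut self, table_name: &str, end_version: i64) {
        self.next_version_by_table
            .entry(table_name.to_string())
            .and_modify(|v| *v = (*v).max(end_version + 1))
            .or_insert(end_version + 1);
    }

    // The smallest per-table watermark is a safe version to restart from.
    fn safe_restart_version(&self) -> Option<i64> {
        self.next_version_by_table.values().copied().min()
    }
}

fn main() {
    let mut w = PerTableWatermarks::default();
    w.record("transactions", 120);
    w.record("write_set_changes", 95);
    assert_eq!(w.safe_restart_version(), Some(96));
}
```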
@@ -384,15 +384,16 @@ impl Transaction {

if let Some(a) = block_metadata {
block_metadata_txns.push(a.clone());
// transaction_version_to_struct_count.entry(a.version).and_modify(|e| *e += 1);
}
wscs.append(&mut wsc_list);

if !wsc_list.is_empty() {
transaction_version_to_struct_count
.entry(wsc_list[0].txn_version)
.and_modify(|e| *e += wsc_list.len() as i64);
.entry(txn.txn_version)
.and_modify(|e| *e += wsc_list.len() as i64)
.or_insert(wsc_list.len() as i64);
}
wscs.append(&mut wsc_list);

wsc_details.append(&mut wsc_detail_list);
}
(txns, block_metadata_txns, wscs, wsc_details)
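The hunk above, in `impl Transaction`, fixes the struct counting for write set changes in two ways: the entry is keyed on `txn.txn_version` rather than `wsc_list[0].txn_version`, and an `.or_insert` is added, since `.and_modify` alone silently does nothing when the key is not yet in the map and first-seen versions never got a count. The diff also appears to move `wscs.append(&mut wsc_list)` below the count update, because `append` drains `wsc_list` and would leave it empty before the length check. A tiny standalone example of the entry-API behavior being fixed:

```rust
use std::collections::HashMap;

fn main() {
    let mut counts: HashMap<i64, i64> = HashMap::new();

    // `and_modify` alone does nothing for a missing key (the bug being fixed).
    counts.entry(42).and_modify(|c| *c += 3);
    assert!(counts.get(&42).is_none());

    // Chaining `or_insert` records the count even for a first-seen version.
    counts.entry(42).and_modify(|c| *c += 3).or_insert(3);
    assert_eq!(counts.get(&42), Some(&3));
}
```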
1 change: 1 addition & 0 deletions rust/processor/src/gap_detectors/gap_detector.rs
@@ -8,6 +8,7 @@ use crate::{
use ahash::AHashMap;
use anyhow::Result;

#[derive(Clone)]
pub struct DefaultGapDetector {
next_version_to_process: u64,
seen_versions: AHashMap<u64, DefaultProcessingResult>,