enable time based parquet upload for parquet processor (#444)
* enable time based parquet upload for parquet processor

* move time check logic into parquet manager

* remove state

* flatten result

* remove file and directly write buffer
yuunlimm authored Jul 10, 2024
1 parent 7123a38 commit 21e7519
Showing 11 changed files with 276 additions and 248 deletions.
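The core of the change: instead of flushing only when the in-memory buffer crosses a size threshold, the handler now also flushes when a configurable interval has elapsed since the last upload. Below is a minimal sketch of that dual trigger, reusing the field names introduced in the diff further down; `UploadPolicy`, `should_upload`, and `mark_uploaded` are illustrative names, not part of the commit.

```rust
use std::time::Instant;
use tokio::time::Duration;

// Illustrative stand-in for the handler state relevant to the upload decision;
// the field names mirror those added to ParquetHandler in this commit.
struct UploadPolicy {
    buffer_size_bytes: usize,
    max_buffer_size: usize,
    upload_interval: Duration,
    last_upload_time: Instant,
}

impl UploadPolicy {
    /// Flush when either the size threshold or the time threshold is reached.
    fn should_upload(&self) -> bool {
        self.buffer_size_bytes >= self.max_buffer_size
            || self.last_upload_time.elapsed() >= self.upload_interval
    }

    /// Reset both triggers after a successful upload.
    fn mark_uploaded(&mut self) {
        self.buffer_size_bytes = 0;
        self.last_upload_time = Instant::now();
    }
}
```

In the actual handler the two conditions are checked separately inside `handle` so each can log its own reason, but the effect is the same: low-traffic tables no longer sit on a partially filled buffer indefinitely.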
4 changes: 0 additions & 4 deletions rust/Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion rust/Cargo.toml
@@ -119,4 +119,3 @@ parquet_derive = { version = "52.0.0" }
canonical_json = "0.5.0"
allocative = "0.3.3"
allocative_derive = "0.3.3"
uuid = { version = "1.8.0", features = ["v4"] }
1 change: 0 additions & 1 deletion rust/processor/Cargo.toml
@@ -72,7 +72,6 @@ parquet_derive = { workspace = true }
canonical_json = { workspace = true }
allocative = { workspace = true }
allocative_derive = { workspace = true }
uuid = { workspace = true }

[features]
libpq = ["diesel/postgres"]
19 changes: 3 additions & 16 deletions rust/processor/src/bq_analytics/gcs_handler.rs
@@ -1,37 +1,24 @@
use crate::bq_analytics::ParquetProcessorError;
use anyhow::{anyhow, Result};
use anyhow::Result;
use chrono::{Datelike, Timelike};
use google_cloud_storage::{
client::Client as GCSClient,
http::objects::upload::{Media, UploadObjectRequest, UploadType},
};
use hyper::Body;
use std::path::{Path, PathBuf};
use tokio::io::AsyncReadExt; // for read_to_end()
use tokio::{
fs::File as TokioFile,
time::{sleep, timeout, Duration},
};
use tokio::time::{sleep, timeout, Duration};
use tracing::{debug, error, info};
const MAX_RETRIES: usize = 3;
const INITIAL_DELAY_MS: u64 = 500;
const TIMEOUT_SECONDS: u64 = 300;
pub async fn upload_parquet_to_gcs(
client: &GCSClient,
file_path: &Path,
buffer: Vec<u8>,
table_name: &str,
bucket_name: &str,
bucket_root: &Path,
) -> Result<(), ParquetProcessorError> {
let mut file = TokioFile::open(&file_path)
.await
.map_err(|e| anyhow!("Failed to open file for reading: {}", e))?;

let mut buffer = Vec::new();
file.read_to_end(&mut buffer)
.await
.map_err(|e| anyhow!("Failed to read file: {}", e))?;

if buffer.is_empty() {
error!("The file is empty and has no data to upload.",);
return Err(ParquetProcessorError::Other(
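`upload_parquet_to_gcs` now receives the serialized bytes directly as a `Vec<u8>`, so the temp-file open/read/delete dance disappears. The retry constants (`MAX_RETRIES`, `INITIAL_DELAY_MS`, `TIMEOUT_SECONDS`) remain; since the body of the retry loop is collapsed in this view, the sketch below only shows one plausible way those constants combine with `tokio::time::{sleep, timeout}`. `upload_with_retries` and `try_upload` are illustrative names, not part of the file.

```rust
use anyhow::Result;
use tokio::time::{sleep, timeout, Duration};

const MAX_RETRIES: usize = 3;
const INITIAL_DELAY_MS: u64 = 500;
const TIMEOUT_SECONDS: u64 = 300;

/// Run an upload attempt with a per-attempt timeout and exponential backoff between retries.
/// `try_upload` stands in for the actual GCS call, which is elided here.
async fn upload_with_retries<F, Fut>(mut try_upload: F) -> Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    let mut delay = Duration::from_millis(INITIAL_DELAY_MS);
    for attempt in 1..=MAX_RETRIES {
        // Bound each attempt so a stalled upload cannot block the processor indefinitely.
        match timeout(Duration::from_secs(TIMEOUT_SECONDS), try_upload()).await {
            Ok(Ok(())) => return Ok(()),
            Ok(Err(e)) if attempt == MAX_RETRIES => return Err(e),
            Err(_) if attempt == MAX_RETRIES => anyhow::bail!("upload timed out"),
            // Failed or timed out with retries remaining: back off and try again.
            _ => {
                sleep(delay).await;
                delay *= 2;
            },
        }
    }
    unreachable!("the final attempt always returns above")
}
```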
228 changes: 119 additions & 109 deletions rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -9,20 +9,16 @@ use crate::{
};
use ahash::AHashMap;
use allocative::Allocative;
use anyhow::{anyhow, Result};
use anyhow::{Context, Result};
use google_cloud_storage::client::Client as GCSClient;
use parquet::{
file::{properties::WriterProperties, writer::SerializedFileWriter},
record::RecordWriter,
schema::types::Type,
};
use std::{
fs::{remove_file, rename, File},
path::PathBuf,
sync::Arc,
};
use tracing::{debug, error};
use uuid::Uuid;
use std::{path::PathBuf, sync::Arc, time::Instant};
use tokio::time::Duration;
use tracing::{debug, error, info};

#[derive(Debug, Default, Clone)]
pub struct ParquetDataGeneric<ParquetType> {
@@ -63,65 +59,53 @@
ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
pub schema: Arc<parquet::schema::types::Type>,
pub writer: SerializedFileWriter<File>,
pub schema: Arc<Type>,
pub writer: SerializedFileWriter<Vec<u8>>,
pub buffer: Vec<ParquetType>,
pub buffer_size_bytes: usize,

pub transaction_version_to_struct_count: AHashMap<i64, i64>,
pub bucket_name: String,
pub bucket_root: String,
pub gap_detector_sender: kanal::AsyncSender<ProcessingResult>,
pub file_path: String,
pub upload_interval: Duration,
pub max_buffer_size: usize,
pub last_upload_time: Instant,
}

fn create_new_writer(
file_path: &str,
schema: Arc<parquet::schema::types::Type>,
) -> Result<SerializedFileWriter<File>> {
fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
let props = WriterProperties::builder()
.set_compression(parquet::basic::Compression::LZ4)
.build();
let props_arc = Arc::new(props);
let file: File = File::options()
.create(true)
.truncate(true)
.write(true)
.open(file_path)?;

Ok(SerializedFileWriter::new(
file.try_clone()?,
schema,
props_arc,
)?)

SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer")
}

impl<ParquetType> ParquetHandler<ParquetType>
where
ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
{
fn create_new_writer(&self) -> Result<SerializedFileWriter<File>> {
let file_path = &self.file_path;
create_new_writer(file_path, self.schema.clone())
fn create_new_writer(&self) -> Result<SerializedFileWriter<Vec<u8>>> {
create_new_writer(self.schema.clone())
}

fn close_writer(&mut self) -> Result<()> {
let mut writer = self.create_new_writer()?;
std::mem::swap(&mut self.writer, &mut writer);
writer.close()?;
Ok(())
fn close_writer(&mut self) -> Result<SerializedFileWriter<Vec<u8>>> {
let new_writer = self.create_new_writer()?;
let old_writer = std::mem::replace(&mut self.writer, new_writer);
Ok(old_writer)
}

pub fn new(
bucket_name: String,
bucket_root: String,
gap_detector_sender: kanal::AsyncSender<ProcessingResult>,
schema: Arc<Type>,
upload_interval: Duration,
max_buffer_size: usize,
) -> Result<Self> {
// had to append unique id to avoid concurrent write issues
let file_path = format!("{}_{}.parquet", ParquetType::TABLE_NAME, Uuid::new_v4());
let writer = create_new_writer(&file_path, schema.clone())?;
let writer = create_new_writer(schema.clone())?;

Ok(Self {
writer,
@@ -132,15 +116,16 @@
bucket_root,
gap_detector_sender,
schema,
file_path,
upload_interval,
max_buffer_size,
last_upload_time: Instant::now(),
})
}

pub async fn handle(
&mut self,
gcs_client: &GCSClient,
changes: ParquetDataGeneric<ParquetType>,
max_buffer_size: usize,
) -> Result<()> {
let parquet_structs = changes.data;
self.transaction_version_to_struct_count
@@ -153,84 +138,109 @@
.set(size_of_struct as i64);
self.buffer_size_bytes += size_of_struct;
self.buffer.push(parquet_struct);
}

// for now, it's okay to go little above the buffer_size, given that we will keep max size as 200 MB
if self.buffer_size_bytes >= max_buffer_size {
let start_version = self.buffer.first().unwrap().version();
let last = self.buffer.last().unwrap();
let end_version = last.version();
let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());

let txn_version_to_struct_count = process_struct_count_map(
&self.buffer,
&mut self.transaction_version_to_struct_count,
);

let new_file_path: PathBuf = PathBuf::from(format!(
"{}_{}.parquet",
ParquetType::TABLE_NAME,
Uuid::new_v4()
));
rename(&self.file_path, &new_file_path)?; // this fixes an issue with concurrent file access issues

let struct_buffer = std::mem::take(&mut self.buffer);

let mut row_group_writer = self.writer.next_row_group()?;
struct_buffer
.as_slice()
.write_to_row_group(&mut row_group_writer)
.unwrap();
row_group_writer.close()?;
self.close_writer()?;

debug!(
table_name = ParquetType::TABLE_NAME,
start_version = start_version,
end_version = end_version,
"Max buffer size reached, uploading to GCS."
);
let bucket_root = PathBuf::from(&self.bucket_root);
let upload_result = upload_parquet_to_gcs(
gcs_client,
&new_file_path,
ParquetType::TABLE_NAME,
&self.bucket_name,
&bucket_root,
)
.await;
self.buffer_size_bytes = 0;
remove_file(&new_file_path)?;

return match upload_result {
Ok(_) => {
let parquet_processing_result = ParquetProcessingResult {
start_version,
end_version,
last_transaction_timestamp: Some(last_transaction_timestamp),
txn_version_to_struct_count,
};

self.gap_detector_sender
.send(ProcessingResult::ParquetProcessingResult(
parquet_processing_result,
))
.await
.expect("[Parser] Failed to send versions to gap detector");
Ok(())
},
Err(e) => {
error!("Failed to upload file to GCS: {}", e);
Err(anyhow!("Failed to upload file to GCS: {}", e))
},
};
if self.buffer_size_bytes >= self.max_buffer_size {
info!("Max buffer size reached, uploading to GCS.");
if let Err(e) = self.upload_buffer(gcs_client).await {
error!("Failed to upload buffer: {}", e);
return Err(e);
}
self.last_upload_time = Instant::now();
}

if self.last_upload_time.elapsed() >= self.upload_interval {
info!(
"Time has elapsed more than {} since last upload.",
self.upload_interval.as_secs()
);
if let Err(e) = self.upload_buffer(gcs_client).await {
error!("Failed to upload buffer: {}", e);
return Err(e);
}
self.last_upload_time = Instant::now();
}
}

PARQUET_HANDLER_BUFFER_SIZE
.with_label_values(&[ParquetType::TABLE_NAME])
.set(self.buffer.len() as i64);
Ok(())
}

async fn upload_buffer(&mut self, gcs_client: &GCSClient) -> Result<()> {
if self.buffer.is_empty() {
return Ok(());
}
let start_version = self
.buffer
.first()
.context("Buffer is not empty but has no first element")?
.version();
let last = self
.buffer
.last()
.context("Buffer is not empty but has no last element")?;
let end_version = last.version();
let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());

let txn_version_to_struct_count =
process_struct_count_map(&self.buffer, &mut self.transaction_version_to_struct_count);

let struct_buffer = std::mem::take(&mut self.buffer);

let mut row_group_writer = self
.writer
.next_row_group()
.context("Failed to get row group")?;

struct_buffer
.as_slice()
.write_to_row_group(&mut row_group_writer)
.context("Failed to write to row group")?;
row_group_writer
.close()
.context("Failed to close row group")?;

let old_writer = self.close_writer().context("Failed to close writer")?;
let upload_buffer = old_writer
.into_inner()
.context("Failed to get inner buffer")?;

debug!(
table_name = ParquetType::TABLE_NAME,
start_version = start_version,
end_version = end_version,
"Max buffer size reached, uploading to GCS."
);
let bucket_root = PathBuf::from(&self.bucket_root);

upload_parquet_to_gcs(
gcs_client,
upload_buffer,
ParquetType::TABLE_NAME,
&self.bucket_name,
&bucket_root,
)
.await?;

self.buffer_size_bytes = 0;

let parquet_processing_result = ParquetProcessingResult {
start_version,
end_version,
last_transaction_timestamp: Some(last_transaction_timestamp),
txn_version_to_struct_count,
};

self.gap_detector_sender
.send(ProcessingResult::ParquetProcessingResult(
parquet_processing_result,
))
.await
.expect("[Parser] Failed to send versions to gap detector");

Ok(())
}
}

fn process_struct_count_map<ParquetType: NamedTable + HasVersion>(
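The writer itself is the other half of the change: `SerializedFileWriter` is now backed by a `Vec<u8>` instead of a `File`, so rotating it out yields the finished parquet bytes in memory. A condensed sketch of that rotation, following the shape of `create_new_writer`/`close_writer` in the diff above; `rotate_writer` is an illustrative name.

```rust
use std::sync::Arc;

use anyhow::Result;
use parquet::{
    file::{properties::WriterProperties, writer::SerializedFileWriter},
    schema::types::Type,
};

/// Swap in a fresh in-memory writer and return the finished bytes of the old one,
/// ready to be handed to `upload_parquet_to_gcs` without touching the filesystem.
fn rotate_writer(
    writer: &mut SerializedFileWriter<Vec<u8>>,
    schema: Arc<Type>,
) -> Result<Vec<u8>> {
    let props = Arc::new(
        WriterProperties::builder()
            .set_compression(parquet::basic::Compression::LZ4)
            .build(),
    );
    // A brand-new writer backed by an empty Vec<u8> takes over for the next batch.
    let fresh = SerializedFileWriter::new(Vec::new(), schema, props)?;
    let old = std::mem::replace(writer, fresh);
    // into_inner() finalizes the parquet footer and hands back the underlying buffer.
    Ok(old.into_inner()?)
}
```

Because the buffer never hits disk, the per-handler UUID file names and the rename/remove_file bookkeeping can be dropped along with the `uuid` dependency, which is what the Cargo.toml changes above reflect.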
The remaining changed files are not rendered in this view.
