[SDK-parquet] parquet sized buffer and gcs handler #602
Conversation
This stack of pull requests is managed by Graphite.
rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs (resolved review threads, now outdated)
```rust
#[async_trait]
pub trait Uploadable {
    async fn handle_buffer(
```
Should this be called `upload_buffer`? `handle` is very generic-sounding.
yeah, upload makes more sense!
Doesn't look like this was addressed so opening back up!
oops, I think I missed it; updated!
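A minimal sketch of the agreed rename, with simplified signatures (the SDK's real trait is async and takes parquet-typed buffers; `NoopUploader` and the `Vec<u8>` payload here are illustrative only):

```rust
// Hypothetical simplified version of the trait after the rename.
// `upload_buffer` names the intent more precisely than `handle_buffer`.
pub trait Uploadable {
    fn upload_buffer(&mut self, buffer: Vec<u8>) -> Result<(), String>;
}

// Illustrative implementor that just counts bytes instead of talking to GCS.
struct NoopUploader {
    uploaded_bytes: usize,
}

impl Uploadable for NoopUploader {
    fn upload_buffer(&mut self, buffer: Vec<u8>) -> Result<(), String> {
        self.uploaded_bytes += buffer.len();
        Ok(())
    }
}
```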
```rust
#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_parquet_buffer_step_trigger_upload() {
```
nice
```rust
pub fn update_current_batch_metadata(&mut self, cur_batch_metadata: &TransactionMetadata) {
    if let Some(buffer_metadata) = &mut self.current_batch_metadata {
        // Update metadata fields with the current batch's end information
        buffer_metadata.end_version = cur_batch_metadata.end_version;
```
Before we set the new versions, should we add some validation that there are no gaps in the data (`buffer_metadata.end_version + 1 == cur_batch_metadata.start_version`)? It seems like everything is parsed in order 🤔
yeah it shouldn't happen but it's good to add a safety here!
Let's try to avoid panics if possible and return a Result
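One way the suggested gap check could look as a `Result` instead of a panic; a sketch only, with `TransactionMetadata` reduced to the two fields the check needs (the SDK's real struct carries more):

```rust
// Simplified stand-in for the SDK's TransactionMetadata.
#[derive(Debug)]
struct TransactionMetadata {
    start_version: u64,
    end_version: u64,
}

// Returns Err instead of panicking when the incoming batch does not
// start exactly one version after the buffered data ends.
fn validate_contiguous(
    buffer_end_version: u64,
    cur_batch: &TransactionMetadata,
) -> Result<(), String> {
    if buffer_end_version + 1 != cur_batch.start_version {
        return Err(format!(
            "version gap: buffer ends at {} but next batch starts at {}",
            buffer_end_version, cur_batch.start_version
        ));
    }
    Ok(())
}
```

The caller can then propagate the error up through the step's `Result` rather than aborting the processor.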
```rust
{
    internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
    pub poll_interval: Duration,
    pub buffer_uploader: U,
```
Does `buffer_uploader` need to be a trait? It looks like the type will always be `GCSUploader`.
right, we can remove the generics
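A sketch of what dropping the generic might look like: the step holds the concrete uploader directly instead of a `U: Uploadable` parameter. All types here are simplified placeholders, not the SDK's real definitions:

```rust
use std::collections::HashMap;
use std::time::Duration;

// Illustrative stand-ins for the SDK's types.
#[derive(Hash, PartialEq, Eq)]
enum ParquetTypeEnum {
    Transaction,
}

struct ParquetBuffer {
    bytes: Vec<u8>,
}

struct GCSUploader {
    bucket: String,
}

// The step now names the uploader concretely; no `U: Uploadable` bound.
struct ParquetBufferStep {
    internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
    poll_interval: Duration,
    buffer_uploader: GCSUploader,
}
```

This trades test-time substitutability for simpler signatures; the trait can always be reintroduced if a second uploader backend appears.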
```rust
// if it wasn't uploaded -> we update only end_version, size, and last timestamp
if file_uploaded {
    if let Some(buffer_metadata) = &mut buffer.current_batch_metadata {
        buffer_metadata.start_version = cur_batch_metadata.start_version;
        buffer_metadata.start_transaction_timestamp =
            cur_batch_metadata.start_transaction_timestamp.clone();
    }
}
```
Is this still necessary if we already did line 147?
yeah, it's slightly different b/c here we are updating the start_version and start_trxn_timestamp, which only needs to be updated after we upload the file
Wouldn't the 2nd part of the function (lines 45-46) take care of updating after uploading the file too?
oh right, if we set `None` after upload, it will be handled when we actually handle the current batch.
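The resolution above can be sketched as follows: clear the buffer's batch metadata after an upload, and let the next `update_current_batch_metadata` call adopt the incoming batch wholesale when the metadata is `None`. Types and method names are illustrative, not the SDK's actual code:

```rust
// Simplified stand-ins for the SDK's types.
#[derive(Clone, Debug, PartialEq)]
struct TransactionMetadata {
    start_version: u64,
    end_version: u64,
}

struct ParquetBuffer {
    current_batch_metadata: Option<TransactionMetadata>,
}

impl ParquetBuffer {
    fn update_current_batch_metadata(&mut self, cur: &TransactionMetadata) {
        match &mut self.current_batch_metadata {
            // Ongoing buffer: only the end of the range moves forward.
            Some(meta) => meta.end_version = cur.end_version,
            // Right after an upload (metadata cleared): adopt the whole batch,
            // which sets the new start_version/start timestamp in one place.
            None => self.current_batch_metadata = Some(cur.clone()),
        }
    }

    fn on_upload_complete(&mut self) {
        // Clearing here removes the need for the separate start_version
        // patch-up discussed above.
        self.current_batch_metadata = None;
    }
}
```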
looks good!!
updated test with concrete GCSUploader instance, verified that it won't upload anything since data is empty.
Add a parquet-specific step that stores parquet structs in a buffer and triggers uploads via gcs_handler, which can handle any of the parquet types defined in the enum.