Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Parquet Events Processor to SDK #618

Merged
merged 10 commits into from
Dec 5, 2024
5 changes: 4 additions & 1 deletion rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ bitflags! {

// User transaction
const SIGNATURES = 1 << 23;

// More tables
const CURRENT_TABLE_ITEMS = 1 << 24;
const BLOCK_METADATA_TRANSACTIONS = 1 << 25;

// Events
const EVENTS = 1 << 24;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 24 is used by current table items. maybe this will likely happen again, should we assign specific range to each processor to prevent this conflict? like 20-30 -> default processor 30-40 -> fa. since it will be less than 10 tables

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh. that's a good idea.

}
Expand Down
11 changes: 11 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
dermanyang marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ use processor::{
parquet_write_set_changes::WriteSetChangeModel,
},
event_models::parquet_events::Event as EventPQ,
db::parquet::models::default_models::{
parquet_block_metadata_transactions::BlockMetadataTransaction,
parquet_move_modules::MoveModule,
parquet_move_resources::MoveResource,
parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata},
parquet_transactions::Transaction,
parquet_write_set_changes::WriteSetChangeModel,
event_models::parquet_events::Event as EventPQ,
},
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -119,6 +127,9 @@ impl ProcessorConfig {
TableItem::TABLE_NAME.to_string(),
MoveModule::TABLE_NAME.to_string(),
EventPQ::TABLE_NAME.to_string(),
BlockMetadataTransaction::TABLE_NAME.to_string(),
CurrentTableItem::TABLE_NAME.to_string(),
TableMetadata::TABLE_NAME.to_string(),
]),
_ => HashSet::new(), // Default case for unsupported processors
}
Expand Down
75 changes: 59 additions & 16 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ use parquet::schema::types::Type;
use processor::{
db::parquet::models::{
default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem,
parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
parquet_block_metadata_transactions::BlockMetadataTransaction,
parquet_move_modules::MoveModule,
parquet_move_resources::MoveResource,
parquet_move_tables::{CurrentTableItem, TableItem},
parquet_table_metadata::TableMetadata,
parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
},
event_models::parquet_events::Event,
},
worker::TableFlags,
};
#[allow(unused_imports)]
Expand Down Expand Up @@ -63,6 +65,15 @@ pub enum ParquetTypeEnum {
TableItem,
MoveModule,
Event,
MoveResources,
WriteSetChanges,
Transactions,
TableItems,
MoveModules,
CurrentTableItems,
BlockMetadataTransactions,
TableMetadata,
Event,
}

/// Trait for handling various Parquet types.
Expand Down Expand Up @@ -108,13 +119,19 @@ macro_rules! impl_parquet_trait {
}

// Apply macro to supported types
impl_parquet_trait!(MoveResource, ParquetTypeEnum::MoveResource);
impl_parquet_trait!(WriteSetChangeModel, ParquetTypeEnum::WriteSetChange);
impl_parquet_trait!(ParquetTransaction, ParquetTypeEnum::Transaction);
impl_parquet_trait!(TableItem, ParquetTypeEnum::TableItem);
impl_parquet_trait!(MoveModule, ParquetTypeEnum::MoveModule);
impl_parquet_trait!(MoveResource, ParquetTypeEnum::MoveResources);
impl_parquet_trait!(WriteSetChangeModel, ParquetTypeEnum::WriteSetChanges);
impl_parquet_trait!(ParquetTransaction, ParquetTypeEnum::Transactions);
impl_parquet_trait!(TableItem, ParquetTypeEnum::TableItems);
impl_parquet_trait!(MoveModule, ParquetTypeEnum::MoveModules);
impl_parquet_trait!(CurrentTableItem, ParquetTypeEnum::CurrentTableItems);
impl_parquet_trait!(
BlockMetadataTransaction,
ParquetTypeEnum::BlockMetadataTransactions
);
impl_parquet_trait!(TableMetadata, ParquetTypeEnum::TableMetadata);
impl_parquet_trait!(Event, ParquetTypeEnum::Event);

#[derive(Debug, Clone)]
#[enum_dispatch(ParquetTypeTrait)]
pub enum ParquetTypeStructs {
Expand All @@ -124,16 +141,24 @@ pub enum ParquetTypeStructs {
TableItem(Vec<TableItem>),
MoveModule(Vec<MoveModule>),
Event(Vec<Event>),
CurrentTableItem(Vec<CurrentTableItem>),
BlockMetadataTransaction(Vec<BlockMetadataTransaction>),
TableMetadata(Vec<TableMetadata>),
}

impl ParquetTypeStructs {
pub fn default_for_type(parquet_type: &ParquetTypeEnum) -> Self {
match parquet_type {
ParquetTypeEnum::MoveResource => ParquetTypeStructs::MoveResource(Vec::new()),
ParquetTypeEnum::WriteSetChange => ParquetTypeStructs::WriteSetChange(Vec::new()),
ParquetTypeEnum::Transaction => ParquetTypeStructs::Transaction(Vec::new()),
ParquetTypeEnum::TableItem => ParquetTypeStructs::TableItem(Vec::new()),
ParquetTypeEnum::MoveModule => ParquetTypeStructs::MoveModule(Vec::new()),
ParquetTypeEnum::MoveResources => ParquetTypeStructs::MoveResource(Vec::new()),
ParquetTypeEnum::WriteSetChanges => ParquetTypeStructs::WriteSetChange(Vec::new()),
ParquetTypeEnum::Transactions => ParquetTypeStructs::Transaction(Vec::new()),
ParquetTypeEnum::TableItems => ParquetTypeStructs::TableItem(Vec::new()),
ParquetTypeEnum::MoveModules => ParquetTypeStructs::MoveModule(Vec::new()),
ParquetTypeEnum::CurrentTableItems => ParquetTypeStructs::CurrentTableItem(Vec::new()),
ParquetTypeEnum::BlockMetadataTransactions => {
ParquetTypeStructs::BlockMetadataTransaction(Vec::new())
},
ParquetTypeEnum::TableMetadata => ParquetTypeStructs::TableMetadata(Vec::new()),
ParquetTypeEnum::Event => ParquetTypeStructs::Event(Vec::new()),
}
}
Expand Down Expand Up @@ -201,6 +226,24 @@ impl ParquetTypeStructs {
handle_append!(self_data, other_data)
},
(ParquetTypeStructs::Event(self_data), ParquetTypeStructs::Event(other_data)) => {
handle_append!(self_data, other_data)
},
(
ParquetTypeStructs::CurrentTableItem(self_data),
ParquetTypeStructs::CurrentTableItem(other_data),
) => {
handle_append!(self_data, other_data)
},
(
ParquetTypeStructs::BlockMetadataTransaction(self_data),
ParquetTypeStructs::BlockMetadataTransaction(other_data),
) => {
handle_append!(self_data, other_data)
},
(
ParquetTypeStructs::TableMetadata(self_data),
ParquetTypeStructs::TableMetadata(other_data),
) => {
handle_append!(self_data, other_data)
},
_ => Err(ProcessorError::ProcessError {
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.