Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK-parquet] consolidated step #599

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
)]
pub struct MoveResource {
pub txn_version: i64,
Expand Down
49 changes: 49 additions & 0 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,55 @@ impl ParquetTypeStructs {
ParquetTypeEnum::MoveModule => ParquetTypeStructs::MoveModule(Vec::new()),
}
}

pub fn get_table_name(&self) -> &'static str {
match self {
ParquetTypeStructs::MoveResource(_) => "move_resources",
ParquetTypeStructs::WriteSetChange(_) => "write_set_changes",
ParquetTypeStructs::Transaction(_) => "parquet_transactions",
ParquetTypeStructs::TableItem(_) => "table_items",
ParquetTypeStructs::MoveModule(_) => "move_modules",
}
}
}

/// Marker trait unifying every parquet row model so heterogeneous batches
/// can be handled uniformly through `&dyn ParquetStruct` trait objects.
/// Intentionally empty: it only provides a common upcast target.
pub trait ParquetStruct {}

// Blanket opt-ins for each row model produced by the default processor.
impl ParquetStruct for MoveResource {}
impl ParquetStruct for WriteSetChangeModel {}
impl ParquetStruct for ParquetTransaction {}
impl ParquetStruct for TableItem {}
impl ParquetStruct for MoveModule {}

impl ParquetTypeStructs {
    /// Maps this payload variant to its `ParquetTypeEnum` discriminant.
    pub fn get_type(&self) -> ParquetTypeEnum {
        match self {
            ParquetTypeStructs::MoveResource(_) => ParquetTypeEnum::MoveResource,
            ParquetTypeStructs::WriteSetChange(_) => ParquetTypeEnum::WriteSetChange,
            ParquetTypeStructs::Transaction(_) => ParquetTypeEnum::Transaction,
            ParquetTypeStructs::TableItem(_) => ParquetTypeEnum::TableItem,
            ParquetTypeStructs::MoveModule(_) => ParquetTypeEnum::MoveModule,
        }
    }

    /// Get a vector of trait object references to the inner structs.
    ///
    /// Each concrete row type is upcast to `&dyn ParquetStruct`, allowing
    /// callers to process a batch without knowing the concrete variant.
    pub fn get_structs(&self) -> Vec<&dyn ParquetStruct> {
        // Local helper: upcast a slice of any `ParquetStruct` implementor
        // into a vector of trait-object references borrowed from `self`.
        fn upcast<T: ParquetStruct>(items: &[T]) -> Vec<&dyn ParquetStruct> {
            items.iter().map(|item| item as &dyn ParquetStruct).collect()
        }

        match self {
            ParquetTypeStructs::MoveResource(v) => upcast(v),
            ParquetTypeStructs::WriteSetChange(v) => upcast(v),
            ParquetTypeStructs::Transaction(v) => upcast(v),
            ParquetTypeStructs::TableItem(v) => upcast(v),
            ParquetTypeStructs::MoveModule(v) => upcast(v),
        }
    }
}

#[cfg(test)]
Expand Down
117 changes: 44 additions & 73 deletions rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use crate::{
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
processor_config::ProcessorConfig,
},
parquet_processors::ParquetTypeEnum,
steps::parquet_default_processor::{
generic_parquet_buffer_handler::GenericParquetBufferHandler,
gcs_handler::{create_new_writer, GCSUploader},
parquet_default_extractor::ParquetDefaultExtractor,
size_buffer::SizeBufferStep,
},
utils::{
chain_id::check_or_update_chain_id,
Expand All @@ -16,29 +18,24 @@ use crate::{
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::{
timed_size_buffer_step::{TableConfig, TimedSizeBufferStep},
TransactionStreamStep,
},
test::steps::pass_through_step::PassThroughStep,
traits::{processor_trait::ProcessorTrait, IntoRunnableStep, RunnableAsyncStep},
common_steps::TransactionStreamStep,
traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
};
use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
use parquet::record::RecordWriter;
use processor::db::common::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
use parquet::schema::types::Type;
use processor::{
bq_analytics::generic_parquet_processor::HasParquetSchema,
db::common::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
},
};
use std::{sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};
use aptos_indexer_processor_sdk::common_steps::{DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, VersionTrackerStep};
use tracing::{debug, info};
use std::collections::HashMap;
use crate::parquet_processors::ParquetTypeEnum;
use parquet::schema::types::Type;
use crate::steps::parquet_default_processor::gcs_handler::create_new_writer;
use crate::steps::parquet_default_processor::gcs_handler::GCSUploader;
use processor::bq_analytics::generic_parquet_processor::HasParquetSchema;
use crate::steps::parquet_default_processor::size_buffer::SizeBufferStep;
use crate::steps::common::processor_status_saver::get_parquet_processor_status_saver;
use crate::steps::parquet_default_processor::parquet_version_tracker_step::ParquetVersionTrackerStep;


const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
Expand Down Expand Up @@ -73,14 +70,6 @@ impl ParquetDefaultProcessor {
}
}

type Input = (
Vec<ParquetTransaction>,
Vec<MoveResource>,
Vec<WriteSetChangeModel>,
Vec<TableItem>,
Vec<MoveModule>,
);

#[async_trait::async_trait]
impl ProcessorTrait for ParquetDefaultProcessor {
fn name(&self) -> &'static str {
Expand All @@ -102,6 +91,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
// Determine the processing mode (backfill or regular)
let is_backfill = self.config.backfill_config.is_some();

// TODO: Revisit when parquet verstion tracker is available.
// Query the starting version
let starting_version = if is_backfill {
get_starting_version(&self.config, self.db_pool.clone()).await?
Expand All @@ -117,6 +107,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
self.config.processor_config.name()
)
})?;

get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
.await?
};
Expand Down Expand Up @@ -174,16 +165,24 @@ impl ProcessorTrait for ParquetDefaultProcessor {

let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [
(ParquetTypeEnum::MoveResource, MoveResource::schema()),
(ParquetTypeEnum::WriteSetChange, WriteSetChangeModel::schema()),
(
ParquetTypeEnum::WriteSetChange,
WriteSetChangeModel::schema(),
),
(ParquetTypeEnum::Transaction, ParquetTransaction::schema()),
(ParquetTypeEnum::TableItem, TableItem::schema()),
(ParquetTypeEnum::MoveModule, MoveModule::schema()),
].into_iter().collect();

let parquet_type_to_writer = parquet_type_to_schemas.iter().map(|(key, schema)| {
let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
(*key, writer)
}).collect();
]
.into_iter()
.collect();

let parquet_type_to_writer = parquet_type_to_schemas
.iter()
.map(|(key, schema)| {
let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
(*key, writer)
})
.collect();

let buffer_uploader = GCSUploader::new(
gcs_client.clone(),
Expand All @@ -198,16 +197,22 @@ impl ProcessorTrait for ParquetDefaultProcessor {

let default_size_buffer_step = SizeBufferStep::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
buffer_uploader
buffer_uploader,
);

let parquet_version_tracker_step = ParquetVersionTrackerStep::new(
get_parquet_processor_status_saver(self.db_pool.clone(), self.config.clone()),
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
);

// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
.connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
.connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
.connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
.connect_to(parquet_version_tracker_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

// (Optional) Parse the results
loop {
Expand All @@ -226,37 +231,3 @@ impl ProcessorTrait for ParquetDefaultProcessor {
}
}
}

/// Extracts the vector for one specific parquet row type out of a combined
/// extractor output (the `Input` tuple of per-type vectors). Consumes the
/// whole tuple; the other four vectors are dropped.
pub trait ExtractResources<ParquetType> {
    fn extract(self) -> Vec<ParquetType>;
}

// Each impl below selects one positional field of the `Input` tuple.
// Field order must stay in sync with the `Input` type alias:
// (transactions, move_resources, write_set_changes, table_items, move_modules).

impl ExtractResources<ParquetTransaction> for Input {
    fn extract(self) -> Vec<ParquetTransaction> {
        self.0
    }
}

impl ExtractResources<MoveResource> for Input {
    fn extract(self) -> Vec<MoveResource> {
        self.1
    }
}

impl ExtractResources<WriteSetChangeModel> for Input {
    fn extract(self) -> Vec<WriteSetChangeModel> {
        self.2
    }
}

impl ExtractResources<TableItem> for Input {
    fn extract(self) -> Vec<TableItem> {
        self.3
    }
}

impl ExtractResources<MoveModule> for Input {
    fn extract(self) -> Vec<MoveModule> {
        self.4
    }
}
Loading
Loading