use canonical json for payload
yuunlimm committed May 31, 2024
1 parent bafaa5c commit ab36ec5
Showing 8 changed files with 71 additions and 84 deletions.
14 changes: 14 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion rust/Cargo.toml
@@ -117,4 +117,5 @@ num = "0.4.0"
google-cloud-storage = "0.13.0"
hyper = { version = "0.14.18", features = ["full"] }
lazy_static = "1.4.0"
parquet_derive = { version = "51.0.0" }
parquet_derive = { version = "51.0.0" }
canonical_json = "0.5.0"
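
The new dependency is the core of the change: transaction payloads are now serialized with canonical_json instead of serde_json, so the same logical payload always produces byte-identical output (deterministic key ordering and formatting) regardless of how the in-memory JSON map was built. Below is a minimal standalone sketch of the two serialization calls, assuming the cleaned payload is a serde_json::Value as it is in parquet_transactions.rs later in this diff; the payload contents are made up for illustration.

use serde_json::json;

fn main() {
    let payload = json!({
        "function": "0x1::coin::transfer",
        "type_arguments": ["0x1::aptos_coin::AptosCoin"],
        "arguments": ["0xabc", "100"],
    });

    // serde_json's output depends on how the underlying map stores keys
    // (sorted with the default BTreeMap backend, insertion order with the
    // preserve_order feature enabled).
    let plain = serde_json::to_string(&payload).unwrap();

    // canonical_json always emits the canonical form, so equal payloads
    // serialize to identical strings no matter how they were constructed.
    let canonical = canonical_json::to_string(&payload).unwrap();

    println!("serde_json:     {plain}");
    println!("canonical_json: {canonical}");
}

Stable bytes matter here because the serialized payload ends up as a string column in parquet files, where nondeterministic serialization would make otherwise-identical rows differ.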
1 change: 1 addition & 0 deletions rust/processor/Cargo.toml
Expand Up @@ -73,6 +73,7 @@ google-cloud-storage = { workspace = true }
hyper = { workspace = true }
lazy_static = { workspace = true }
parquet_derive = { workspace = true }
canonical_json = { workspace = true }

[features]
libpq = ["diesel/postgres"]
3 changes: 3 additions & 0 deletions rust/processor/src/config.rs
@@ -56,6 +56,8 @@ pub struct IndexerGrpcProcessorConfig {
pub transaction_filter: TransactionFilter,

pub parquet_bucket_name: Option<String>,

pub is_parquet_processor: Option<bool>,
}

impl IndexerGrpcProcessorConfig {
@@ -120,6 +122,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
self.grpc_response_item_timeout_in_secs,
gcs_client,
self.parquet_bucket_name.clone(),
self.is_parquet_processor,
)
.await
.context("Failed to build worker")?;
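Both new settings are Option fields, so configs written before this commit still deserialize — the missing keys simply come back as None, and the worker later falls back with unwrap_or(false). Below is a standalone sketch of that behavior; ParquetSettings is a simplified stand-in for the real config struct, and JSON is used only to keep the example self-contained (the Option handling does not depend on the config format).

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ParquetSettings {
    parquet_bucket_name: Option<String>,
    is_parquet_processor: Option<bool>,
}

fn main() {
    // An "old" config with neither key present still parses; both fields are None.
    let old: ParquetSettings = serde_json::from_str("{}").unwrap();
    assert!(old.parquet_bucket_name.is_none());
    assert!(old.is_parquet_processor.is_none());

    // A parquet processor opts in to both settings.
    let new: ParquetSettings = serde_json::from_str(
        r#"{"parquet_bucket_name": "my-bucket", "is_parquet_processor": true}"#,
    )
    .unwrap();
    assert_eq!(new.is_parquet_processor, Some(true));
    assert_eq!(new.parquet_bucket_name.as_deref(), Some("my-bucket"));
}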
4 changes: 4 additions & 0 deletions rust/processor/src/lib.rs
@@ -16,6 +16,10 @@ extern crate parquet;
#[macro_use]
extern crate parquet_derive;

#[macro_use]
extern crate canonical_json;


pub use config::IndexerGrpcProcessorConfig;

mod config;
11 changes: 7 additions & 4 deletions rust/processor/src/models/default_models/parquet_transactions.rs
@@ -210,11 +210,12 @@ impl Transaction {
let payload_cleaned = get_clean_payload(payload, version);
let payload_type = get_payload_type(payload);

let serialized_payload = serde_json::to_string(&payload_cleaned).unwrap(); // Handle errors as needed
// let serialized_payload = serde_json::to_string(&payload_cleaned).unwrap(); // Handle errors as needed)
let serialized_payload = payload_cleaned.map(|payload| canonical_json::to_string(&payload).unwrap());
(
Self::from_transaction_info_with_data(
transaction_info,
Some(serialized_payload),
serialized_payload,
Some(payload_type),
version,
transaction_type,
@@ -238,12 +239,14 @@
let payload = genesis_txn.payload.as_ref().unwrap();
let payload_cleaned = get_clean_writeset(payload, version);
// It's genesis so no big deal
let serialized_payload = serde_json::to_string(&payload_cleaned).unwrap(); // Handle errors as needed
// let serialized_payload = serde_json::to_string(&payload_cleaned).unwrap(); // Handle errors as needed
let serialized_payload = payload_cleaned.map(|payload| canonical_json::to_string(&payload).unwrap());

let payload_type = None;
(
Self::from_transaction_info_with_data(
transaction_info,
Some(serialized_payload),
serialized_payload,
payload_type,
version,
transaction_type,
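Beyond switching serializers, this edit also changes what happens when a transaction has no payload. The old code serialized the Option itself, so a missing payload became the literal string "null" wrapped in Some(...); the new code maps over the Option, so a missing payload stays None and only present payloads are canonically serialized. Below is a standalone sketch of the two behaviors, with payload_cleaned standing in for the Option<serde_json::Value> returned by get_clean_payload / get_clean_writeset.

use serde_json::Value;

fn main() {
    let payload_cleaned: Option<Value> = None;

    // Old behaviour: serializing the Option itself turns a missing payload
    // into the string "null", which was then stored as Some("null").
    let old: Option<String> = Some(serde_json::to_string(&payload_cleaned).unwrap());
    assert_eq!(old.as_deref(), Some("null"));

    // New behaviour: map over the Option, so a missing payload stays None and
    // a present payload is serialized canonically.
    let new: Option<String> =
        payload_cleaned.map(|payload| canonical_json::to_string(&payload).unwrap());
    assert_eq!(new, None);
}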
72 changes: 24 additions & 48 deletions rust/processor/src/parquet_handler.rs
@@ -5,7 +5,7 @@ use crate::{
};

use ahash::AHashMap;
use tracing::{error, info};
use tracing::{info};
use google_cloud_storage::{
client::Client as GCSClient,
};
@@ -22,7 +22,7 @@ pub async fn create_parquet_handler_loop(
let processor_name = processor_name.to_owned();

for (parquet_type, receiver) in parquet_type_to_receiver {
let mut parquet_manager = build_parquet_manager(new_gap_detector_sender.clone(), bucket_name.clone(), parquet_type.clone());
let parquet_manager = build_parquet_manager(new_gap_detector_sender.clone(), bucket_name.clone(), parquet_type.clone());
info!(
processor_name = processor_name.clone(),
service_type = PROCESSOR_SERVICE_TYPE,
@@ -34,52 +34,7 @@
let processor_name = processor_name.clone();
// spawn a task per struct type; since we know the type of the struct, we can write it to the parquet file.
tokio::spawn(async move {
loop {
let txn_pb_res = match receiver.recv().await {
Ok(txn_pb_res) => {
info!(
processor_name = processor_name.clone(),
service_type = PROCESSOR_SERVICE_TYPE,
"[Parquet Handler] Received transaction data from processor",
);
txn_pb_res
},
Err(_e) => {
error!(
processor_name = processor_name.clone(),
service_type = PROCESSOR_SERVICE_TYPE,
error = ?_e,
"[Parquet Handler] Parquet handler channel has been closed",
);
ParquetData { // maybe add a flag that can tell it's empty
data: Vec::new(),
last_transaction_timestamp: None,
transaction_version_to_struct_count: AHashMap::new(),
}
},
};

let result = parquet_manager.handle_transaction(&gcs_client, txn_pb_res).await;

match result {
Ok(result) => {
info!(
start_version = result.start_version,
end_version = result.end_version,
"[Parquet Handler] successfully processed transactions");
},
Err(e) => {
error!(
processor_name = processor_name.clone(),
service_type = PROCESSOR_SERVICE_TYPE,
error = ?e,
"[Parquet Handler] Error processing parquet files",
);
panic!("[Parquet Handler] Error processing parquet files");
},
};

}
process_transactions(receiver, parquet_manager, processor_name, gcs_client).await;
});
}
}
Expand All @@ -95,4 +50,25 @@ pub fn build_parquet_manager(
"write_set_changes" => ParquetManager::WriteSetChangesParquetHandler(WriteSetChangesParquetHandler::new(bucket_name.clone(), new_gap_detector_sender.clone())),
_ => panic!("Invalid parquet type"),
}
}

async fn process_transactions(receiver: kanal::AsyncReceiver<ParquetData>, mut parquet_manager: ParquetManager, _processor_name: String, gcs_client: Arc<GCSClient>) {
loop {
let txn_pb_res = receiver.recv().await.unwrap_or_else(|_e| {
// log_error(&processor_name, &e.to_string());
ParquetData {
data: Vec::new(),
last_transaction_timestamp: None,
transaction_version_to_struct_count: AHashMap::new(),
}
});

let result = parquet_manager.handle_transaction(&gcs_client, txn_pb_res).await;
if let Err(_e) = result {
// log_error(&processor_name, &e.to_string());
panic!("[Parquet Handler] Error processing parquet files");
} else {
// log_info(&processor_name, &result.unwrap());
}
}
}
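
For reference, this is the shape of the handler after the refactor: one bounded kanal channel per parquet type, and one spawned task per channel whose only job is to drain batches and hand them to the writer. The sketch below is standalone and illustrative only — Batch stands in for ParquetData, the println! stands in for ParquetManager::handle_transaction, and the buffer size and type names are made up.

use std::collections::HashMap;
use std::time::Duration;

struct Batch {
    rows: Vec<String>,
}

#[tokio::main]
async fn main() {
    const BUFFER_SIZE: usize = 100;
    let parquet_types = ["transactions", "write_set_changes"];

    let mut senders = HashMap::new();
    let mut handles = Vec::new();

    for parquet_type in parquet_types {
        let (sender, receiver) = kanal::bounded_async::<Batch>(BUFFER_SIZE);
        senders.insert(parquet_type, sender);

        // One task per parquet type: drain the channel and hand each batch to
        // the writer for that type.
        handles.push(tokio::spawn(async move {
            while let Ok(batch) = receiver.recv().await {
                println!("[{parquet_type}] writing {} rows", batch.rows.len());
            }
        }));
    }

    // A processor sends each batch into the channel matching its struct type.
    senders["transactions"]
        .send(Batch {
            rows: vec!["txn 1".into(), "txn 2".into()],
        })
        .await
        .unwrap();

    // Give the writer tasks a moment to drain, then drop the senders so their
    // receive loops end and the tasks finish.
    tokio::time::sleep(Duration::from_millis(50)).await;
    drop(senders);
    for handle in handles {
        handle.await.unwrap();
    }
}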
47 changes: 16 additions & 31 deletions rust/processor/src/worker.rs
@@ -53,6 +53,7 @@ pub struct Worker {
pub grpc_response_item_timeout_in_secs: u64,
pub gcs_client: Arc<GCSClient>,
pub bucket_name: Option<String>,
pub is_parquet_processor: Option<bool>,
}

impl Worker {
@@ -77,6 +78,7 @@ impl Worker {
grpc_response_item_timeout_in_secs: u64,
gcs_client: Arc<GCSClient>,
bucket_name: Option<String>,
is_parquet_processor: Option<bool>,
) -> Result<Self> {
let processor_name = processor_config.name();
info!(processor_name = processor_name, "[Parser] Kicking off");
@@ -114,7 +116,8 @@ impl Worker {
transaction_filter,
grpc_response_item_timeout_in_secs,
gcs_client,
bucket_name
bucket_name,
is_parquet_processor,
})
}

@@ -228,35 +231,29 @@ impl Worker {
.await
});

// Create a channel for resource types to send parquet data to the parquet handler
let _parquet_sender_and_receiver = build_async_senders_and_receivers_for_processors(&self.processor_config);
let mut parquet_type_to_sender: AHashMap<String, kanal::AsyncSender<ParquetData>> = AHashMap::new();
let mut parquet_type_to_receiver: AHashMap<String, kanal::AsyncReceiver<ParquetData>> = AHashMap::new();
for resource_type in RESOURCE_TYPES {
for resource_type in RESOURCE_TYPES {
let (sender, receiver) = kanal::bounded_async::<ParquetData>(BUFFER_SIZE);
parquet_type_to_sender.insert(resource_type.to_string(), sender);
parquet_type_to_receiver.insert(resource_type.to_string(), receiver);
}

info!("parquet_type_to_sender size: {}", parquet_type_to_sender.len());
info!("parquet_type_to_receiver size: {}", parquet_type_to_receiver.len());
// maybe we can also check the config here and build a vector of async senders/receivers that can be passed to the processor.


// Create a gap detector task that will panic if there is a gap in the processing
let (gap_detector_sender, gap_detector_receiver) =
kanal::bounded_async::<ProcessingResult>(BUFFER_SIZE);
let (new_gap_detector_sender, new_gap_detector_receiver) =
kanal::bounded_async::<ParquetProcessingResult>(BUFFER_SIZE);

if processor_name == "parquet_processor" {
let parquet_gap_detection_batch_size = self.parquet_gap_detection_batch_size;
let processor = build_processor(
&self.processor_config,
self.per_table_chunk_sizes.clone(),
self.db_pool.clone(),
parquet_type_to_sender.clone(),
);
// TODO: this will be removed once the new gap detector logic is added to the existing gap detector
let processor = build_processor(
&self.processor_config,
self.per_table_chunk_sizes.clone(),
self.db_pool.clone(),
parquet_type_to_sender.clone(),
);
if self.is_parquet_processor.unwrap_or(false) {
let parquet_gap_detection_batch_size: u64 = self.parquet_gap_detection_batch_size;
tokio::spawn(async move {
crate::parquet_gap_detector::create_parquet_file_gap_detector_status_tracker_loop(
new_gap_detector_receiver,
@@ -268,14 +265,6 @@ impl Worker {
});
} else {
let gap_detection_batch_size = self.gap_detection_batch_size;
let processor = build_processor(
&self.processor_config,
self.per_table_chunk_sizes.clone(),
self.db_pool.clone(),
parquet_type_to_sender.clone(),
);
// per processor, we know how many structs we can get out of it

tokio::spawn(async move {
crate::gap_detector::create_gap_detector_status_tracker_loop(
gap_detector_receiver,
@@ -288,8 +277,6 @@ impl Worker {
}

let gcs_client = self.gcs_client.clone();
info!("bucket_name: {:?}", self.bucket_name.clone().unwrap().to_string());
// refactor this to send all channels to the parquet manager for each processor
let bucket_name = self.bucket_name.clone().unwrap().to_string();

tokio::spawn(async move {
Expand Down Expand Up @@ -327,7 +314,7 @@ impl Worker {
receiver.clone(),
gap_detector_sender.clone(),
parquet_type_to_sender.clone(),
processor_name == "parquet_processor",
self.is_parquet_processor.unwrap_or(false),
)
.await;
processor_tasks.push(join_handle);
Expand Down Expand Up @@ -786,7 +773,6 @@ pub fn build_processor(
config: &ProcessorConfig,
per_table_chunk_sizes: AHashMap<String, usize>,
db_pool: PgDbPool,
// parquet_senders: Vec<kanal::AsyncSender<ParquetData>>,
parquet_type_to_sender: AHashMap<String, kanal::AsyncSender<ParquetData>>,
) -> Processor {
match config {
@@ -850,12 +836,11 @@ pub fn build_processor(
}
}


pub fn build_async_senders_and_receivers_for_processors(config: &ProcessorConfig) -> Vec<(kanal::AsyncSender<ParquetData>, kanal::AsyncReceiver<ParquetData>)> {
let mut senders_and_receivers: Vec<(kanal::AsyncSender<ParquetData>, kanal::AsyncReceiver<ParquetData>)> = Vec::new();
match config {
ProcessorConfig::ParquetProcessor(_) => {
for _ in 0..2 { // TODO: fix so that it's not hardcoded
for _ in 0..RESOURCE_TYPES.len() { // TODO: fix so that it's not hardcoded
senders_and_receivers.push(kanal::bounded_async::<ParquetData>(100));
}
},
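
The last hunk also replaces the hardcoded 0..2 with RESOURCE_TYPES.len(), so the number of channel pairs tracks the list of parquet resource types rather than a magic number. Below is a standalone sketch of that pattern; RESOURCE_TYPES, BUFFER_SIZE, and ParquetData are simplified stand-ins for the worker's real constants and types.

const RESOURCE_TYPES: [&str; 2] = ["transactions", "write_set_changes"];
const BUFFER_SIZE: usize = 100;

struct ParquetData;

fn build_channel_pairs() -> Vec<(kanal::AsyncSender<ParquetData>, kanal::AsyncReceiver<ParquetData>)> {
    // One bounded channel per resource type: adding a new parquet table only
    // means extending RESOURCE_TYPES, not editing a loop bound.
    RESOURCE_TYPES
        .iter()
        .map(|_| kanal::bounded_async::<ParquetData>(BUFFER_SIZE))
        .collect()
}

fn main() {
    let pairs = build_channel_pairs();
    assert_eq!(pairs.len(), RESOURCE_TYPES.len());
}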
