Skip to content

Commit

Permalink
add temp commit for version tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Nov 12, 2024
1 parent a7860a0 commit 4f4f188
Show file tree
Hide file tree
Showing 5 changed files with 411 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ use processor::{
},
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use aptos_indexer_processor_sdk::common_steps::{DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, VersionTrackerStep};
use tracing::{debug, info};
use crate::steps::common::processor_status_saver::get_parquet_processor_status_saver;
use crate::steps::parquet_default_processor::parquet_version_tracker_step::ParquetVersionTrackerStep;


const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

Expand Down Expand Up @@ -196,12 +200,18 @@ impl ProcessorTrait for ParquetDefaultProcessor {
buffer_uploader,
);

let parquet_version_tracker_step = ParquetVersionTrackerStep::new(
get_parquet_processor_status_saver(self.db_pool.clone(), self.config.clone()),
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
);

// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
.connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
.connect_to(parquet_version_tracker_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

// (Optional) Parse the results
Expand Down
168 changes: 167 additions & 1 deletion rust/sdk-processor/src/steps/common/processor_status_saver.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use crate::{
config::indexer_processor_config::IndexerProcessorConfig,
db::common::models::{
Expand All @@ -15,6 +16,8 @@ use aptos_indexer_processor_sdk::{
use async_trait::async_trait;
use diesel::{upsert::excluded, ExpressionMethods};
use processor::schema::{backfill_processor_status, processor_status};
use crate::steps::parquet_default_processor::parquet_version_tracker_step::ParquetProcessorStatusSaver;


pub fn get_processor_status_saver(
conn_pool: ArcDbPool,
Expand All @@ -40,6 +43,29 @@ pub fn get_processor_status_saver(
}
}

/// Builds the `ParquetProcessorStatusSaverEnum` appropriate for this run.
///
/// If a backfill config is present, returns the `Backfill` variant, which
/// persists progress to `backfill_processor_status` under the backfill alias
/// and carries the configured start/end version bounds. Otherwise returns the
/// `Default` variant, which persists per-table rows to `processor_status`
/// keyed by processor name.
pub fn get_parquet_processor_status_saver(
    conn_pool: ArcDbPool,
    config: IndexerProcessorConfig,
) -> ParquetProcessorStatusSaverEnum {
    if let Some(backfill_config) = config.backfill_config {
        // Backfill bounds come from the transaction stream config so the
        // saver can detect when the backfill range is complete.
        let txn_stream_cfg = config.transaction_stream_config;
        ParquetProcessorStatusSaverEnum::Backfill {
            conn_pool,
            // `backfill_config` is owned here, so the alias can be moved
            // without cloning.
            backfill_alias: backfill_config.backfill_alias,
            backfill_start_version: txn_stream_cfg.starting_version,
            backfill_end_version: txn_stream_cfg.request_ending_version,
        }
    } else {
        ParquetProcessorStatusSaverEnum::Default {
            conn_pool,
            processor_name: config.processor_config.name().to_string(),
        }
    }
}

pub enum ProcessorStatusSaverEnum {
Default {
conn_pool: ArcDbPool,
Expand All @@ -51,7 +77,116 @@ pub enum ProcessorStatusSaverEnum {
backfill_start_version: Option<u64>,
backfill_end_version: Option<u64>,
},
// Parquet {}
}

/// Storage destinations for parquet processor progress. Parallels
/// `ProcessorStatusSaverEnum`, but status rows are written per parquet table
/// (the table name is appended to the processor name by the saver impl).
pub enum ParquetProcessorStatusSaverEnum {
    /// Regular (non-backfill) run: progress rows go to `processor_status`.
    Default {
        conn_pool: ArcDbPool,
        processor_name: String,
    },
    /// Backfill run: progress rows go to `backfill_processor_status`,
    /// keyed by `backfill_alias`.
    Backfill {
        conn_pool: ArcDbPool,
        backfill_alias: String,
        // Inclusive start of the backfill range; treated as 0 when `None`.
        backfill_start_version: Option<u64>,
        // When set, reaching this version marks the backfill `Complete`.
        backfill_end_version: Option<u64>,
    },
}

#[async_trait]
impl ParquetProcessorStatusSaver for ParquetProcessorStatusSaverEnum {
    /// Upserts the latest successfully processed version for one parquet
    /// table. Both variants guard the upsert with a `WHERE ... <= EXCLUDED`
    /// clause so a stale write can never move the recorded version backwards.
    async fn save_parquet_processor_status(
        &self,
        last_success_batch: &TransactionContext<()>,
        table_name: &str,
    ) -> Result<(), ProcessorError> {
        // Convert the batch's end timestamp (if any) to a naive-UTC value for
        // the DB column.
        let end_timestamp = last_success_batch
            .metadata
            .end_transaction_timestamp
            .as_ref()
            .map(|t| parse_timestamp(t, last_success_batch.metadata.end_version as i64))
            .map(|t| t.naive_utc());
        match self {
            ParquetProcessorStatusSaverEnum::Default {
                conn_pool,
                processor_name,
            } => {
                let status = ProcessorStatus {
                    // One row per (processor, table): the table name is
                    // appended to the processor name to form the key.
                    processor: processor_name.clone() + table_name,
                    last_success_version: last_success_batch.metadata.end_version as i64,
                    last_transaction_timestamp: end_timestamp,
                };

                // Save regular processor status to the database
                execute_with_better_error(
                    conn_pool.clone(),
                    diesel::insert_into(processor_status::table)
                        .values(&status)
                        .on_conflict(processor_status::processor)
                        .do_update()
                        .set((
                            processor_status::last_success_version
                                .eq(excluded(processor_status::last_success_version)),
                            processor_status::last_updated.eq(excluded(processor_status::last_updated)),
                            processor_status::last_transaction_timestamp
                                .eq(excluded(processor_status::last_transaction_timestamp)),
                        )),
                    // Only advance, never regress, the stored version.
                    Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
                )
                .await?;

                Ok(())
            },
            ParquetProcessorStatusSaverEnum::Backfill {
                conn_pool,
                backfill_alias,
                backfill_start_version,
                backfill_end_version,
            } => {
                let lst_success_version = last_success_batch.metadata.end_version as i64;
                // The backfill is complete once the last successful version
                // reaches the configured end bound; with no end bound it
                // stays in progress.
                let backfill_status = if backfill_end_version.is_some_and(|backfill_end_version| {
                    lst_success_version >= backfill_end_version as i64
                }) {
                    BackfillStatus::Complete
                } else {
                    BackfillStatus::InProgress
                };
                let backfill_end_version_mapped = backfill_end_version.map(|v| v as i64);
                let status = BackfillProcessorStatus {
                    backfill_alias: backfill_alias.clone(),
                    backfill_status,
                    last_success_version: lst_success_version,
                    last_transaction_timestamp: end_timestamp,
                    // Missing start bound is treated as version 0.
                    backfill_start_version: backfill_start_version.unwrap_or(0) as i64,
                    backfill_end_version: backfill_end_version_mapped,
                };
                execute_with_better_error(
                    conn_pool.clone(),
                    diesel::insert_into(backfill_processor_status::table)
                        .values(&status)
                        .on_conflict(backfill_processor_status::backfill_alias)
                        .do_update()
                        .set((
                            backfill_processor_status::backfill_status
                                .eq(excluded(backfill_processor_status::backfill_status)),
                            backfill_processor_status::last_success_version
                                .eq(excluded(backfill_processor_status::last_success_version)),
                            backfill_processor_status::last_updated
                                .eq(excluded(backfill_processor_status::last_updated)),
                            backfill_processor_status::last_transaction_timestamp.eq(excluded(
                                backfill_processor_status::last_transaction_timestamp,
                            )),
                            backfill_processor_status::backfill_start_version
                                .eq(excluded(backfill_processor_status::backfill_start_version)),
                            backfill_processor_status::backfill_end_version
                                .eq(excluded(backfill_processor_status::backfill_end_version)),
                        )),
                    // Only advance, never regress, the stored version.
                    Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "),
                )
                .await?;
                Ok(())
            },
        }
    }
}

#[async_trait]
Expand Down Expand Up @@ -146,6 +281,37 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum {
.await?;
Ok(())
},
// ProcessorStatusSaverEnum::Parquet {
// conn_pool,
// processor_name,
// } => {
// // TODO: rename to _table
// let status = ProcessorStatus {
// processor: processor_name.clone(),
// last_success_version: last_success_batch.metadata.end_version as i64,
// last_transaction_timestamp: end_timestamp,
// };
//
// // Save regular processor status to the database
// execute_with_better_error(
// conn_pool.clone(),
// diesel::insert_into(processor_status::table)
// .values(&status)
// .on_conflict(processor_status::processor)
// .do_update()
// .set((
// processor_status::last_success_version
// .eq(excluded(processor_status::last_success_version)),
// processor_status::last_updated.eq(excluded(processor_status::last_updated)),
// processor_status::last_transaction_timestamp
// .eq(excluded(processor_status::last_transaction_timestamp)),
// )),
// Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
// )
// .await?;
//
// Ok(())
// },
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pub mod gcs_handler;
pub mod generic_parquet_buffer_handler;
pub mod parquet_default_extractor;
pub mod size_buffer;
// Step that tracks and persists per-parquet-table processing progress.
pub mod parquet_version_tracker_step;
// pub mod timed_size_buffer;
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
use aptos_indexer_processor_sdk::{
traits::{
pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable,
},
types::transaction_context::TransactionContext,
utils::errors::ProcessorError,
};
use anyhow::Result;
use async_trait::async_trait;
use std::marker::PhantomData;
use std::collections::HashMap;
use crate::parquet_processors::ParquetTypeEnum;
use aptos_indexer_processor_sdk::types::transaction_context::TransactionMetadata;
use diesel::table;


// pub const DEFAULT_UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;
/// The `ParquetProcessorStatusSaver` trait object should be implemented in order to save the latest successfully
/// processed transaction version to storage, i.e., persisting the `processor_status` to storage,
/// with one status row per parquet table.
#[async_trait]
pub trait ParquetProcessorStatusSaver {
    /// Persists `last_success_batch` (metadata only; the payload is `()`)
    /// as the latest progress for the parquet table named `table_name`.
    async fn save_parquet_processor_status(
        &self,
        last_success_batch: &TransactionContext<()>,
        table_name: &str,
    ) -> Result<(), ProcessorError>;
}

/// Tracks the versioned processing of sequential transactions, ensuring no gaps
/// occur between them, independently for each parquet table type.
///
/// Important: this step assumes ordered transactions. Please use the `OrderByVersionStep` before this step
/// if the transactions are not ordered.
pub struct ParquetVersionTrackerStep<S>
where
    Self: Sized + Send + 'static,
    S: ParquetProcessorStatusSaver + Send + 'static,
{
    // Last successful batch of sequentially processed transactions, keyed by
    // parquet table type. Includes metadata to write to storage.
    last_success_batch: HashMap<ParquetTypeEnum, TransactionContext<()>>,
    // How often (in seconds) `poll` persists progress via the status saver.
    polling_interval_secs: u64,
    // Backend that persists the tracked progress (see `ParquetProcessorStatusSaver`).
    processor_status_saver: S,
}

impl<S> ParquetVersionTrackerStep<S>
where
    Self: Sized + Send + 'static,
    S: ParquetProcessorStatusSaver + Send + 'static,
{
    /// Creates a tracker that persists progress through `processor_status_saver`
    /// every `polling_interval_secs` seconds (and on cleanup).
    pub fn new(processor_status_saver: S, polling_interval_secs: u64) -> Self {
        Self {
            last_success_batch: HashMap::new(),
            processor_status_saver,
            polling_interval_secs,
        }
    }

    /// Persists the last successful batch for every parquet table seen so far.
    /// Propagates the first saver error encountered.
    async fn save_processor_status(&mut self) -> Result<(), ProcessorError> {
        for (parquet_type, last_success_batch) in &self.last_success_batch {
            // The table name is derived from the parquet type's Display impl.
            let table_name = parquet_type.to_string();
            self.processor_status_saver
                .save_parquet_processor_status(last_success_batch, &table_name)
                .await?;
        }
        Ok(())
    }
}

#[async_trait]
impl<S> Processable for ParquetVersionTrackerStep<S>
where
    Self: Sized + Send + 'static,
    S: ParquetProcessorStatusSaver + Send + 'static,
{
    type Input = HashMap<ParquetTypeEnum, TransactionMetadata>;
    type Output = HashMap<ParquetTypeEnum, TransactionMetadata>;
    type RunType = PollableAsyncRunType;

    /// Validates version continuity per parquet type and records each batch
    /// as the new last-success state, then passes the metadata through.
    ///
    /// Returns a `ProcessError` if, for any parquet type with prior history,
    /// the incoming start version is not exactly `last end version + 1`.
    async fn process(
        &mut self,
        current_batch: TransactionContext<Self::Input>,
    ) -> Result<Option<TransactionContext<Self::Output>>, ProcessorError> {
        // Initialize a new map to store the processed metadata
        let mut processed_data = HashMap::new();

        // Check for version gap before processing each key-value pair
        let upload_result = current_batch.data;
        for (parquet_type, current_metadata) in &upload_result {
            // Gap detection is per parquet type: only applicable when we have
            // already seen a successful batch for this type. The first batch
            // of a type is accepted unconditionally.
            if let Some(last_success) = self.last_success_batch.get(parquet_type) {
                if last_success.metadata.end_version + 1 != current_metadata.start_version {
                    return Err(ProcessorError::ProcessError {
                        message: format!(
                            "Gap detected for {:?} starting from version: {}",
                            parquet_type,
                            current_metadata.start_version
                        ),
                    });
                }
            }

            processed_data.insert(parquet_type.clone(), current_metadata.clone());

            // Update last_success_batch for the current key; only the
            // metadata is retained (data is unit).
            self.last_success_batch.insert(
                parquet_type.clone(),
                TransactionContext {
                    data: (),
                    metadata: current_metadata.clone(),
                },
            );

        }

        // Pass through the current batch with updated metadata
        Ok(Some(TransactionContext {
            data: processed_data,
            metadata: current_batch.metadata.clone(),
        }))
    }

    /// Flushes the last successful state to storage when the step shuts down,
    /// so progress made since the last poll is not lost.
    async fn cleanup(
        &mut self,
    ) -> Result<Option<Vec<TransactionContext<Self::Output>>>, ProcessorError> {
        // Save the last successful batch to the database
        self.save_processor_status().await?;
        Ok(None)
    }
}

#[async_trait]
impl<S> PollableAsyncStep for ParquetVersionTrackerStep<S>
where
    Self: Sized + Send + Sync + 'static,
    S: ParquetProcessorStatusSaver + Send + Sync + 'static,
{
    /// How often the runner invokes `poll`, from the configured interval.
    fn poll_interval(&self) -> std::time::Duration {
        std::time::Duration::from_secs(self.polling_interval_secs)
    }

    /// Periodically persists the tracked progress; emits no output batches.
    async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<HashMap<ParquetTypeEnum, TransactionMetadata>>>>, ProcessorError> {
        // TODO: Add metrics for gap count
        self.save_processor_status().await?;
        // Nothing should be returned
        Ok(None)
    }
}

impl<S> NamedStep for ParquetVersionTrackerStep<S>
where
    Self: Sized + Send + 'static,
    S: ParquetProcessorStatusSaver + Send + 'static,
{
    /// Human-readable step name used for logging/identification.
    fn name(&self) -> String {
        // `format!` with a plain literal is a clippy `useless_format`;
        // `to_string` expresses the intent directly.
        "ParquetVersionTrackerStep".to_string()
    }
}
Loading

0 comments on commit 4f4f188

Please sign in to comment.