lint
yuunlimm committed May 31, 2024
1 parent ab36ec5 commit 8c529e8
Showing 9 changed files with 19 additions and 27 deletions.
1 change: 0 additions & 1 deletion rust/Cargo.toml
@@ -116,6 +116,5 @@ parquet = "51.0.0"
num = "0.4.0"
google-cloud-storage = "0.13.0"
hyper = { version = "0.14.18", features = ["full"] }
lazy_static = "1.4.0"
parquet_derive = { version = "51.0.0" }
canonical_json = "0.5.0"
4 changes: 2 additions & 2 deletions rust/indexer-metrics/src/main.rs
@@ -155,11 +155,11 @@ async fn start_processor_status_fetch(url: String, chain_name: String) {
.set(processor.last_success_version as i64);
HASURA_API_LATEST_VERSION_TIMESTAMP
.with_label_values(&[&processor.processor, &chain_name])
.set(processor.last_updated.timestamp_micros() as f64 * 1e-6);
.set(processor.last_updated.and_utc().timestamp_micros() as f64 * 1e-6);
HASURA_API_LATEST_TRANSACTION_TIMESTAMP
.with_label_values(&[&processor.processor, &chain_name])
.set(
processor.last_transaction_timestamp.timestamp_micros() as f64
processor.last_transaction_timestamp.and_utc().timestamp_micros() as f64
* 1e-6,
);
let latency = system_time_now - processor.last_transaction_timestamp;
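The two .and_utc() additions above (and the matching timestamp_millis changes in moving-average below) track newer chrono 0.4 releases, which deprecate the timestamp accessors on NaiveDateTime in favor of converting to DateTime<Utc> first. A minimal sketch of the pattern, with a hypothetical helper name:

    use chrono::NaiveDateTime;

    // Hypothetical helper illustrating the fix above: attach the naive value
    // to UTC, then read the microsecond timestamp off the DateTime<Utc>.
    fn timestamp_secs(last_updated: NaiveDateTime) -> f64 {
        // Old, deprecated form: last_updated.timestamp_micros() as f64 * 1e-6
        last_updated.and_utc().timestamp_micros() as f64 * 1e-6
    }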
4 changes: 2 additions & 2 deletions rust/moving-average/src/lib.rs
@@ -15,7 +15,7 @@ pub struct MovingAverage {

impl MovingAverage {
pub fn new(window_millis: u64) -> Self {
let now = chrono::Utc::now().naive_utc().timestamp_millis() as u64;
let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64;
let mut queue = VecDeque::new();
queue.push_back((now, 0));
Self {
@@ -26,7 +26,7 @@ impl MovingAverage {
}

pub fn tick_now(&mut self, value: u64) {
let now = chrono::Utc::now().naive_utc().timestamp_millis() as u64;
let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64;
self.tick(now, value);
}

3 changes: 0 additions & 3 deletions rust/processor/src/lib.rs
@@ -13,10 +13,7 @@ extern crate diesel;

// for parquet_derive
extern crate parquet;
#[macro_use]
extern crate parquet_derive;

#[macro_use]
extern crate canonical_json;


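Dropping the #[macro_use] attributes is the Rust 2018+ style: derive macros are brought in with an ordinary use, as the next hunk already does for parquet_derive. A minimal sketch of that import-based style (the example struct is hypothetical; the parquet crate still has to be a dependency for the generated code):

    use parquet_derive::ParquetRecordWriter;

    // Hypothetical example: the derive resolves without #[macro_use] extern crate.
    #[derive(ParquetRecordWriter)]
    struct ExampleRow {
        transaction_version: i64,
    }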
@@ -20,7 +20,7 @@ use parquet_derive::{ParquetRecordWriter};
pub struct MoveResource {
pub transaction_version: i64,
pub write_set_change_index: i64,
pub transaction_block_height: i64,
pub block_height: i64,
pub name: String,
pub address: String,
pub resource_type: String,
@@ -44,7 +44,7 @@ impl MoveResource {
write_resource: &WriteResource,
write_set_change_index: i64,
transaction_version: i64,
transaction_block_height: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let parsed_data = Self::convert_move_struct_tag(
@@ -55,7 +55,7 @@
);
Self {
transaction_version,
transaction_block_height,
block_height,
write_set_change_index,
resource_type: write_resource.type_str.clone(),
name: parsed_data.name.clone(),
@@ -75,7 +75,7 @@ impl MoveResource {
delete_resource: &DeleteResource,
write_set_change_index: i64,
transaction_version: i64,
transaction_block_height: i64,
block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let parsed_data = Self::convert_move_struct_tag(
@@ -86,7 +86,7 @@
);
Self {
transaction_version,
transaction_block_height,
block_height,
write_set_change_index,
resource_type: delete_resource.type_str.clone(),
name: parsed_data.name.clone(),
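One side effect of renaming transaction_block_height to block_height worth noting: ParquetRecordWriter derives the Parquet schema from the struct's field names, so the column written for this model is renamed as well. An abbreviated sketch of the struct after the change, trimmed to the fields the diff shows:

    use parquet_derive::ParquetRecordWriter;

    // Abbreviated sketch: parquet_derive uses these field names as the
    // Parquet column names, so the output column is now `block_height`.
    #[derive(ParquetRecordWriter)]
    pub struct MoveResource {
        pub transaction_version: i64,
        pub write_set_change_index: i64,
        pub block_height: i64,
        pub name: String,
        pub address: String,
        pub resource_type: String,
    }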
1 change: 0 additions & 1 deletion rust/processor/src/parquet_processors/mod.rs
@@ -106,7 +106,6 @@ pub trait ParquetHandlerTrait: Send + Sync {
ParquetStruct::MoveResource(move_resource) => Some(move_resource.transaction_version as u64),
ParquetStruct::Transaction(transaction) => Some(transaction.version as u64),
ParquetStruct::WriteSetChange(write_set_change) => Some(write_set_change.transaction_version as u64),
_ => None,
};

if let Some(version) = current_version {
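The _ => None arm could go because the match is already exhaustive over the ParquetStruct variants shown above, and an extra wildcard arm trips the unreachable_patterns lint. A self-contained sketch, assuming ParquetStruct has exactly these three variants (placeholder structs stand in for the real models):

    // Placeholders carrying only the fields the match reads.
    struct MoveResource { transaction_version: i64 }
    struct Transaction { version: i64 }
    struct WriteSetChange { transaction_version: i64 }

    enum ParquetStruct {
        MoveResource(MoveResource),
        Transaction(Transaction),
        WriteSetChange(WriteSetChange),
    }

    // With every variant named, a trailing `_ => None` would be unreachable.
    fn current_version(s: &ParquetStruct) -> Option<u64> {
        match s {
            ParquetStruct::MoveResource(m) => Some(m.transaction_version as u64),
            ParquetStruct::Transaction(t) => Some(t.version as u64),
            ParquetStruct::WriteSetChange(w) => Some(w.transaction_version as u64),
        }
    }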
@@ -15,10 +15,7 @@ use std::{ fs::{File, remove_file, rename}, sync::Arc};
use super::{ParquetData, ParquetHandlerTrait, ParquetProcessingResult, ParquetStruct};


const MAX_FILE_SIZE: u64 = 500 * 1024 * 1024; // 500 MB in bytes, maybe reduce this to 300 MB. use different value for struct type
const BUCKET_REGULAR_TRAFFIC: &str = "devnet-airflow-continue";
const BUCKET_BACKFILL_TRAFFIC: &str = "devnet-airflow-backfill";

const MAX_FILE_SIZE: u64 = 500 * 1024 * 1024;

pub struct MoveResourceParquetHandler {
writer: Option<SerializedFileWriter<File>>,
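This hunk removes the two bucket-name constants and trims the MAX_FILE_SIZE comment; the cap itself stays at 500 * 1024 * 1024 = 524,288,000 bytes (500 MiB). Purely as an illustration of how such a cap is typically consulted, not this handler's exact logic:

    const MAX_FILE_SIZE: u64 = 500 * 1024 * 1024; // 524,288,000 bytes (500 MiB)

    // Illustrative only: a handler buffering rows into a local parquet file
    // would compare the running file size against the cap to decide when to
    // close the writer and upload the file.
    fn should_upload(current_file_size_bytes: u64) -> bool {
        current_file_size_bytes >= MAX_FILE_SIZE
    }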
8 changes: 4 additions & 4 deletions rust/processor/src/processors/parquet_default_processor.rs
@@ -148,15 +148,15 @@ pub fn process_transactions(
match detail {
WriteSetChangeDetail::Module(module) => {
move_modules.push(module.clone());
// transaction_version_to_struct_count.entry(module.transaction_version).and_modify(|e| *e += 1);
// transaction_version_to_struct_count.entry(module.transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2
},
WriteSetChangeDetail::Resource(resource) => {
move_resources.push(ParquetStruct::MoveResource(resource.clone()));
transaction_version_to_struct_count.entry(resource.transaction_version).and_modify(|e| *e += 1);
},
WriteSetChangeDetail::Table(item, current_item, metadata) => {
table_items.push(item.clone());
// transaction_version_to_struct_count.entry(item.transaction_version).and_modify(|e| *e += 1);
// transaction_version_to_struct_count.entry(item.transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2

current_table_items.insert(
(
@@ -165,11 +165,11 @@
),
current_item.clone(),
);
// transaction_version_to_struct_count.entry(current_item.last_transaction_version).and_modify(|e| *e += 1);
// transaction_version_to_struct_count.entry(current_item.last_transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2

if let Some(meta) = metadata {
table_metadata.insert(meta.handle.clone(), meta.clone());
// transaction_version_to_struct_count.entry(current_item.last_transaction_version).and_modify(|e| *e += 1);
// transaction_version_to_struct_count.entry(current_item.last_transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2
}
},
}
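The struct-count bookkeeping above uses the HashMap entry API. Note that entry(k).and_modify(f) on its own only updates keys that already exist, which is fine when the map is pre-populated with every version; creating the entry on first sight needs a chained or_insert. A small self-contained sketch with a hypothetical map name:

    use std::collections::HashMap;

    fn main() {
        // Hypothetical stand-in for transaction_version_to_struct_count.
        let mut counts: HashMap<i64, i64> = HashMap::new();

        // As in the hunk above: increments only if the key is already present,
        // otherwise this is a no-op.
        counts.entry(42).and_modify(|e| *e += 1);

        // Common companion form that also creates the entry the first time.
        counts.entry(42).and_modify(|e| *e += 1).or_insert(1);

        assert_eq!(counts.get(&42), Some(&1));
    }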
10 changes: 5 additions & 5 deletions rust/processor/src/worker.rs
@@ -52,7 +52,7 @@ pub struct Worker {
pub transaction_filter: TransactionFilter,
pub grpc_response_item_timeout_in_secs: u64,
pub gcs_client: Arc<GCSClient>,
pub bucket_name: Option<String>,
pub parquet_bucket_name: Option<String>,
pub is_parquet_processor: Option<bool>,
}

@@ -77,7 +77,7 @@ impl Worker {
transaction_filter: TransactionFilter,
grpc_response_item_timeout_in_secs: u64,
gcs_client: Arc<GCSClient>,
bucket_name: Option<String>,
parquet_bucket_name: Option<String>,
is_parquet_processor: Option<bool>,
) -> Result<Self> {
let processor_name = processor_config.name();
@@ -116,7 +116,7 @@ impl Worker {
transaction_filter,
grpc_response_item_timeout_in_secs,
gcs_client,
bucket_name,
parquet_bucket_name,
is_parquet_processor,
})
}
@@ -277,15 +277,15 @@ impl Worker {
}

let gcs_client = self.gcs_client.clone();
let bucket_name = self.bucket_name.clone().unwrap().to_string();
let parquet_bucket_name = self.parquet_bucket_name.clone().unwrap().to_string();

tokio::spawn(async move {
crate::parquet_handler::create_parquet_handler_loop(
gcs_client,
new_gap_detector_sender.clone(),
parquet_type_to_receiver.clone(),
processor_name,
bucket_name.clone(),
parquet_bucket_name.clone(),
)
.await;
});
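The renamed field stays an Option<String>, so the .clone().unwrap() above still panics without context if the config omits the bucket while a parquet processor is configured. A hedged alternative sketch (not what this commit does), using expect to name the missing value:

    // Sketch only, assuming the field remains Option<String> as in the struct
    // above; expect keeps the panic-on-missing behavior but reports which
    // configuration value is absent.
    fn require_parquet_bucket(parquet_bucket_name: &Option<String>) -> String {
        parquet_bucket_name
            .clone()
            .expect("parquet_bucket_name must be set for parquet processors")
    }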
