Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parquet_ans_processor #455

Merged
merged 9 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rust/processor/src/db/common/models/ans_models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
pub mod ans_lookup;
pub mod ans_lookup_v2;
pub mod ans_utils;

// parquet models
pub mod parquet_ans_lookup_v2;
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::{
ans_models::{
ans_lookup::{AnsPrimaryName, CurrentAnsPrimaryName},
ans_utils::SetReverseLookupEvent,
},
token_v2_models::v2_token_utils::TokenStandard,
},
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::Event;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Default, Debug, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct AnsPrimaryNameV2 {
pub txn_version: i64,
pub write_set_change_index: i64,
pub registered_address: String,
pub token_standard: String,
pub domain: Option<String>,
pub subdomain: Option<String>,
pub token_name: Option<String>,
pub is_deleted: bool,
#[allocative(skip)]
Comment on lines +28 to +36
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ying-w let me know if it's missing any fields

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing expiration_timestamp and subdomain_expiration_policy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rtso do you know how to parse out the expiration timestamps?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Primary names don't have expiration timestamp, only ans lookups do. This PR looks like it's missing moving ans_lookup_v2 to Parquet @yuunlimm

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also realized that this processor is still writing to the v1 ANS tables

all_current_ans_lookups,
all_ans_lookups,
all_current_ans_primary_names,
all_ans_primary_names,

We can include these in the deprecated tables and turn it off

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think ans_lookup_v2 was a part of parquet migration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ying-w do we need lookups in BQ?

Copy link
Contributor

@ying-w ying-w Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just realized this was for ans_primary_name_v2 not ans_lookup_v2

Yes we use both ans_lookup_v2 and current_ans_primary_name_v2 and current_ans_lookup_v2

if it's just removing ans_primary_name_v2 i think that is ok w/out moving to parquet but if going to remove current that would like the non-current to be moved to parquet

pub block_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for AnsPrimaryNameV2 {
const TABLE_NAME: &'static str = "ans_primary_name_v2";
}

impl HasVersion for AnsPrimaryNameV2 {
fn version(&self) -> i64 {
self.txn_version
}
}

impl GetTimeStamp for AnsPrimaryNameV2 {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct CurrentAnsPrimaryNameV2 {
pub registered_address: String,
pub token_standard: String,
pub domain: Option<String>,
pub subdomain: Option<String>,
pub token_name: Option<String>,
pub is_deleted: bool,
pub last_transaction_version: i64,
}

impl Ord for CurrentAnsPrimaryNameV2 {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.registered_address.cmp(&other.registered_address)
}
}

impl PartialOrd for CurrentAnsPrimaryNameV2 {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl CurrentAnsPrimaryNameV2 {
pub fn get_v2_from_v1(
v1_current_primary_name: CurrentAnsPrimaryName,
v1_primary_name: AnsPrimaryName,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, AnsPrimaryNameV2) {
(
Self {
registered_address: v1_current_primary_name.registered_address,
token_standard: TokenStandard::V1.to_string(),
domain: v1_current_primary_name.domain,
subdomain: v1_current_primary_name.subdomain,
token_name: v1_current_primary_name.token_name,
is_deleted: v1_current_primary_name.is_deleted,
last_transaction_version: v1_current_primary_name.last_transaction_version,
},
AnsPrimaryNameV2 {
txn_version: v1_primary_name.transaction_version,
write_set_change_index: v1_primary_name.write_set_change_index,
registered_address: v1_primary_name.registered_address,
token_standard: TokenStandard::V1.to_string(),
domain: v1_primary_name.domain,
subdomain: v1_primary_name.subdomain,
token_name: v1_primary_name.token_name,
is_deleted: v1_primary_name.is_deleted,
block_timestamp,
},
)
}

// Parse v2 primary name record from SetReverseLookupEvent
pub fn parse_v2_primary_name_record_from_event(
event: &Event,
txn_version: i64,
event_index: i64,
ans_v2_contract_address: &str,
block_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, AnsPrimaryNameV2)>> {
if let Some(set_reverse_lookup_event) =
SetReverseLookupEvent::from_event(event, ans_v2_contract_address, txn_version).unwrap()
{
if set_reverse_lookup_event.get_curr_domain_trunc().is_empty() {
// Handle case where the address's primary name is unset
return Ok(Some((
Self {
registered_address: set_reverse_lookup_event.get_account_addr().clone(),
token_standard: TokenStandard::V2.to_string(),
domain: None,
subdomain: None,
token_name: None,
last_transaction_version: txn_version,
is_deleted: true,
},
AnsPrimaryNameV2 {
txn_version,
write_set_change_index: -(event_index + 1),
registered_address: set_reverse_lookup_event.get_account_addr().clone(),
token_standard: TokenStandard::V2.to_string(),
domain: None,
subdomain: None,
token_name: None,
is_deleted: true,
block_timestamp,
},
)));
} else {
// Handle case where the address is set to a new primary name
return Ok(Some((
Self {
registered_address: set_reverse_lookup_event.get_account_addr().clone(),
token_standard: TokenStandard::V2.to_string(),
domain: Some(set_reverse_lookup_event.get_curr_domain_trunc()),
subdomain: Some(set_reverse_lookup_event.get_curr_subdomain_trunc()),
token_name: Some(set_reverse_lookup_event.get_curr_token_name()),
last_transaction_version: txn_version,
is_deleted: false,
},
AnsPrimaryNameV2 {
txn_version,
write_set_change_index: -(event_index + 1),
registered_address: set_reverse_lookup_event.get_account_addr().clone(),
token_standard: TokenStandard::V2.to_string(),
domain: Some(set_reverse_lookup_event.get_curr_domain_trunc()),
subdomain: Some(set_reverse_lookup_event.get_curr_subdomain_trunc()),
token_name: Some(set_reverse_lookup_event.get_curr_token_name()),
is_deleted: false,
block_timestamp,
},
)));
}
}
Ok(None)
}
}
4 changes: 4 additions & 0 deletions rust/processor/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
db::common::models::processor_status::ProcessorStatus,
gap_detectors::ProcessingResult,
processors::parquet_processors::{
parquet_ans_processor::{ParquetAnsProcessor, ParquetAnsProcessorConfig},
parquet_default_processor::{ParquetDefaultProcessor, ParquetDefaultProcessorConfig},
parquet_fungible_asset_processor::{
ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
Expand Down Expand Up @@ -198,6 +199,7 @@ pub enum ProcessorConfig {
ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig),
ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig),
ParquetAnsProcessor(ParquetAnsProcessorConfig),
}

impl ProcessorConfig {
Expand All @@ -213,6 +215,7 @@ impl ProcessorConfig {
ProcessorConfig::ParquetDefaultProcessor(_)
| ProcessorConfig::ParquetFungibleAssetProcessor(_)
| ProcessorConfig::ParquetTransactionMetadataProcessor(_)
| ProcessorConfig::ParquetAnsProcessor(_)
)
}
}
Expand Down Expand Up @@ -250,6 +253,7 @@ pub enum Processor {
ParquetDefaultProcessor,
ParquetFungibleAssetProcessor,
ParquetTransactionMetadataProcessor,
ParquetAnsProcessor,
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion rust/processor/src/processors/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

pub mod parquet_ans_processor;
pub mod parquet_default_processor;

pub mod parquet_fungible_asset_processor;
pub mod parquet_transaction_metadata_processor;

Expand Down
Loading
Loading