Skip to content

Commit

Permalink
sync changes with token v2 processor
Browse files Browse the repository at this point in the history
avbel committed Aug 16, 2024
1 parent 382c655 commit 61ef41e
Showing 1 changed file with 74 additions and 9 deletions.
83 changes: 74 additions & 9 deletions rust/processor/src/processors/mercato_token_v2_processor.rs
Original file line number Diff line number Diff line change
@@ -5,16 +5,12 @@ use crate::{
},
token_models::tokens::{TableHandleToOwner, TableMetadataForToken},
token_v2_models::{
v2_collections::{CollectionV2, CurrentCollectionV2, CurrentCollectionV2PK},
v2_token_datas::{CurrentTokenDataV2, CurrentTokenDataV2PK, TokenDataV2},
v2_token_metadata::{CurrentTokenV2Metadata, CurrentTokenV2MetadataPK},
v2_token_ownerships::{
v1_token_royalty::CurrentTokenRoyaltyV1, v2_collections::{CollectionV2, CurrentCollectionV2, CurrentCollectionV2PK}, v2_token_datas::{CurrentTokenDataV2, CurrentTokenDataV2PK, TokenDataV2}, v2_token_metadata::{CurrentTokenV2Metadata, CurrentTokenV2MetadataPK}, v2_token_ownerships::{
CurrentTokenOwnershipV2, CurrentTokenOwnershipV2PK, NFTOwnershipV2,
TokenOwnershipV2,
},
v2_token_utils::{
}, v2_token_utils::{
AptosCollection, Burn, BurnEvent, ConcurrentSupply, FixedSupply, MintEvent, PropertyMapModel, TokenIdentifiers, TokenV2, TokenV2Burned, TokenV2Minted, TransferEvent, UnlimitedSupply
},
}
},
},
schema,
@@ -91,6 +87,7 @@ async fn insert_to_db(
current_token_ownerships_v2: &[CurrentTokenOwnershipV2],
current_deleted_token_ownerships_v2: &[CurrentTokenOwnershipV2],
current_token_v2_metadata: &[CurrentTokenV2Metadata],
current_token_royalties_v1: &[CurrentTokenRoyaltyV1],
per_table_chunk_sizes: &AHashMap<String, usize>,
) -> Result<(), diesel::result::Error> {
tracing::trace!(
@@ -137,7 +134,7 @@ async fn insert_to_db(
),
);
let ct_v2 = execute_in_chunks(
conn,
conn.clone(),
insert_current_token_v2_metadatas_query,
current_token_v2_metadata,
get_config_table_chunk_size::<CurrentTokenV2Metadata>(
@@ -146,20 +143,32 @@ async fn insert_to_db(
),
);

let ctr_v1 = execute_in_chunks(
conn,
insert_current_token_royalties_v1_query,
current_token_royalties_v1,
get_config_table_chunk_size::<CurrentTokenRoyaltyV1>(
"current_token_royalty_v1",
per_table_chunk_sizes,
),
);

let (
cc_v2_res,
ctd_v2_res,
cto_v2_res,
cdto_v2_res,
ct_v2_res,
) = tokio::join!(cc_v2, ctd_v2, cto_v2, cdto_v2, ct_v2);
ctr_v1_res,
) = tokio::join!(cc_v2, ctd_v2, cto_v2, cdto_v2, ct_v2, ctr_v1);

for res in [
cc_v2_res,
ctd_v2_res,
cto_v2_res,
cdto_v2_res,
ct_v2_res,
ctr_v1_res,
] {
res?;
}
@@ -194,6 +203,7 @@ fn insert_current_collections_v2_query(
table_handle_v1.eq(excluded(table_handle_v1)),
token_standard.eq(excluded(token_standard)),
last_transaction_version.eq(excluded(last_transaction_version)),
collection_properties.eq(excluded(collection_properties)),
last_transaction_timestamp.eq(excluded(last_transaction_timestamp)),
inserted_at.eq(excluded(inserted_at)),
)),
@@ -278,9 +288,15 @@ fn insert_current_deleted_token_ownerships_v2_query(
.do_update()
.set((
amount.eq(excluded(amount)),
table_type_v1.eq(excluded(table_type_v1)),
token_properties_mutated_v1.eq(excluded(token_properties_mutated_v1)),
is_soulbound_v2.eq(excluded(is_soulbound_v2)),
token_standard.eq(excluded(token_standard)),
is_fungible_v2.eq(excluded(is_fungible_v2)),
last_transaction_version.eq(excluded(last_transaction_version)),
last_transaction_timestamp.eq(excluded(last_transaction_timestamp)),
inserted_at.eq(excluded(inserted_at)),
non_transferrable_by_owner.eq(excluded(non_transferrable_by_owner)),
)),
Some(" WHERE current_token_ownerships_v2.last_transaction_version <= excluded.last_transaction_version "),
)
@@ -309,6 +325,30 @@ fn insert_current_token_v2_metadatas_query(
)
}

fn insert_current_token_royalties_v1_query(
items_to_insert: Vec<CurrentTokenRoyaltyV1>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Option<&'static str>,
) {
use schema::current_token_royalty_v1::dsl::*;

(
diesel::insert_into(schema::current_token_royalty_v1::table)
.values(items_to_insert)
.on_conflict(token_data_id)
.do_update()
.set((
payee_address.eq(excluded(payee_address)),
royalty_points_numerator.eq(excluded(royalty_points_numerator)),
royalty_points_denominator.eq(excluded(royalty_points_denominator)),
last_transaction_version.eq(excluded(last_transaction_version)),
last_transaction_timestamp.eq(excluded(last_transaction_timestamp)),
)),
Some(" WHERE current_token_royalty_v1.last_transaction_version <= excluded.last_transaction_version "),
)
}

#[async_trait]
impl ProcessorTrait for MercatoTokenV2Processor {
fn name(&self) -> &'static str {
@@ -347,6 +387,7 @@ impl ProcessorTrait for MercatoTokenV2Processor {
current_token_ownerships_v2,
current_deleted_token_ownerships_v2,
current_token_v2_metadata,
current_token_royalties_v1,
) = parse_v2_token(
&transactions,
&table_handle_to_owner,
@@ -384,6 +425,7 @@ impl ProcessorTrait for MercatoTokenV2Processor {
&current_nft_ownerships_v2,
&current_deleted_nft_ownerships_v2,
&current_token_v2_metadata,
&current_token_royalties_v1,
&self.per_table_chunk_sizes,
)
.await;
@@ -435,6 +477,7 @@ async fn parse_v2_token(
Vec<CurrentTokenOwnershipV2>,
Vec<CurrentTokenOwnershipV2>, // deleted token ownerships
Vec<CurrentTokenV2Metadata>,
Vec<CurrentTokenRoyaltyV1>,
) {
// Token V2 and V1 combined
let mut current_collections_v2: AHashMap<CurrentCollectionV2PK, CurrentCollectionV2> =
@@ -455,6 +498,8 @@ async fn parse_v2_token(
// Basically token properties
let mut current_token_v2_metadata: AHashMap<CurrentTokenV2MetadataPK, CurrentTokenV2Metadata> =
AHashMap::new();
let mut current_token_royalties_v1: AHashMap<CurrentTokenDataV2PK, CurrentTokenRoyaltyV1> =
AHashMap::new();

// Code above is inefficient (multiple passthroughs) so I'm approaching MercatoMercatoTokenV2 with a cleaner code structure
for txn in transactions {
@@ -618,6 +663,19 @@ async fn parse_v2_token(
current_token_data,
);
}
if let Some(current_token_royalty) =
CurrentTokenRoyaltyV1::get_v1_from_write_table_item(
table_item,
txn_version,
txn_timestamp,
)
.unwrap()
{
current_token_royalties_v1.insert(
current_token_royalty.token_data_id.clone(),
current_token_royalty,
);
}
if let Some((_token_ownership, current_token_ownership)) =
TokenOwnershipV2::get_v1_from_write_table_item(
table_item,
@@ -850,6 +908,10 @@ async fn parse_v2_token(
let mut current_deleted_token_ownerships_v2 = current_deleted_token_ownerships_v2
.into_values()
.collect::<Vec<CurrentTokenOwnershipV2>>();
let mut current_token_royalties_v1 = current_token_royalties_v1
.into_values()
.collect::<Vec<CurrentTokenRoyaltyV1>>();


// Sort by PK
current_collections_v2.sort_by(|a, b| a.collection_id.cmp(&b.collection_id));
@@ -886,11 +948,14 @@ async fn parse_v2_token(
))
});

current_token_royalties_v1.sort();

(
current_collections_v2,
current_token_datas_v2,
current_token_ownerships_v2,
current_deleted_token_ownerships_v2,
current_token_v2_metadata,
current_token_royalties_v1,
)
}

0 comments on commit 61ef41e

Please sign in to comment.