From 24d11970b5b976c965b45ba776dedc0b6402c8fa Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Mon, 26 Aug 2024 15:12:50 -0700 Subject: [PATCH 1/5] add migration files --- .../2024-08-20-224736_fa_migration_fix_part1/down.sql | 11 +++++++++++ .../2024-08-20-224736_fa_migration_fix_part1/up.sql | 4 ++++ rust/processor/src/db/postgres/schema.rs | 4 ---- 3 files changed, 15 insertions(+), 4 deletions(-) create mode 100644 rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/down.sql create mode 100644 rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/up.sql diff --git a/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/down.sql b/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/down.sql new file mode 100644 index 000000000..0e92c79f7 --- /dev/null +++ b/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/down.sql @@ -0,0 +1,11 @@ +-- This file should undo anything in `up.sql` +-- re-adding the generated fields that up.sql removes +ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed +ADD COLUMN IF NOT EXISTS asset_type VARCHAR(1000) GENERATED ALWAYS AS (COALESCE(asset_type_v2, asset_type_v1)) STORED; +ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed +ADD COLUMN IF NOT EXISTS token_standard VARCHAR(10) GENERATED ALWAYS AS ( + CASE + WHEN asset_type_v2 IS NOT NULL THEN 'v2' + ELSE 'v1' + END + ) STORED; \ No newline at end of file diff --git a/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/up.sql b/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/up.sql new file mode 100644 index 000000000..bf6528785 --- /dev/null +++ b/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/up.sql @@ -0,0 +1,4 @@ +-- Your SQL goes here +-- removing generated fields because we're redoing them +ALTER TABLE 
current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS asset_type; +ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS token_standard; \ No newline at end of file diff --git a/rust/processor/src/db/postgres/schema.rs b/rust/processor/src/db/postgres/schema.rs index 8c9508479..e88cc7871 100644 --- a/rust/processor/src/db/postgres/schema.rs +++ b/rust/processor/src/db/postgres/schema.rs @@ -673,10 +673,6 @@ diesel::table! { last_transaction_timestamp_v2 -> Nullable, last_transaction_timestamp -> Nullable, inserted_at -> Timestamp, - #[max_length = 1000] - asset_type -> Nullable, - #[max_length = 10] - token_standard -> Nullable, } } From 74e1a49f93260a1b6e1d18b76d0b389cddfe0a22 Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Mon, 26 Aug 2024 15:22:47 -0700 Subject: [PATCH 2/5] add killswitch to unified balance --- .../src/processors/fungible_asset_processor.rs | 14 +++++++++++--- rust/processor/src/worker.rs | 1 + 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 68f4a51b2..2620846c8 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -372,9 +372,17 @@ impl ProcessorTrait for FungibleAssetProcessor { let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); - let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = current_unified_fungible_asset_balances - .into_iter() - .partition(|x| x.is_primary.is_none()); + // if flag turned on we need to + let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = if self + .deprecated_tables + .contains(TableFlags::CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES) + { + (vec![], vec![]) + } else { + current_unified_fungible_asset_balances + .into_iter() + .partition(|x| x.is_primary.is_none()) + }; if 
self .deprecated_tables diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index b821ecb88..adbc7fa2a 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -83,6 +83,7 @@ bitflags! { const FUNGIBLE_ASSET_BALANCES = 1 << 6; const CURRENT_FUNGIBLE_ASSET_BALANCES = 1 << 7; const COIN_SUPPLY = 1 << 8; + const CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES = 1 << 24; // Objects const OBJECTS = 1 << 9; From f4123efe55482ced165413d8daf81328a88cdd0f Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Tue, 27 Aug 2024 14:49:57 -0700 Subject: [PATCH 3/5] lint --- .../processors/fungible_asset_processor.rs | 2 +- rust/processor/src/processors/mod.rs | 19 ++++++++++++------- rust/processor/src/utils/database.rs | 13 +++++++------ .../src/config/processor_config.rs | 1 + rust/sdk-processor/src/utils/database.rs | 7 +++++-- 5 files changed, 26 insertions(+), 16 deletions(-) diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 2620846c8..f80133a42 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -372,7 +372,7 @@ impl ProcessorTrait for FungibleAssetProcessor { let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); - // if flag turned on we need to + // if this table is flagged as deprecated, leave both balance lists empty so nothing is written for it let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = if self .deprecated_tables .contains(TableFlags::CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES) diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index 4b47306b5..f30819bcb 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -155,6 +155,7 @@ pub trait ProcessorTrait: Send + Sync + Debug { } /// This enum captures the configs for all the different processors that 
are defined. +/// /// The configs for each processor should only contain configuration specific to that /// processor. For configuration that is common to all processors, put it in /// IndexerGrpcProcessorConfig. @@ -226,14 +227,16 @@ impl ProcessorConfig { } } -/// This enum contains all the processors defined in this crate. We use enum_dispatch -/// as it is more efficient than using dynamic dispatch (Box) and +/// This enum contains all the processors defined in this crate. +/// +/// We use enum_dispatch as it is more efficient than using dynamic dispatch (Box) and /// it enables nice safety checks like in we do in `test_processor_names_complete`. -#[enum_dispatch(ProcessorTrait)] -#[derive(Debug)] -// To ensure that the variants of ProcessorConfig and Processor line up, in the testing +/// +/// // To ensure that the variants of ProcessorConfig and Processor line up, in the testing // build path we derive EnumDiscriminants on this enum as well and make sure the two // sets of variants match up in `test_processor_names_complete`. +#[enum_dispatch(ProcessorTrait)] +#[derive(Debug)] #[cfg_attr( test, derive(strum::EnumDiscriminants), @@ -271,8 +274,10 @@ mod test { use strum::VariantNames; /// This test exists to make sure that when a new processor is added, it is added - /// to both Processor and ProcessorConfig. To make sure this passes, make sure the - /// variants are in the same order (lexicographical) and the names match. + /// to both Processor and ProcessorConfig. + /// + /// To make sure this passes, make sure the variants are in the same order + /// (lexicographical) and the names match. 
#[test] fn test_processor_names_complete() { assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS); diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index 411ce46c9..128553159 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -1,9 +1,6 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -//! Database-related functions -#![allow(clippy::extra_unused_lifetimes)] - use crate::utils::util::remove_null_bytes; use ahash::AHashMap; use diesel::{ @@ -33,7 +30,9 @@ pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("src/db/postgres/mi pub const DEFAULT_MAX_POOL_SIZE: u32 = 150; #[derive(QueryId)] -/// Using this will append a where clause at the end of the string upsert function, e.g. +/// Using this will append a where clause at the end of the string upsert function +/// +/// e.g. /// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version" /// This is needed when we want to maintain a table with only the latest state pub struct UpsertFilterLatestTransactionQuery { @@ -41,7 +40,8 @@ pub struct UpsertFilterLatestTransactionQuery { where_clause: Option<&'static str>, } -// the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit +/// the max is actually u16::MAX but we see that when the size is too big we get +/// an overflow error so reducing it a bit pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize; /// This function will clean the data for postgres. Currently it has support for removing @@ -190,7 +190,8 @@ where res } -/// Returns the entry for the config hashmap, or the default field count for the insert +/// Returns the entry for the config hashmap, or the default field count for the insert. 
+/// /// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX), /// we default to chunk an array of items based on how many columns are in the table. pub fn get_config_table_chunk_size( diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 24e2be86b..cf0795a7b 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -2,6 +2,7 @@ use crate::processors::events_processor::EventsProcessorConfig; use serde::{Deserialize, Serialize}; /// This enum captures the configs for all the different processors that are defined. +/// /// The configs for each processor should only contain configuration specific to that /// processor. For configuration that is common to all processors, put it in /// IndexerGrpcProcessorConfig. diff --git a/rust/sdk-processor/src/utils/database.rs b/rust/sdk-processor/src/utils/database.rs index dd9de0c54..294c8d41c 100644 --- a/rust/sdk-processor/src/utils/database.rs +++ b/rust/sdk-processor/src/utils/database.rs @@ -34,7 +34,9 @@ pub const MIGRATIONS: EmbeddedMigrations = pub const DEFAULT_MAX_POOL_SIZE: u32 = 150; #[derive(QueryId)] -/// Using this will append a where clause at the end of the string upsert function, e.g. +/// Using this will append a where clause at the end of the string upsert function +/// +/// e.g. /// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version" /// This is needed when we want to maintain a table with only the latest state pub struct UpsertFilterLatestTransactionQuery { @@ -191,7 +193,8 @@ where res } -/// Returns the entry for the config hashmap, or the default field count for the insert +/// Returns the entry for the config hashmap, or the default field count for the insert. 
+/// /// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX), /// we default to chunk an array of items based on how many columns are in the table. pub fn get_config_table_chunk_size( From a9bd17bfaa3c1ca26323997e600d43eaee3a56e2 Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Tue, 27 Aug 2024 14:58:41 -0700 Subject: [PATCH 4/5] lint more --- rust/processor/src/utils/database.rs | 4 ++-- rust/sdk-processor/src/config/processor_config.rs | 2 +- rust/sdk-processor/src/utils/database.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index 128553159..ad6a280cb 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -31,7 +31,7 @@ pub const DEFAULT_MAX_POOL_SIZE: u32 = 150; #[derive(QueryId)] /// Using this will append a where clause at the end of the string upsert function -/// +/// /// e.g. /// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version" /// This is needed when we want to maintain a table with only the latest state @@ -191,7 +191,7 @@ where } /// Returns the entry for the config hashmap, or the default field count for the insert. -/// +/// /// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX), /// we default to chunk an array of items based on how many columns are in the table. pub fn get_config_table_chunk_size( diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index cf0795a7b..541458a1a 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -2,7 +2,7 @@ use crate::processors::events_processor::EventsProcessorConfig; use serde::{Deserialize, Serialize}; /// This enum captures the configs for all the different processors that are defined. 
-/// +/// /// The configs for each processor should only contain configuration specific to that /// processor. For configuration that is common to all processors, put it in /// IndexerGrpcProcessorConfig. diff --git a/rust/sdk-processor/src/utils/database.rs b/rust/sdk-processor/src/utils/database.rs index 294c8d41c..38d01a128 100644 --- a/rust/sdk-processor/src/utils/database.rs +++ b/rust/sdk-processor/src/utils/database.rs @@ -35,7 +35,7 @@ pub const DEFAULT_MAX_POOL_SIZE: u32 = 150; #[derive(QueryId)] /// Using this will append a where clause at the end of the string upsert function -/// +/// /// e.g. /// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version" /// This is needed when we want to maintain a table with only the latest state @@ -194,7 +194,7 @@ where } /// Returns the entry for the config hashmap, or the default field count for the insert. -/// +/// /// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX), /// we default to chunk an array of items based on how many columns are in the table. pub fn get_config_table_chunk_size( From 3b93e64b436b42c54f1be2bcaa16223dc0db3cb2 Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Tue, 27 Aug 2024 15:11:27 -0700 Subject: [PATCH 5/5] lint --- rust/processor/src/processors/mod.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index f30819bcb..d51544302 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -155,22 +155,22 @@ pub trait ProcessorTrait: Send + Sync + Debug { } /// This enum captures the configs for all the different processors that are defined. -/// +/// /// The configs for each processor should only contain configuration specific to that /// processor. 
For configuration that is common to all processors, put it in /// IndexerGrpcProcessorConfig. #[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)] #[serde(tag = "type", rename_all = "snake_case")] -// What is all this strum stuff? Let me explain. -// -// Previously we had consts called NAME in each module and a function called `name` on -// the ProcessorTrait. As such it was possible for this name to not match the snake case -// representation of the struct name. By using strum we can have a single source for -// processor names derived from the enum variants themselves. -// -// That's what this strum_discriminants stuff is, it uses macro magic to generate the -// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this -// generation logic, e.g. to make sure we use snake_case. +/// What is all this strum stuff? Let me explain. +/// +/// Previously we had consts called NAME in each module and a function called `name` on +/// the ProcessorTrait. As such it was possible for this name to not match the snake case +/// representation of the struct name. By using strum we can have a single source for +/// processor names derived from the enum variants themselves. +/// +/// That's what this strum_discriminants stuff is, it uses macro magic to generate the +/// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this +/// generation logic, e.g. to make sure we use snake_case. #[strum(serialize_all = "snake_case")] #[strum_discriminants( derive( @@ -228,10 +228,10 @@ impl ProcessorConfig { } /// This enum contains all the processors defined in this crate. -/// +/// /// We use enum_dispatch as it is more efficient than using dynamic dispatch (Box) and /// it enables nice safety checks like in we do in `test_processor_names_complete`. 
-/// +/// /// // To ensure that the variants of ProcessorConfig and Processor line up, in the testing // build path we derive EnumDiscriminants on this enum as well and make sure the two // sets of variants match up in `test_processor_names_complete`. @@ -275,7 +275,7 @@ mod test { /// This test exists to make sure that when a new processor is added, it is added /// to both Processor and ProcessorConfig. - /// + /// /// To make sure this passes, make sure the variants are in the same order /// (lexicographical) and the names match. #[test]