From fc65c590e2021a8076868f889fa74e904fd56ec0 Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Tue, 27 Aug 2024 18:51:29 -0700 Subject: [PATCH] [1/2][FA migration] Drop generated fields and add KS for unified (#493) * add migration files * add killswitch to unified balance * lint * lint more * lint --- .../down.sql | 11 ++++++ .../up.sql | 4 ++ rust/processor/src/db/postgres/schema.rs | 4 -- .../processors/fungible_asset_processor.rs | 14 +++++-- rust/processor/src/processors/mod.rs | 39 +++++++++++-------- rust/processor/src/utils/database.rs | 13 ++++--- rust/processor/src/worker.rs | 1 + .../src/config/processor_config.rs | 1 + rust/sdk-processor/src/utils/database.rs | 7 +++- 9 files changed, 62 insertions(+), 32 deletions(-) create mode 100644 rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/down.sql create mode 100644 rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/up.sql diff --git a/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/down.sql b/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/down.sql new file mode 100644 index 000000000..0e92c79f7 --- /dev/null +++ b/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/down.sql @@ -0,0 +1,11 @@ +-- Your SQL goes here +-- removing generated fields because we're redoing them +ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed +ADD COLUMN IF NOT EXISTS asset_type VARCHAR(1000) GENERATED ALWAYS AS (COALESCE(asset_type_v2, asset_type_v1)) STORED; +ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed +ADD COLUMN IF NOT EXISTS token_standard VARCHAR(10) GENERATED ALWAYS AS ( + CASE + WHEN asset_type_v2 IS NOT NULL THEN 'v2' + ELSE 'v1' + END + ) STORED; \ No newline at end of file diff --git a/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/up.sql b/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/up.sql new file mode 100644 index 000000000..bf6528785 --- /dev/null +++ b/rust/processor/src/db/postgres/migrations/2024-08-20-224736_fa_migration_fix_part1/up.sql @@ -0,0 +1,4 @@ +-- Your SQL goes here +-- removing generated fields because we're redoing them +ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS asset_type; +ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS token_standard; \ No newline at end of file diff --git a/rust/processor/src/db/postgres/schema.rs b/rust/processor/src/db/postgres/schema.rs index 8c9508479..e88cc7871 100644 --- a/rust/processor/src/db/postgres/schema.rs +++ b/rust/processor/src/db/postgres/schema.rs @@ -673,10 +673,6 @@ diesel::table! { last_transaction_timestamp_v2 -> Nullable, last_transaction_timestamp -> Nullable, inserted_at -> Timestamp, - #[max_length = 1000] - asset_type -> Nullable, - #[max_length = 10] - token_standard -> Nullable, } } diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 68f4a51b2..f80133a42 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -372,9 +372,17 @@ impl ProcessorTrait for FungibleAssetProcessor { let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); - let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = current_unified_fungible_asset_balances - .into_iter() - .partition(|x| x.is_primary.is_none()); + // if flag turned on we need to not include any value in the table + let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = if self + .deprecated_tables + .contains(TableFlags::CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES) + { + (vec![], vec![]) + } else { + current_unified_fungible_asset_balances + .into_iter() + .partition(|x| x.is_primary.is_none()) + }; if self .deprecated_tables diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index 4b47306b5..d51544302 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -155,21 +155,22 @@ pub trait ProcessorTrait: Send + Sync + Debug { } /// This enum captures the configs for all the different processors that are defined. +/// /// The configs for each processor should only contain configuration specific to that /// processor. For configuration that is common to all processors, put it in /// IndexerGrpcProcessorConfig. #[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)] #[serde(tag = "type", rename_all = "snake_case")] -// What is all this strum stuff? Let me explain. -// -// Previously we had consts called NAME in each module and a function called `name` on -// the ProcessorTrait. As such it was possible for this name to not match the snake case -// representation of the struct name. By using strum we can have a single source for -// processor names derived from the enum variants themselves. -// -// That's what this strum_discriminants stuff is, it uses macro magic to generate the -// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this -// generation logic, e.g. to make sure we use snake_case. +/// What is all this strum stuff? Let me explain. +/// +/// Previously we had consts called NAME in each module and a function called `name` on +/// the ProcessorTrait. As such it was possible for this name to not match the snake case +/// representation of the struct name. By using strum we can have a single source for +/// processor names derived from the enum variants themselves. +/// +/// That's what this strum_discriminants stuff is, it uses macro magic to generate the +/// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this +/// generation logic, e.g. to make sure we use snake_case. #[strum(serialize_all = "snake_case")] #[strum_discriminants( derive( @@ -226,14 +227,16 @@ impl ProcessorConfig { } } -/// This enum contains all the processors defined in this crate. We use enum_dispatch -/// as it is more efficient than using dynamic dispatch (Box) and +/// This enum contains all the processors defined in this crate. +/// +/// We use enum_dispatch as it is more efficient than using dynamic dispatch (Box) and /// it enables nice safety checks like in we do in `test_processor_names_complete`. -#[enum_dispatch(ProcessorTrait)] -#[derive(Debug)] -// To ensure that the variants of ProcessorConfig and Processor line up, in the testing +/// +/// // To ensure that the variants of ProcessorConfig and Processor line up, in the testing // build path we derive EnumDiscriminants on this enum as well and make sure the two // sets of variants match up in `test_processor_names_complete`. +#[enum_dispatch(ProcessorTrait)] +#[derive(Debug)] #[cfg_attr( test, derive(strum::EnumDiscriminants), @@ -271,8 +274,10 @@ mod test { use strum::VariantNames; /// This test exists to make sure that when a new processor is added, it is added - /// to both Processor and ProcessorConfig. To make sure this passes, make sure the - /// variants are in the same order (lexicographical) and the names match. + /// to both Processor and ProcessorConfig. + /// + /// To make sure this passes, make sure the variants are in the same order + /// (lexicographical) and the names match. #[test] fn test_processor_names_complete() { assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS); diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index 411ce46c9..ad6a280cb 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -1,9 +1,6 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -//! Database-related functions -#![allow(clippy::extra_unused_lifetimes)] - use crate::utils::util::remove_null_bytes; use ahash::AHashMap; use diesel::{ @@ -33,7 +30,9 @@ pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("src/db/postgres/mi pub const DEFAULT_MAX_POOL_SIZE: u32 = 150; #[derive(QueryId)] -/// Using this will append a where clause at the end of the string upsert function, e.g. +/// Using this will append a where clause at the end of the string upsert function +/// +/// e.g. /// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version" /// This is needed when we want to maintain a table with only the latest state pub struct UpsertFilterLatestTransactionQuery { @@ -41,7 +40,8 @@ pub struct UpsertFilterLatestTransactionQuery { where_clause: Option<&'static str>, } -// the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit +/// the max is actually u16::MAX but we see that when the size is too big we get +/// an overflow error so reducing it a bit pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize; /// This function will clean the data for postgres. Currently it has support for removing @@ -190,7 +190,8 @@ where res } -/// Returns the entry for the config hashmap, or the default field count for the insert +/// Returns the entry for the config hashmap, or the default field count for the insert. +/// /// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX), /// we default to chunk an array of items based on how many columns are in the table. pub fn get_config_table_chunk_size( diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index b821ecb88..adbc7fa2a 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -83,6 +83,7 @@ bitflags! { const FUNGIBLE_ASSET_BALANCES = 1 << 6; const CURRENT_FUNGIBLE_ASSET_BALANCES = 1 << 7; const COIN_SUPPLY = 1 << 8; + const CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES = 1 << 24; // Objects const OBJECTS = 1 << 9; diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 24e2be86b..541458a1a 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -2,6 +2,7 @@ use crate::processors::events_processor::EventsProcessorConfig; use serde::{Deserialize, Serialize}; /// This enum captures the configs for all the different processors that are defined. +/// /// The configs for each processor should only contain configuration specific to that /// processor. For configuration that is common to all processors, put it in /// IndexerGrpcProcessorConfig. diff --git a/rust/sdk-processor/src/utils/database.rs b/rust/sdk-processor/src/utils/database.rs index dd9de0c54..38d01a128 100644 --- a/rust/sdk-processor/src/utils/database.rs +++ b/rust/sdk-processor/src/utils/database.rs @@ -34,7 +34,9 @@ pub const MIGRATIONS: EmbeddedMigrations = pub const DEFAULT_MAX_POOL_SIZE: u32 = 150; #[derive(QueryId)] -/// Using this will append a where clause at the end of the string upsert function, e.g. +/// Using this will append a where clause at the end of the string upsert function +/// +/// e.g. /// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version" /// This is needed when we want to maintain a table with only the latest state pub struct UpsertFilterLatestTransactionQuery { @@ -191,7 +193,8 @@ where res } -/// Returns the entry for the config hashmap, or the default field count for the insert +/// Returns the entry for the config hashmap, or the default field count for the insert. +/// /// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX), /// we default to chunk an array of items based on how many columns are in the table. pub fn get_config_table_chunk_size(