Skip to content

Commit

Permalink
[1/2][FA migration] Drop generated fields and add KS for unified (#493)
Browse files Browse the repository at this point in the history
* add migration files

* add killswitch to unified balance

* lint

* lint more

* lint
  • Loading branch information
bowenyang007 authored Aug 28, 2024
1 parent c16b31b commit fc65c59
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Your SQL goes here
-- removing generated fields because we're redoing them
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed
ADD COLUMN IF NOT EXISTS asset_type VARCHAR(1000) GENERATED ALWAYS AS (COALESCE(asset_type_v2, asset_type_v1)) STORED;
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed
ADD COLUMN IF NOT EXISTS token_standard VARCHAR(10) GENERATED ALWAYS AS (
CASE
WHEN asset_type_v2 IS NOT NULL THEN 'v2'
ELSE 'v1'
END
) STORED;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Your SQL goes here
-- removing generated fields because we're redoing them
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS asset_type;
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS token_standard;
4 changes: 0 additions & 4 deletions rust/processor/src/db/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,10 +673,6 @@ diesel::table! {
last_transaction_timestamp_v2 -> Nullable<Timestamp>,
last_transaction_timestamp -> Nullable<Timestamp>,
inserted_at -> Timestamp,
#[max_length = 1000]
asset_type -> Nullable<Varchar>,
#[max_length = 10]
token_standard -> Nullable<Varchar>,
}
}

Expand Down
14 changes: 11 additions & 3 deletions rust/processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,17 @@ impl ProcessorTrait for FungibleAssetProcessor {
let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = current_unified_fungible_asset_balances
.into_iter()
.partition(|x| x.is_primary.is_none());
// if flag turned on we need to not include any value in the table
let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = if self
.deprecated_tables
.contains(TableFlags::CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES)
{
(vec![], vec![])
} else {
current_unified_fungible_asset_balances
.into_iter()
.partition(|x| x.is_primary.is_none())
};

if self
.deprecated_tables
Expand Down
39 changes: 22 additions & 17 deletions rust/processor/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,21 +155,22 @@ pub trait ProcessorTrait: Send + Sync + Debug {
}

/// This enum captures the configs for all the different processors that are defined.
///
/// The configs for each processor should only contain configuration specific to that
/// processor. For configuration that is common to all processors, put it in
/// IndexerGrpcProcessorConfig.
#[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)]
#[serde(tag = "type", rename_all = "snake_case")]
// What is all this strum stuff? Let me explain.
//
// Previously we had consts called NAME in each module and a function called `name` on
// the ProcessorTrait. As such it was possible for this name to not match the snake case
// representation of the struct name. By using strum we can have a single source for
// processor names derived from the enum variants themselves.
//
// That's what this strum_discriminants stuff is, it uses macro magic to generate the
// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this
// generation logic, e.g. to make sure we use snake_case.
/// What is all this strum stuff? Let me explain.
///
/// Previously we had consts called NAME in each module and a function called `name` on
/// the ProcessorTrait. As such it was possible for this name to not match the snake case
/// representation of the struct name. By using strum we can have a single source for
/// processor names derived from the enum variants themselves.
///
/// That's what this strum_discriminants stuff is, it uses macro magic to generate the
/// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this
/// generation logic, e.g. to make sure we use snake_case.
#[strum(serialize_all = "snake_case")]
#[strum_discriminants(
derive(
Expand Down Expand Up @@ -226,14 +227,16 @@ impl ProcessorConfig {
}
}

/// This enum contains all the processors defined in this crate. We use enum_dispatch
/// as it is more efficient than using dynamic dispatch (Box<dyn ProcessorTrait>) and
/// This enum contains all the processors defined in this crate.
///
/// We use enum_dispatch as it is more efficient than using dynamic dispatch (Box<dyn ProcessorTrait>) and
/// it enables nice safety checks like in we do in `test_processor_names_complete`.
#[enum_dispatch(ProcessorTrait)]
#[derive(Debug)]
// To ensure that the variants of ProcessorConfig and Processor line up, in the testing
///
/// // To ensure that the variants of ProcessorConfig and Processor line up, in the testing
// build path we derive EnumDiscriminants on this enum as well and make sure the two
// sets of variants match up in `test_processor_names_complete`.
#[enum_dispatch(ProcessorTrait)]
#[derive(Debug)]
#[cfg_attr(
test,
derive(strum::EnumDiscriminants),
Expand Down Expand Up @@ -271,8 +274,10 @@ mod test {
use strum::VariantNames;

/// This test exists to make sure that when a new processor is added, it is added
/// to both Processor and ProcessorConfig. To make sure this passes, make sure the
/// variants are in the same order (lexicographical) and the names match.
/// to both Processor and ProcessorConfig.
///
/// To make sure this passes, make sure the variants are in the same order
/// (lexicographical) and the names match.
#[test]
fn test_processor_names_complete() {
assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS);
Expand Down
13 changes: 7 additions & 6 deletions rust/processor/src/utils/database.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

//! Database-related functions
#![allow(clippy::extra_unused_lifetimes)]

use crate::utils::util::remove_null_bytes;
use ahash::AHashMap;
use diesel::{
Expand Down Expand Up @@ -33,15 +30,18 @@ pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("src/db/postgres/mi
pub const DEFAULT_MAX_POOL_SIZE: u32 = 150;

#[derive(QueryId)]
/// Using this will append a where clause at the end of the string upsert function, e.g.
/// Using this will append a where clause at the end of the string upsert function
///
/// e.g.
/// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version"
/// This is needed when we want to maintain a table with only the latest state
pub struct UpsertFilterLatestTransactionQuery<T> {
query: T,
where_clause: Option<&'static str>,
}

// the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit
/// the max is actually u16::MAX but we see that when the size is too big we get
/// an overflow error so reducing it a bit
pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize;

/// This function will clean the data for postgres. Currently it has support for removing
Expand Down Expand Up @@ -190,7 +190,8 @@ where
res
}

/// Returns the entry for the config hashmap, or the default field count for the insert
/// Returns the entry for the config hashmap, or the default field count for the insert.
///
/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX),
/// we default to chunk an array of items based on how many columns are in the table.
pub fn get_config_table_chunk_size<T: field_count::FieldCount>(
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ bitflags! {
const FUNGIBLE_ASSET_BALANCES = 1 << 6;
const CURRENT_FUNGIBLE_ASSET_BALANCES = 1 << 7;
const COIN_SUPPLY = 1 << 8;
const CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES = 1 << 24;

// Objects
const OBJECTS = 1 << 9;
Expand Down
1 change: 1 addition & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::processors::events_processor::EventsProcessorConfig;
use serde::{Deserialize, Serialize};

/// This enum captures the configs for all the different processors that are defined.
///
/// The configs for each processor should only contain configuration specific to that
/// processor. For configuration that is common to all processors, put it in
/// IndexerGrpcProcessorConfig.
Expand Down
7 changes: 5 additions & 2 deletions rust/sdk-processor/src/utils/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ pub const MIGRATIONS: EmbeddedMigrations =
pub const DEFAULT_MAX_POOL_SIZE: u32 = 150;

#[derive(QueryId)]
/// Using this will append a where clause at the end of the string upsert function, e.g.
/// Using this will append a where clause at the end of the string upsert function
///
/// e.g.
/// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version"
/// This is needed when we want to maintain a table with only the latest state
pub struct UpsertFilterLatestTransactionQuery<T> {
Expand Down Expand Up @@ -191,7 +193,8 @@ where
res
}

/// Returns the entry for the config hashmap, or the default field count for the insert
/// Returns the entry for the config hashmap, or the default field count for the insert.
///
/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX),
/// we default to chunk an array of items based on how many columns are in the table.
pub fn get_config_table_chunk_size<T: field_count::FieldCount>(
Expand Down

0 comments on commit fc65c59

Please sign in to comment.