Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/2][FA migration] Drop generated fields and add KS for unified #493

Merged
merged 5 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Re-adds the generated columns asset_type and token_standard on
-- current_unified_fungible_asset_balances_to_be_renamed.
-- NOTE(review): this looks like the revert (down) half of the migration that drops
-- these columns -- confirm against the migration directory layout.
-- IF NOT EXISTS keeps each statement from failing when the column is already present.
--
-- asset_type: stored generated column; asset_type_v2 when it is non-NULL, otherwise asset_type_v1.
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed
ADD COLUMN IF NOT EXISTS asset_type VARCHAR(1000) GENERATED ALWAYS AS (COALESCE(asset_type_v2, asset_type_v1)) STORED;
-- token_standard: stored generated column; 'v2' when asset_type_v2 is non-NULL, otherwise 'v1'.
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed
ADD COLUMN IF NOT EXISTS token_standard VARCHAR(10) GENERATED ALWAYS AS (
CASE
WHEN asset_type_v2 IS NOT NULL THEN 'v2'
ELSE 'v1'
END
) STORED;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Drops the generated columns asset_type and token_standard from
-- current_unified_fungible_asset_balances_to_be_renamed because they are being redone.
-- IF EXISTS keeps each statement from failing when the column is already absent.
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS asset_type;
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS token_standard;
4 changes: 0 additions & 4 deletions rust/processor/src/db/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,10 +673,6 @@ diesel::table! {
last_transaction_timestamp_v2 -> Nullable<Timestamp>,
last_transaction_timestamp -> Nullable<Timestamp>,
inserted_at -> Timestamp,
#[max_length = 1000]
asset_type -> Nullable<Varchar>,
#[max_length = 10]
token_standard -> Nullable<Varchar>,
}
}

Expand Down
14 changes: 11 additions & 3 deletions rust/processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,17 @@ impl ProcessorTrait for FungibleAssetProcessor {
let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = current_unified_fungible_asset_balances
.into_iter()
.partition(|x| x.is_primary.is_none());
// if flag turned on we need to not include any value in the table
let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = if self
.deprecated_tables
.contains(TableFlags::CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES)
{
(vec![], vec![])
} else {
current_unified_fungible_asset_balances
.into_iter()
.partition(|x| x.is_primary.is_none())
};

if self
.deprecated_tables
Expand Down
39 changes: 22 additions & 17 deletions rust/processor/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,21 +155,22 @@ pub trait ProcessorTrait: Send + Sync + Debug {
}

/// This enum captures the configs for all the different processors that are defined.
///
/// The configs for each processor should only contain configuration specific to that
/// processor. For configuration that is common to all processors, put it in
/// IndexerGrpcProcessorConfig.
#[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)]
#[serde(tag = "type", rename_all = "snake_case")]
// What is all this strum stuff? Let me explain.
//
// Previously we had consts called NAME in each module and a function called `name` on
// the ProcessorTrait. As such it was possible for this name to not match the snake case
// representation of the struct name. By using strum we can have a single source for
// processor names derived from the enum variants themselves.
//
// That's what this strum_discriminants stuff is, it uses macro magic to generate the
// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this
// generation logic, e.g. to make sure we use snake_case.
/// What is all this strum stuff? Let me explain.
///
/// Previously we had consts called NAME in each module and a function called `name` on
/// the ProcessorTrait. As such it was possible for this name to not match the snake case
/// representation of the struct name. By using strum we can have a single source for
/// processor names derived from the enum variants themselves.
///
/// That's what this strum_discriminants stuff is, it uses macro magic to generate the
/// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this
/// generation logic, e.g. to make sure we use snake_case.
#[strum(serialize_all = "snake_case")]
#[strum_discriminants(
derive(
Expand Down Expand Up @@ -226,14 +227,16 @@ impl ProcessorConfig {
}
}

/// This enum contains all the processors defined in this crate. We use enum_dispatch
/// as it is more efficient than using dynamic dispatch (Box<dyn ProcessorTrait>) and
/// This enum contains all the processors defined in this crate.
///
/// We use enum_dispatch as it is more efficient than using dynamic dispatch (Box<dyn ProcessorTrait>) and
/// it enables nice safety checks like we do in `test_processor_names_complete`.
#[enum_dispatch(ProcessorTrait)]
#[derive(Debug)]
// To ensure that the variants of ProcessorConfig and Processor line up, in the testing
///
/// To ensure that the variants of ProcessorConfig and Processor line up, in the testing
/// build path we derive EnumDiscriminants on this enum as well and make sure the two
/// sets of variants match up in `test_processor_names_complete`.
#[enum_dispatch(ProcessorTrait)]
#[derive(Debug)]
#[cfg_attr(
test,
derive(strum::EnumDiscriminants),
Expand Down Expand Up @@ -271,8 +274,10 @@ mod test {
use strum::VariantNames;

/// This test exists to make sure that when a new processor is added, it is added
/// to both Processor and ProcessorConfig. To make sure this passes, make sure the
/// variants are in the same order (lexicographical) and the names match.
/// to both Processor and ProcessorConfig.
///
/// To make sure this passes, make sure the variants are in the same order
/// (lexicographical) and the names match.
#[test]
fn test_processor_names_complete() {
assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS);
Expand Down
13 changes: 7 additions & 6 deletions rust/processor/src/utils/database.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

//! Database-related functions
#![allow(clippy::extra_unused_lifetimes)]

use crate::utils::util::remove_null_bytes;
use ahash::AHashMap;
use diesel::{
Expand Down Expand Up @@ -33,15 +30,18 @@ pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("src/db/postgres/mi
pub const DEFAULT_MAX_POOL_SIZE: u32 = 150;

#[derive(QueryId)]
/// Using this will append a where clause at the end of the string upsert function, e.g.
/// Using this will append a where clause at the end of the string upsert function
///
/// e.g.
/// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version"
/// This is needed when we want to maintain a table with only the latest state
pub struct UpsertFilterLatestTransactionQuery<T> {
query: T,
where_clause: Option<&'static str>,
}

// the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit
/// the max is actually u16::MAX but we see that when the size is too big we get
/// an overflow error so reducing it a bit
pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize;

/// This function will clean the data for postgres. Currently it has support for removing
Expand Down Expand Up @@ -190,7 +190,8 @@ where
res
}

/// Returns the entry for the config hashmap, or the default field count for the insert
/// Returns the entry for the config hashmap, or the default field count for the insert.
///
/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX),
/// we default to chunk an array of items based on how many columns are in the table.
pub fn get_config_table_chunk_size<T: field_count::FieldCount>(
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ bitflags! {
const FUNGIBLE_ASSET_BALANCES = 1 << 6;
const CURRENT_FUNGIBLE_ASSET_BALANCES = 1 << 7;
const COIN_SUPPLY = 1 << 8;
const CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES = 1 << 24;

// Objects
const OBJECTS = 1 << 9;
Expand Down
1 change: 1 addition & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::processors::events_processor::EventsProcessorConfig;
use serde::{Deserialize, Serialize};

/// This enum captures the configs for all the different processors that are defined.
///
/// The configs for each processor should only contain configuration specific to that
/// processor. For configuration that is common to all processors, put it in
/// IndexerGrpcProcessorConfig.
Expand Down
7 changes: 5 additions & 2 deletions rust/sdk-processor/src/utils/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ pub const MIGRATIONS: EmbeddedMigrations =
pub const DEFAULT_MAX_POOL_SIZE: u32 = 150;

#[derive(QueryId)]
/// Using this will append a where clause at the end of the string upsert function, e.g.
/// Using this will append a where clause at the end of the string upsert function
///
/// e.g.
/// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version"
/// This is needed when we want to maintain a table with only the latest state
pub struct UpsertFilterLatestTransactionQuery<T> {
Expand Down Expand Up @@ -191,7 +193,8 @@ where
res
}

/// Returns the entry for the config hashmap, or the default field count for the insert
/// Returns the entry for the config hashmap, or the default field count for the insert.
///
/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX),
/// we default to chunk an array of items based on how many columns are in the table.
pub fn get_config_table_chunk_size<T: field_count::FieldCount>(
Expand Down
Loading